Unverified Commit 86748148 authored by Eugen Rochko's avatar Eugen Rochko Committed by GitHub

Change tootctl to use inline parallelization instead of Sidekiq (#11776)

- Remove --background option
- Add --concurrency(=5) option
- Add progress bars
parent 9045f5e3
......@@ -77,6 +77,7 @@ gem 'rails-settings-cached', '~> 0.6'
gem 'redis', '~> 4.1', require: ['redis', 'redis/connection/hiredis']
gem 'mario-redis-lock', '~> 1.2', require: 'redis_lock'
gem 'rqrcode', '~> 0.10'
gem 'ruby-progressbar', '~> 1.10'
gem 'sanitize', '~> 5.1'
gem 'sidekiq', '~> 5.2'
gem 'sidekiq-scheduler', '~> 3.0'
......
......@@ -769,6 +769,7 @@ DEPENDENCIES
rspec-sidekiq (~> 3.0)
rubocop (~> 0.74)
rubocop-rails (~> 2.3)
ruby-progressbar (~> 1.10)
sanitize (~> 5.1)
sidekiq (~> 5.2)
sidekiq-bulk (~> 0.2.0)
......
......@@ -129,6 +129,7 @@ class MediaAttachment < ApplicationRecord
scope :unattached, -> { where(status_id: nil, scheduled_status_id: nil) }
scope :local, -> { where(remote_url: '') }
scope :remote, -> { where.not(remote_url: '') }
scope :cached, -> { remote.where.not(file_file_name: nil) }
default_scope { order(id: :asc) }
......
......@@ -43,6 +43,8 @@ class PreviewCard < ApplicationRecord
validates_attachment_size :image, less_than: LIMIT
remotable_attachment :image, LIMIT
scope :cached, -> { where.not(image_file_name: [nil, '']) }
before_save :extract_dimensions, if: :link?
def save_with_optional_image!
......
# frozen_string_literal: true
class Maintenance::DestroyMediaWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull'
def perform(media_attachment_id)
media = media_attachment_id.is_a?(MediaAttachment) ? media_attachment_id : MediaAttachment.find(media_attachment_id)
media.destroy
rescue ActiveRecord::RecordNotFound
true
end
end
# frozen_string_literal: true
class Maintenance::RedownloadAccountMediaWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull', retry: false
def perform(account_id)
account = account_id.is_a?(Account) ? account_id : Account.find(account_id)
account.reset_avatar!
account.reset_header!
account.save
rescue ActiveRecord::RecordNotFound
true
end
end
# frozen_string_literal: true
class Maintenance::UncacheMediaWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull'
def perform(media_attachment_id)
media = media_attachment_id.is_a?(MediaAttachment) ? media_attachment_id : MediaAttachment.find(media_attachment_id)
return if media.file.blank?
media.file.destroy
media.save
rescue ActiveRecord::RecordNotFound
true
end
end
# frozen_string_literal: true
class Maintenance::UncachePreviewWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull'
def perform(preview_card_id)
preview_card = PreviewCard.find(preview_card_id)
return if preview_card.image.blank?
preview_card.image.destroy
preview_card.save
rescue ActiveRecord::RecordNotFound
true
end
end
This diff is collapsed.
......@@ -6,6 +6,8 @@ require_relative 'cli_helper'
module Mastodon
class CacheCLI < Thor
include CLIHelper
def self.exit_on_failure?
true
end
......@@ -16,6 +18,8 @@ module Mastodon
say('OK', :green)
end
option :concurrency, type: :numeric, default: 5, aliases: [:c]
option :verbose, type: :boolean, aliases: [:v]
desc 'recount TYPE', 'Update hard-cached counters'
long_desc <<~LONG_DESC
Update hard-cached counters of TYPE by counting referenced
......@@ -25,32 +29,24 @@ module Mastodon
size of the database.
LONG_DESC
def recount(type)
processed = 0
case type
when 'accounts'
Account.local.includes(:account_stat).find_each do |account|
processed, = parallelize_with_progress(Account.local.includes(:account_stat)) do |account|
account_stat = account.account_stat
account_stat.following_count = account.active_relationships.count
account_stat.followers_count = account.passive_relationships.count
account_stat.statuses_count = account.statuses.where.not(visibility: :direct).count
account_stat.save if account_stat.changed?
processed += 1
say('.', :green, false)
end
when 'statuses'
Status.includes(:status_stat).find_each do |status|
processed, = parallelize_with_progress(Status.includes(:status_stat)) do |status|
status_stat = status.status_stat
status_stat.replies_count = status.replies.where.not(visibility: :direct).count
status_stat.reblogs_count = status.reblogs.count
status_stat.favourites_count = status.favourites.count
status_stat.save if status_stat.changed?
processed += 1
say('.', :green, false)
end
else
say("Unknown type: #{type}", :red)
......
......@@ -7,3 +7,52 @@ ActiveRecord::Base.logger = dev_null
ActiveJob::Base.logger = dev_null
HttpLog.configuration.logger = dev_null
Paperclip.options[:log] = false
module Mastodon
module CLIHelper
def create_progress_bar(total = nil)
ProgressBar.create(total: total, format: '%c/%u |%b%i| %e')
end
def parallelize_with_progress(scope)
ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency]
progress = create_progress_bar(scope.count)
pool = Concurrent::FixedThreadPool.new(options[:concurrency])
total = Concurrent::AtomicFixnum.new(0)
aggregate = Concurrent::AtomicFixnum.new(0)
scope.reorder(nil).find_in_batches do |items|
futures = []
items.each do |item|
futures << Concurrent::Future.execute(executor: pool) do
ActiveRecord::Base.connection_pool.with_connection do
begin
progress.log("Processing #{item.id}") if options[:verbose]
result = yield(item)
aggregate.increment(result) if result.is_a?(Integer)
rescue => e
progress.log pastel.red("Error processing #{item.id}: #{e}")
ensure
progress.increment
end
end
end
end
total.increment(items.size)
futures.map(&:value)
end
progress.finish
[total.value, aggregate.value]
end
def pastel
@pastel ||= Pastel.new
end
end
end
......@@ -7,10 +7,14 @@ require_relative 'cli_helper'
module Mastodon
class DomainsCLI < Thor
include CLIHelper
def self.exit_on_failure?
true
end
option :concurrency, type: :numeric, default: 5, aliases: [:c]
option :verbose, type: :boolean, aliases: [:v]
option :dry_run, type: :boolean
option :whitelist_mode, type: :boolean
desc 'purge [DOMAIN]', 'Remove accounts from a DOMAIN without a trace'
......@@ -24,7 +28,6 @@ module Mastodon
are removed from the database.
LONG_DESC
def purge(domain = nil)
removed = 0
dry_run = options[:dry_run] ? ' (DRY RUN)' : ''
scope = begin
......@@ -38,25 +41,22 @@ module Mastodon
end
end
scope.find_each do |account|
processed, = parallelize_with_progress(scope) do |account|
SuspendAccountService.new.call(account, destroy: true) unless options[:dry_run]
removed += 1
say('.', :green, false)
end
DomainBlock.where(domain: domain).destroy_all unless options[:dry_run]
say
say("Removed #{removed} accounts#{dry_run}", :green)
say("Removed #{processed} accounts#{dry_run}", :green)
custom_emojis = CustomEmoji.where(domain: domain)
custom_emojis_count = custom_emojis.count
custom_emojis.destroy_all unless options[:dry_run]
say("Removed #{custom_emojis_count} custom emojis", :green)
end
option :concurrency, type: :numeric, default: 50, aliases: [:c]
option :silent, type: :boolean, default: false, aliases: [:s]
option :format, type: :string, default: 'summary', aliases: [:f]
option :exclude_suspended, type: :boolean, default: false, aliases: [:x]
desc 'crawl [START]', 'Crawl all known peers, optionally beginning at START'
......@@ -69,8 +69,6 @@ module Mastodon
The --concurrency (-c) option controls the number of threads performing HTTP
requests at the same time. More threads means the crawl may complete faster.
The --silent (-s) option controls progress output.
The --format (-f) option controls how the data is displayed at the end. By
default (`summary`), a summary of the statistics is returned. The other options
are `domains`, which returns a newline-delimited list of all discovered peers,
......@@ -87,6 +85,7 @@ module Mastodon
start_at = Time.now.to_f
seed = start ? [start] : Account.remote.domains
blocked_domains = Regexp.new('\\.?' + DomainBlock.where(severity: 1).pluck(:domain).join('|') + '$')
progress = create_progress_bar
pool = Concurrent::ThreadPoolExecutor.new(min_threads: 0, max_threads: options[:concurrency], idletime: 10, auto_terminate: true, max_queue: 0)
......@@ -95,7 +94,6 @@ module Mastodon
next if options[:exclude_suspended] && domain.match(blocked_domains)
stats[domain] = nil
processed.increment
begin
Request.new(:get, "https://#{domain}/api/v1/instance").perform do |res|
......@@ -115,11 +113,11 @@ module Mastodon
next unless res.code == 200
stats[domain]['activity'] = Oj.load(res.to_s)
end
say('.', :green, false) unless options[:silent]
rescue StandardError
failed.increment
say('.', :red, false) unless options[:silent]
ensure
processed.increment
progress.increment unless progress.finished?
end
end
......@@ -133,10 +131,9 @@ module Mastodon
pool.shutdown
pool.wait_for_termination(20)
ensure
progress.finish
pool.shutdown
say unless options[:silent]
case options[:format]
when 'summary'
stats_to_summary(stats, processed, failed, start_at)
......
......@@ -6,55 +6,33 @@ require_relative 'cli_helper'
module Mastodon
class FeedsCLI < Thor
include CLIHelper
def self.exit_on_failure?
true
end
option :all, type: :boolean, default: false
option :background, type: :boolean, default: false
option :concurrency, type: :numeric, default: 5, aliases: [:c]
option :verbose, type: :boolean, aliases: [:v]
option :dry_run, type: :boolean, default: false
option :verbose, type: :boolean, default: false
desc 'build [USERNAME]', 'Build home and list feeds for one or all users'
long_desc <<-LONG_DESC
Build home and list feeds that are stored in Redis from the database.
With the --all option, all active users will be processed.
Otherwise, a single user specified by USERNAME.
With the --background option, regeneration will be queued into Sidekiq,
and the command will exit as soon as possible.
With the --dry-run option, no work will be done.
With the --verbose option, when accounts are processed sequentially in the
foreground, the IDs of the accounts will be printed.
LONG_DESC
def build(username = nil)
dry_run = options[:dry_run] ? '(DRY RUN)' : ''
if options[:all] || username.nil?
processed = 0
queued = 0
User.active.select(:id, :account_id).reorder(nil).find_in_batches do |users|
if options[:background]
RegenerationWorker.push_bulk(users.map(&:account_id)) unless options[:dry_run]
queued += users.size
else
users.each do |user|
RegenerationWorker.new.perform(user.account_id) unless options[:dry_run]
options[:verbose] ? say(user.account_id) : say('.', :green, false)
processed += 1
end
end
processed, = parallelize_with_progress(Account.joins(:user).merge(User.active)) do |account|
PrecomputeFeedService.new.call(account) unless options[:dry_run]
end
if options[:background]
say("Scheduled feed regeneration for #{queued} accounts #{dry_run}", :green, true)
else
say
say("Regenerated feeds for #{processed} accounts #{dry_run}", :green, true)
end
say("Regenerated feeds for #{processed} accounts #{dry_run}", :green, true)
elsif username.present?
account = Account.find_local(username)
......@@ -63,11 +41,7 @@ module Mastodon
exit(1)
end
if options[:background]
RegenerationWorker.perform_async(account.id) unless options[:dry_run]
else
RegenerationWorker.new.perform(account.id) unless options[:dry_run]
end
PrecomputeFeedService.new.call(account) unless options[:dry_run]
say("OK #{dry_run}", :green, true)
else
......
......@@ -7,14 +7,15 @@ require_relative 'cli_helper'
module Mastodon
class MediaCLI < Thor
include ActionView::Helpers::NumberHelper
include CLIHelper
def self.exit_on_failure?
true
end
option :days, type: :numeric, default: 7
option :background, type: :boolean, default: false
option :verbose, type: :boolean, default: false
option :days, type: :numeric, default: 7, aliases: [:d]
option :concurrency, type: :numeric, default: 5, aliases: [:c]
option :verbose, type: :boolean, default: false, aliases: [:v]
option :dry_run, type: :boolean, default: false
desc 'remove', 'Remove remote media files'
long_desc <<-DESC
......@@ -22,49 +23,25 @@ module Mastodon
The --days option specifies how old media attachments have to be before
they are removed. It defaults to 7 days.
With the --background option, instead of deleting the files sequentially,
they will be queued into Sidekiq and the command will exit as soon as
possible. In Sidekiq they will be processed with higher concurrency, but
it may impact other operations of the Mastodon server, and it may overload
the underlying file storage.
With the --dry-run option, no work will be done.
With the --verbose option, when media attachments are processed sequentially in the
foreground, the IDs of the media attachments will be printed.
DESC
def remove
time_ago = options[:days].days.ago
queued = 0
processed = 0
size = 0
dry_run = options[:dry_run] ? '(DRY RUN)' : ''
time_ago = options[:days].days.ago
dry_run = options[:dry_run] ? '(DRY RUN)' : ''
if options[:background]
MediaAttachment.where.not(remote_url: '').where.not(file_file_name: nil).where('created_at < ?', time_ago).select(:id, :file_file_size).reorder(nil).find_in_batches do |media_attachments|
queued += media_attachments.size
size += media_attachments.reduce(0) { |sum, m| sum + (m.file_file_size || 0) }
Maintenance::UncacheMediaWorker.push_bulk(media_attachments.map(&:id)) unless options[:dry_run]
end
else
MediaAttachment.where.not(remote_url: '').where.not(file_file_name: nil).where('created_at < ?', time_ago).reorder(nil).find_in_batches do |media_attachments|
media_attachments.each do |m|
size += m.file_file_size || 0
Maintenance::UncacheMediaWorker.new.perform(m) unless options[:dry_run]
options[:verbose] ? say(m.id) : say('.', :green, false)
processed += 1
end
end
end
processed, aggregate = parallelize_with_progress(MediaAttachment.cached.where.not(remote_url: '').where('created_at < ?', time_ago)) do |media_attachment|
next if media_attachment.file.blank?
size = media_attachment.file_file_size
say
unless options[:dry_run]
media_attachment.file.destroy
media_attachment.save
end
if options[:background]
say("Scheduled the deletion of #{queued} media attachments (approx. #{number_to_human_size(size)}) #{dry_run}", :green, true)
else
say("Removed #{processed} media attachments (approx. #{number_to_human_size(size)}) #{dry_run}", :green, true)
size
end
say("Removed #{processed} media attachments (approx. #{number_to_human_size(aggregate)}) #{dry_run}", :green, true)
end
end
end
......@@ -8,87 +8,52 @@ require_relative 'cli_helper'
module Mastodon
class PreviewCardsCLI < Thor
include ActionView::Helpers::NumberHelper
include CLIHelper
def self.exit_on_failure?
true
end
option :days, type: :numeric, default: 180
option :background, type: :boolean, default: false
option :verbose, type: :boolean, default: false
option :concurrency, type: :numeric, default: 5, aliases: [:c]
option :verbose, type: :boolean, aliases: [:v]
option :dry_run, type: :boolean, default: false
option :link, type: :boolean, default: false
desc 'remove', 'Remove preview cards'
long_desc <<-DESC
Removes locally thumbnails for previews.
Removes local thumbnails for preview cards.
The --days option specifies how old preview cards have to be before
they are removed. It defaults to 180 days.
they are removed. It defaults to 180 days. Since preview cards will
not be re-fetched unless the link is re-posted after 2 weeks from
last time, it is not recommended to delete preview cards within the
last 14 days.
With the --background option, instead of deleting the files sequentially,
they will be queued into Sidekiq and the command will exit as soon as
possible. In Sidekiq they will be processed with higher concurrency, but
it may impact other operations of the Mastodon server, and it may overload
the underlying file storage.
With the --dry-run option, no work will be done.
With the --verbose option, when preview cards are processed sequentially in the
foreground, the IDs of the preview cards will be printed.
With the --link option, delete only link-type preview cards.
With the --link option, only link-type preview cards will be deleted,
leaving video and photo cards untouched.
DESC
def remove
prompt = TTY::Prompt.new
time_ago = options[:days].days.ago
queued = 0
processed = 0
size = 0
dry_run = options[:dry_run] ? '(DRY RUN)' : ''
link = options[:link] ? 'link-type ' : ''
scope = PreviewCard.where.not(image_file_name: nil)
scope = scope.where.not(image_file_name: '')
scope = scope.where(type: :link) if options[:link]
scope = scope.where('updated_at < ?', time_ago)
time_ago = options[:days].days.ago
dry_run = options[:dry_run] ? ' (DRY RUN)' : ''
link = options[:link] ? 'link-type ' : ''
scope = PreviewCard.cached
scope = scope.where(type: :link) if options[:link]
scope = scope.where('updated_at < ?', time_ago)
if time_ago > 2.weeks.ago
prompt.say "\n"
prompt.say('The preview cards less than the past two weeks will not be re-acquired even when needed.')
prompt.say "\n"
processed, aggregate = parallelize_with_progress(scope) do |preview_card|
next if preview_card.image.blank?
unless prompt.yes?('Are you sure you want to delete the preview cards?', default: false)
prompt.say "\n"
prompt.warn 'Nothing execute. Bye!'
prompt.say "\n"
exit(1)
end
end
size = preview_card.image_file_size
if options[:background]
scope.select(:id, :image_file_size).reorder(nil).find_in_batches do |preview_cards|
queued += preview_cards.size
size += preview_cards.reduce(0) { |sum, p| sum + (p.image_file_size || 0) }
Maintenance::UncachePreviewWorker.push_bulk(preview_cards.map(&:id)) unless options[:dry_run]
unless options[:dry_run]
preview_card.image.destroy
preview_card.save
end
else
scope.select(:id, :image_file_size).reorder(nil).find_in_batches do |preview_cards|
preview_cards.each do |p|
size += p.image_file_size || 0
Maintenance::UncachePreviewWorker.new.perform(p.id) unless options[:dry_run]
options[:verbose] ? say(p.id) : say('.', :green, false)
processed += 1
end
end
size
end
say
if options[:background]
say("Scheduled the deletion of #{queued} #{link}preview cards (approx. #{number_to_human_size(size)}) #{dry_run}", :green, true)
else
say("Removed #{processed} #{link}preview cards (approx. #{number_to_human_size(size)}) #{dry_run}", :green, true)
end
say("Removed #{processed} #{link}preview cards (approx. #{number_to_human_size(aggregate)})#{dry_run}", :green, true)
end
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment