Merge branch 'main' into glitch-soc/merge-upstream
Conflicts:
- `Gemfile.lock`:
  Not a real conflict, the upstream-updated dependency (redis) was textually
  too close to a glitch-soc-only dependency. Updated the redis gem like
  upstream did.
@@ -47,7 +47,7 @@ class MoveWorker
 
   def copy_account_notes!
     AccountNote.where(target_account: @source_account).find_each do |note|
-      text = I18n.with_locale(note.account.user.locale || I18n.default_locale) do
+      text = I18n.with_locale(note.account.user&.locale || I18n.default_locale) do
         I18n.t('move_handler.copy_account_note_text', acct: @source_account.acct)
       end
 
@@ -84,7 +84,7 @@ class MoveWorker
 
   def add_account_note_if_needed!(account, id)
     unless AccountNote.where(account: account, target_account: @target_account).exists?
-      text = I18n.with_locale(account.user.locale || I18n.default_locale) do
+      text = I18n.with_locale(account.user&.locale || I18n.default_locale) do
         I18n.t(id, acct: @source_account.acct)
       end
       AccountNote.create!(account: account, target_account: @target_account, comment: text)
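Both hunks apply the same fix: `note.account.user` and `account.user` are `nil` when the account has no local `User` record (for instance, a remote account), so calling `.locale` on them raises `NoMethodError` and the worker crashes. The safe navigation operator returns `nil` instead, which lets `|| I18n.default_locale` supply a fallback. A minimal sketch of the difference (illustration only, not part of the diff):

  user = nil                   # e.g. a remote account with no local User record
  # user.locale                # => NoMethodError: undefined method `locale' for nil
  user&.locale                 # => nil
  user&.locale || :en          # => :en, a usable value for I18n.with_locale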
app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb (new file, 96 lines)
@@ -0,0 +1,96 @@
# frozen_string_literal: true

class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # It also helps to limit the running time of the scheduler itself.
  MAX_BUDGET = 50

  # This is an attempt to spread the load across instances, as different
  # accounts are likely to have different followers.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular instance can handle.
  PER_THREAD_BUDGET = 5

  # These thresholds avoid adding load to an instance that is already under load.
  MAX_DEFAULT_SIZE    = 2
  MAX_DEFAULT_LATENCY = 5
  MAX_PUSH_SIZE       = 5
  MAX_PUSH_LATENCY    = 10
  # The 'pull' queue has lower-priority jobs, and it's unlikely that pushing
  # deletes would cause many issues with this queue if it didn't cause issues
  # with 'default' and 'push'. Still, do not enqueue deletes if the instance is
  # lagging behind too much.
  MAX_PULL_SIZE       = 500
  MAX_PULL_LATENCY    = 300

  # This is less of an issue in general, but deleting old statuses is likely
  # to cause delivery errors, and thus increase the number of jobs to be retried.
  # This doesn't directly translate to load, but connection errors and a high
  # number of dead instances may lead to this spiraling out of control if
  # left unchecked.
  MAX_RETRY_SIZE = 50_000

  sidekiq_options retry: 0, lock: :until_executed
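  # Note on the options above: `retry: 0` means a failed run is dropped and the
  # work simply waits for the next scheduled invocation, and `lock: :until_executed`
  # is presumably provided by the sidekiq-unique-jobs gem used elsewhere in
  # Mastodon to keep two scheduler runs from overlapping (the gem attribution
  # is an assumption, not stated in this diff).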
  def perform
    return if under_load?

    budget = compute_budget
    first_policy_id = last_processed_id

    loop do
      num_processed_accounts = 0

      scope = AccountStatusesCleanupPolicy.where(enabled: true)
      scope = scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        num_processed_accounts += 1 unless num_deleted.zero?
        budget -= num_deleted
        if budget.zero?
          save_last_processed_id(policy.id)
          break
        end
      end

      # The idea here is to loop through all policies at least once until the budget
      # is exhausted, and to start over after the last processed account otherwise.
      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
      first_policy_id = nil
    end
  end
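  # Illustrative walk-through (assumed ids, not part of the upstream file):
  # each policy gets at most [budget, PER_ACCOUNT_BUDGET].min = 5 deletions per
  # pass. If the overall budget hits zero at, say, policy id 42, that id is
  # checkpointed and the run stops; the next run resumes with policies after
  # id 42 and, once it reaches the end without exhausting its budget, clears
  # first_policy_id and starts another pass from the beginning.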
  def compute_budget
    threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end
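  # Worked example (assumed deployment, not part of the upstream file): two
  # Sidekiq processes serving the 'push' queue with a concurrency of 5 each
  # give threads = 10, so the budget is [5 * 10, 50].min = 50; a single
  # 5-thread process would get [25, 50].min = 25 statuses for the run.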
  def under_load?
    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE
    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
  end

  private

  def queue_under_load?(name, max_size, max_latency)
    queue = Sidekiq::Queue.new(name)
    queue.size > max_size || queue.latency > max_latency
  end
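  # Example (hypothetical queue state, not part of the upstream file): with
  # MAX_PUSH_SIZE = 5 and MAX_PUSH_LATENCY = 10, a 'push' queue holding 7
  # waiting jobs, or whose oldest job has been waiting for more than 10
  # seconds, makes under_load? return true, and the whole run is skipped.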
  def last_processed_id
    Redis.current.get('account_statuses_cleanup_scheduler:last_account_id')
  end

  def save_last_processed_id(id)
    if id.nil?
      Redis.current.del('account_statuses_cleanup_scheduler:last_account_id')
    else
      Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
    end
  end
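  # Note on the checkpoint above (observation, not upstream code): the 1-hour
  # TTL means a checkpoint left behind by an interrupted run expires on its
  # own, after which last_processed_id returns nil and the next run scans the
  # enabled policies from the beginning again.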
end