Avoid duplicate work by merging ReplyDistributionWorker into DistributionWorker (#9660)
This commit is contained in:
		| @@ -44,7 +44,6 @@ class PostStatusService < BaseService | ||||
|     DistributionWorker.perform_async(status.id) | ||||
|     Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id) | ||||
|     ActivityPub::DistributionWorker.perform_async(status.id) | ||||
|     ActivityPub::ReplyDistributionWorker.perform_async(status.id) if status.reply? && status.thread.account.local? | ||||
|  | ||||
|     if options[:idempotency].present? | ||||
|       redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id) | ||||
|   | ||||
| @@ -31,7 +31,14 @@ class ActivityPub::DistributionWorker | ||||
|   end | ||||
|  | ||||
|   def inboxes | ||||
|     @inboxes ||= @account.followers.inboxes | ||||
|     # Deliver the status to all followers. | ||||
|     # If the status is a reply to another local status, also forward it to that | ||||
|     # status' authors' followers. | ||||
|     @inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable? | ||||
|                    @account.followers.or(@status.thread.account.followers).inboxes | ||||
|                  else | ||||
|                    @account.followers.inboxes | ||||
|                  end | ||||
|   end | ||||
|  | ||||
|   def signed_payload | ||||
|   | ||||
| @@ -1,42 +0,0 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class ActivityPub::ReplyDistributionWorker | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   sidekiq_options queue: 'push' | ||||
|  | ||||
|   def perform(status_id) | ||||
|     @status  = Status.find(status_id) | ||||
|     @account = @status.thread&.account | ||||
|  | ||||
|     return unless @account.present? && @status.distributable? | ||||
|  | ||||
|     ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| | ||||
|       [payload, @status.account_id, inbox_url] | ||||
|     end | ||||
|   rescue ActiveRecord::RecordNotFound | ||||
|     true | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def inboxes | ||||
|     @inboxes ||= @account.followers.inboxes | ||||
|   end | ||||
|  | ||||
|   def signed_payload | ||||
|     Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@status.account)) | ||||
|   end | ||||
|  | ||||
|   def unsigned_payload | ||||
|     ActiveModelSerializers::SerializableResource.new( | ||||
|       @status, | ||||
|       serializer: ActivityPub::ActivitySerializer, | ||||
|       adapter: ActivityPub::Adapter | ||||
|     ).as_json | ||||
|   end | ||||
|  | ||||
|   def payload | ||||
|     @payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload) | ||||
|   end | ||||
| end | ||||
		Reference in New Issue
	
	Block a user