Serialize ActivityPub alternate link into OStatus deletes, handle it (#4730)
Requires moving Atom rendering from DistributionWorker (where `stream_entry.status` is already nil) to inline (where `stream_entry.status.destroyed?` is true) and distributing that. Unfortunately, such XML renderings can no longer be easily chained together into one payload of n items.
This commit is contained in:
		| @@ -3,7 +3,9 @@ | ||||
| class OStatus::Activity::Deletion < OStatus::Activity::Base | ||||
|   def perform | ||||
|     Rails.logger.debug "Deleting remote status #{id}" | ||||
|     status = Status.find_by(uri: id, account: @account) | ||||
|  | ||||
|     status   = Status.find_by(uri: id, account: @account) | ||||
|     status ||= Status.find_by(uri: activitypub_uri, account: @account) if activitypub_uri? | ||||
|  | ||||
|     if status.nil? | ||||
|       redis.setex("delete_upon_arrival:#{@account.id}:#{id}", 6 * 3_600, id) | ||||
|   | ||||
| @@ -79,6 +79,9 @@ class OStatus::AtomSerializer | ||||
|  | ||||
|     if stream_entry.status.nil? | ||||
|       append_element(entry, 'content', 'Deleted status') | ||||
|     elsif stream_entry.status.destroyed? | ||||
|       append_element(entry, 'content', 'Deleted status') | ||||
|       append_element(entry, 'link', nil, rel: :alternate, type: 'application/activity+json', href: ActivityPub::TagManager.instance.uri_for(stream_entry.status)) if stream_entry.account.local? | ||||
|     else | ||||
|       serialize_status_attributes(entry, stream_entry.status) | ||||
|     end | ||||
|   | ||||
| @@ -51,6 +51,7 @@ class Status < ApplicationRecord | ||||
|  | ||||
|   has_one :notification, as: :activity, dependent: :destroy | ||||
|   has_one :preview_card, dependent: :destroy | ||||
|   has_one :stream_entry, as: :activity, inverse_of: :status | ||||
|  | ||||
|   validates :uri, uniqueness: true, unless: :local? | ||||
|   validates :text, presence: true, unless: :reblog? | ||||
| @@ -90,7 +91,11 @@ class Status < ApplicationRecord | ||||
|   end | ||||
|  | ||||
|   def verb | ||||
|     reblog? ? :share : :post | ||||
|     if destroyed? | ||||
|       :delete | ||||
|     else | ||||
|       reblog? ? :share : :post | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   def object_type | ||||
| @@ -110,7 +115,11 @@ class Status < ApplicationRecord | ||||
|   end | ||||
|  | ||||
|   def title | ||||
|     reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}" | ||||
|     if destroyed? | ||||
|       "#{account.acct} deleted status" | ||||
|     else | ||||
|       reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}" | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   def hidden? | ||||
|   | ||||
| @@ -20,9 +20,10 @@ class BatchedRemoveStatusService < BaseService | ||||
|     @activity_json_batches = [] | ||||
|     @json_payloads         = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h | ||||
|     @activity_json         = {} | ||||
|     @activity_xml          = {} | ||||
|  | ||||
|     # Ensure that rendered XML reflects destroyed state | ||||
|     Status.where(id: statuses.map(&:id)).in_batches.destroy_all | ||||
|     statuses.each(&:destroy) | ||||
|  | ||||
|     # Batch by source account | ||||
|     statuses.group_by(&:account_id).each do |_, account_statuses| | ||||
| @@ -31,7 +32,7 @@ class BatchedRemoveStatusService < BaseService | ||||
|       unpush_from_home_timelines(account_statuses) | ||||
|  | ||||
|       if account.local? | ||||
|         batch_stream_entries(account_statuses) | ||||
|         batch_stream_entries(account, account_statuses) | ||||
|         batch_activity_json(account, account_statuses) | ||||
|       end | ||||
|     end | ||||
| @@ -42,18 +43,16 @@ class BatchedRemoveStatusService < BaseService | ||||
|       batch_salmon_slaps(status) if status.local? | ||||
|     end | ||||
|  | ||||
|     Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } | ||||
|     Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } | ||||
|     NotificationWorker.push_bulk(@salmon_batches) { |batch| batch } | ||||
|     ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch } | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def batch_stream_entries(statuses) | ||||
|     stream_entry_ids = statuses.map { |s| s.stream_entry.id } | ||||
|  | ||||
|     stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids| | ||||
|       @stream_entry_batches << [batch_of_stream_entry_ids] | ||||
|   def batch_stream_entries(account, statuses) | ||||
|     statuses.each do |status| | ||||
|       @stream_entry_batches << [build_xml(status.stream_entry), account.id] | ||||
|     end | ||||
|   end | ||||
|  | ||||
| @@ -101,11 +100,10 @@ class BatchedRemoveStatusService < BaseService | ||||
|   def batch_salmon_slaps(status) | ||||
|     return if @mentions[status.id].empty? | ||||
|  | ||||
|     payload    = stream_entry_to_xml(status.stream_entry.reload) | ||||
|     recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id) | ||||
|  | ||||
|     recipients.each do |recipient_id| | ||||
|       @salmon_batches << [payload, status.account_id, recipient_id] | ||||
|       @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id] | ||||
|     end | ||||
|   end | ||||
|  | ||||
| @@ -145,6 +143,12 @@ class BatchedRemoveStatusService < BaseService | ||||
|     ).as_json) | ||||
|   end | ||||
|  | ||||
|   def build_xml(stream_entry) | ||||
|     return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id) | ||||
|  | ||||
|     @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry) | ||||
|   end | ||||
|  | ||||
|   def sign_json(status, json) | ||||
|     Oj.dump(ActivityPub::LinkedDataSignature.new(json).sign!(status.account)) | ||||
|   end | ||||
|   | ||||
| @@ -22,8 +22,6 @@ class RemoveStatusService < BaseService | ||||
|  | ||||
|     return unless @account.local? | ||||
|  | ||||
|     @stream_entry = @stream_entry.reload | ||||
|  | ||||
|     remove_from_remote_followers | ||||
|     remove_from_remote_affected | ||||
|   end | ||||
| @@ -62,7 +60,7 @@ class RemoveStatusService < BaseService | ||||
|  | ||||
|   def remove_from_remote_followers | ||||
|     # OStatus | ||||
|     Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id) | ||||
|     Pubsubhubbub::RawDistributionWorker.perform_async(salmon_xml, @account.id) | ||||
|  | ||||
|     # ActivityPub | ||||
|     ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url| | ||||
|   | ||||
							
								
								
									
										22
									
								
								app/workers/pubsubhubbub/raw_distribution_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								app/workers/pubsubhubbub/raw_distribution_worker.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,22 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class Pubsubhubbub::RawDistributionWorker | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   sidekiq_options queue: 'push' | ||||
|  | ||||
|   def perform(xml, source_account_id) | ||||
|     @account       = Account.find(source_account_id) | ||||
|     @subscriptions = active_subscriptions.to_a | ||||
|  | ||||
|     Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription| | ||||
|       [subscription.id, xml] | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def active_subscriptions | ||||
|     Subscription.where(account: @account).active.select('id, callback_url, domain') | ||||
|   end | ||||
| end | ||||
| @@ -48,11 +48,10 @@ RSpec.describe BatchedRemoveStatusService do | ||||
|     expect(Redis.current).to have_received(:publish).with('timeline:public', any_args).at_least(:once) | ||||
|   end | ||||
|  | ||||
|   it 'sends PuSH update to PuSH subscribers with two payloads united' do | ||||
|   it 'sends PuSH update to PuSH subscribers' do | ||||
|     expect(a_request(:post, 'http://example.com/push').with { |req| | ||||
|       matches = req.body.scan(TagManager::VERBS[:delete]) | ||||
|       matches.size == 2 | ||||
|     }).to have_been_made | ||||
|       matches = req.body.match(TagManager::VERBS[:delete]) | ||||
|     }).to have_been_made.at_least_once | ||||
|   end | ||||
|  | ||||
|   it 'sends Salmon slap to previously mentioned users' do | ||||
|   | ||||
		Reference in New Issue
	
	Block a user