Fix processing of remote Delete activities (#16084)
* Add tests * Ensure deleted statuses are marked as such * Save some redis memory by not storing URIs in delete_upon_arrival values * Avoid possible race condition when processing incoming Deletes * Avoid potential duplicate Delete forwards * Lower lock durations to reduce issues in case of hard crash of the Rails process * Check for `lock.aquired?` and improve comment * Refactor RedisLock usage in app/lib/activitypub * Fix using incorrect or non-existent sender for relaying Deletes
This commit is contained in:
		| @@ -144,7 +144,7 @@ class ActivityPub::Activity | ||||
|   end | ||||
|  | ||||
|   def delete_later!(uri) | ||||
|     redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri) | ||||
|     redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true) | ||||
|   end | ||||
|  | ||||
|   def status_from_object | ||||
| @@ -210,12 +210,22 @@ class ActivityPub::Activity | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   def lock_or_return(key, expire_after = 7.days.seconds) | ||||
|   def lock_or_return(key, expire_after = 2.hours.seconds) | ||||
|     yield if redis.set(key, true, nx: true, ex: expire_after) | ||||
|   ensure | ||||
|     redis.del(key) | ||||
|   end | ||||
|  | ||||
|   def lock_or_fail(key) | ||||
|     RedisLock.acquire({ redis: Redis.current, key: key }) do |lock| | ||||
|       if lock.acquired? | ||||
|         yield | ||||
|       else | ||||
|         raise Mastodon::RaceConditionError | ||||
|       end | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   def fetch? | ||||
|     !@options[:delivery] | ||||
|   end | ||||
|   | ||||
| @@ -4,29 +4,25 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity | ||||
|   def perform | ||||
|     return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity? | ||||
|  | ||||
|     RedisLock.acquire(lock_options) do |lock| | ||||
|       if lock.acquired? | ||||
|         original_status = status_from_object | ||||
|     lock_or_fail("announce:#{@object['id']}") do | ||||
|       original_status = status_from_object | ||||
|  | ||||
|         return reject_payload! if original_status.nil? || !announceable?(original_status) | ||||
|       return reject_payload! if original_status.nil? || !announceable?(original_status) | ||||
|  | ||||
|         @status = Status.find_by(account: @account, reblog: original_status) | ||||
|       @status = Status.find_by(account: @account, reblog: original_status) | ||||
|  | ||||
|         return @status unless @status.nil? | ||||
|       return @status unless @status.nil? | ||||
|  | ||||
|         @status = Status.create!( | ||||
|           account: @account, | ||||
|           reblog: original_status, | ||||
|           uri: @json['id'], | ||||
|           created_at: @json['published'], | ||||
|           override_timestamps: @options[:override_timestamps], | ||||
|           visibility: visibility_from_audience | ||||
|         ) | ||||
|       @status = Status.create!( | ||||
|         account: @account, | ||||
|         reblog: original_status, | ||||
|         uri: @json['id'], | ||||
|         created_at: @json['published'], | ||||
|         override_timestamps: @options[:override_timestamps], | ||||
|         visibility: visibility_from_audience | ||||
|       ) | ||||
|  | ||||
|         distribute(@status) | ||||
|       else | ||||
|         raise Mastodon::RaceConditionError | ||||
|       end | ||||
|       distribute(@status) | ||||
|     end | ||||
|  | ||||
|     @status | ||||
| @@ -69,8 +65,4 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity | ||||
|   def reblog_of_local_status? | ||||
|     status_from_uri(object_uri)&.account&.local? | ||||
|   end | ||||
|  | ||||
|   def lock_options | ||||
|     { redis: Redis.current, key: "announce:#{@object['id']}" } | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -45,19 +45,15 @@ class ActivityPub::Activity::Create < ActivityPub::Activity | ||||
|   def create_status | ||||
|     return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity? | ||||
|  | ||||
|     RedisLock.acquire(lock_options) do |lock| | ||||
|       if lock.acquired? | ||||
|         return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator | ||||
|     lock_or_fail("create:#{object_uri}") do | ||||
|       return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator | ||||
|  | ||||
|         @status = find_existing_status | ||||
|       @status = find_existing_status | ||||
|  | ||||
|         if @status.nil? | ||||
|           process_status | ||||
|         elsif @options[:delivered_to_account_id].present? | ||||
|           postprocess_audience_and_deliver | ||||
|         end | ||||
|       else | ||||
|         raise Mastodon::RaceConditionError | ||||
|       if @status.nil? | ||||
|         process_status | ||||
|       elsif @options[:delivered_to_account_id].present? | ||||
|         postprocess_audience_and_deliver | ||||
|       end | ||||
|     end | ||||
|  | ||||
| @@ -313,13 +309,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity | ||||
|     poll = replied_to_status.preloadable_poll | ||||
|     already_voted = true | ||||
|  | ||||
|     RedisLock.acquire(poll_lock_options) do |lock| | ||||
|       if lock.acquired? | ||||
|         already_voted = poll.votes.where(account: @account).exists? | ||||
|         poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri) | ||||
|       else | ||||
|         raise Mastodon::RaceConditionError | ||||
|       end | ||||
|     lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do | ||||
|       already_voted = poll.votes.where(account: @account).exists? | ||||
|       poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri) | ||||
|     end | ||||
|  | ||||
|     increment_voters_count! unless already_voted | ||||
| @@ -508,12 +500,4 @@ class ActivityPub::Activity::Create < ActivityPub::Activity | ||||
|     poll.reload | ||||
|     retry | ||||
|   end | ||||
|  | ||||
|   def lock_options | ||||
|     { redis: Redis.current, key: "create:#{object_uri}" } | ||||
|   end | ||||
|  | ||||
|   def poll_lock_options | ||||
|     { redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" } | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -20,33 +20,35 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity | ||||
|   def delete_note | ||||
|     return if object_uri.nil? | ||||
|  | ||||
|     unless invalid_origin?(object_uri) | ||||
|       RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) } | ||||
|       Tombstone.find_or_create_by(uri: object_uri, account: @account) | ||||
|     lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do | ||||
|       unless invalid_origin?(object_uri) | ||||
|         # This lock ensures a concurrent `ActivityPub::Activity::Create` either | ||||
|         # does not create a status at all, or has finished saving it to the | ||||
|         # database before we try to load it. | ||||
|         # Without the lock, `delete_later!` could be called after `delete_arrived_first?` | ||||
|         # and `Status.find` before `Status.create!` | ||||
|         lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) } | ||||
|  | ||||
|         Tombstone.find_or_create_by(uri: object_uri, account: @account) | ||||
|       end | ||||
|  | ||||
|       @status   = Status.find_by(uri: object_uri, account: @account) | ||||
|       @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present? | ||||
|  | ||||
|       return if @status.nil? | ||||
|  | ||||
|       forward! if @json['signature'].present? && @status.distributable? | ||||
|       delete_now! | ||||
|     end | ||||
|  | ||||
|     @status   = Status.find_by(uri: object_uri, account: @account) | ||||
|     @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present? | ||||
|  | ||||
|     return if @status.nil? | ||||
|  | ||||
|     if @status.distributable? | ||||
|       forward_for_reply | ||||
|       forward_for_reblogs | ||||
|     end | ||||
|  | ||||
|     delete_now! | ||||
|   end | ||||
|  | ||||
|   def forward_for_reblogs | ||||
|     return if @json['signature'].blank? | ||||
|   def rebloggers_ids | ||||
|     return @rebloggers_ids if defined?(@rebloggers_ids) | ||||
|     @rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id) | ||||
|   end | ||||
|  | ||||
|     rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id) | ||||
|     inboxes        = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url] | ||||
|  | ||||
|     ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url| | ||||
|       [payload, rebloggers_ids.first, inbox_url] | ||||
|     end | ||||
|   def inboxes_for_reblogs | ||||
|     Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes | ||||
|   end | ||||
|  | ||||
|   def replied_to_status | ||||
| @@ -58,13 +60,19 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity | ||||
|     !replied_to_status.nil? && replied_to_status.account.local? | ||||
|   end | ||||
|  | ||||
|   def forward_for_reply | ||||
|     return unless @json['signature'].present? && reply_to_local? | ||||
|   def inboxes_for_reply | ||||
|     replied_to_status.account.followers.inboxes | ||||
|   end | ||||
|  | ||||
|     inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url] | ||||
|   def forward! | ||||
|     inboxes = inboxes_for_reblogs | ||||
|     inboxes += inboxes_for_reply if reply_to_local? | ||||
|     inboxes -= [@account.preferred_inbox_url] | ||||
|  | ||||
|     ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url| | ||||
|       [payload, replied_to_status.account_id, inbox_url] | ||||
|     sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first | ||||
|  | ||||
|     ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url| | ||||
|       [payload, sender_id, inbox_url] | ||||
|     end | ||||
|   end | ||||
|  | ||||
| @@ -75,8 +83,4 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity | ||||
|   def payload | ||||
|     @payload ||= Oj.dump(@json) | ||||
|   end | ||||
|  | ||||
|   def lock_options | ||||
|     { redis: Redis.current, key: "create:#{object_uri}" } | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -16,6 +16,8 @@ class RemoveStatusService < BaseService | ||||
|     @account  = status.account | ||||
|     @options  = options | ||||
|  | ||||
|     @status.discard | ||||
|  | ||||
|     RedisLock.acquire(lock_options) do |lock| | ||||
|       if lock.acquired? | ||||
|         remove_from_self if @account.local? | ||||
|   | ||||
| @@ -49,4 +49,24 @@ RSpec.describe ActivityPub::Activity::Delete do | ||||
|       end | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   context 'when the status has been reported' do | ||||
|     describe '#perform' do | ||||
|       subject { described_class.new(json, sender) } | ||||
|       let!(:reporter) { Fabricate(:account) } | ||||
|  | ||||
|       before do | ||||
|         reporter.reports.create!(target_account: status.account, status_ids: [status.id], forwarded: false) | ||||
|         subject.perform | ||||
|       end | ||||
|  | ||||
|       it 'marks the status as deleted' do | ||||
|         expect(Status.find_by(id: status.id)).to be_nil | ||||
|       end | ||||
|  | ||||
|       it 'actually keeps a copy for inspection' do | ||||
|         expect(Status.with_discarded.find_by(id: status.id)).to_not be_nil | ||||
|       end | ||||
|     end | ||||
|   end | ||||
| end | ||||
|   | ||||
		Reference in New Issue
	
	Block a user