ActivityPub migration procedure (#4617)
* ActivityPub migration procedure Once one account is detected as going from OStatus to ActivityPub, invalidate WebFinger cache for other accounts from the same domain * Unsubscribe from PuSH updates once we receive an ActivityPub payload * Re-subscribe to PuSH unless already unsubscribed, regardless of protocol
This commit is contained in:
		| @@ -7,6 +7,7 @@ class ActivityPub::InboxesController < Api::BaseController | ||||
|  | ||||
|   def create | ||||
|     if signed_request_account | ||||
|       upgrade_account | ||||
|       process_payload | ||||
|       head 201 | ||||
|     else | ||||
| @@ -24,6 +25,11 @@ class ActivityPub::InboxesController < Api::BaseController | ||||
|     @body ||= request.body.read | ||||
|   end | ||||
|  | ||||
|   def upgrade_account | ||||
|     return unless signed_request_account.subscribed? | ||||
|     Pubsubhubbub::UnsubscribeWorker.perform_async(signed_request_account.id) | ||||
|   end | ||||
|  | ||||
|   def process_payload | ||||
|     ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body.force_encoding('UTF-8')) | ||||
|   end | ||||
|   | ||||
| @@ -17,7 +17,7 @@ module Admin | ||||
|     end | ||||
|  | ||||
|     def unsubscribe | ||||
|       UnsubscribeService.new.call(@account) | ||||
|       Pubsubhubbub::UnsubscribeWorker.perform_async(@account.id) | ||||
|       redirect_to admin_account_path(@account.id) | ||||
|     end | ||||
|  | ||||
|   | ||||
| @@ -12,7 +12,8 @@ class ActivityPub::ProcessAccountService < BaseService | ||||
|     @domain   = domain | ||||
|     @account  = Account.find_by(uri: @uri) | ||||
|  | ||||
|     create_account if @account.nil? | ||||
|     create_account  if @account.nil? | ||||
|     upgrade_account if @account.ostatus? | ||||
|     update_account | ||||
|  | ||||
|     @account | ||||
| @@ -24,6 +25,7 @@ class ActivityPub::ProcessAccountService < BaseService | ||||
|  | ||||
|   def create_account | ||||
|     @account = Account.new | ||||
|     @account.protocol    = :activitypub | ||||
|     @account.username    = @username | ||||
|     @account.domain      = @domain | ||||
|     @account.uri         = @uri | ||||
| @@ -50,6 +52,10 @@ class ActivityPub::ProcessAccountService < BaseService | ||||
|     @account.save! | ||||
|   end | ||||
|  | ||||
|   def upgrade_account | ||||
|     ActivityPub::PostUpgradeWorker.perform_async(@account.domain) | ||||
|   end | ||||
|  | ||||
|   def image_url(key) | ||||
|     value = first_of_value(@json[key]) | ||||
|  | ||||
|   | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| class UnsubscribeService < BaseService | ||||
|   def call(account) | ||||
|     return unless account.ostatus? | ||||
|     return if account.hub_url.blank? | ||||
|  | ||||
|     @account  = account | ||||
|     @response = build_request.perform | ||||
|   | ||||
							
								
								
									
										15
									
								
								app/workers/activitypub/post_upgrade_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								app/workers/activitypub/post_upgrade_worker.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,15 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class ActivityPub::PostUpgradeWorker | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   sidekiq_options queue: 'pull' | ||||
|  | ||||
|   def perform(domain) | ||||
|     Account.where(domain: domain) | ||||
|            .where(protocol: :ostatus) | ||||
|            .where.not(last_webfingered_at: nil) | ||||
|            .in_batches | ||||
|            .update_all(last_webfingered_at: nil) | ||||
|   end | ||||
| end | ||||
							
								
								
									
										15
									
								
								app/workers/pubsubhubbub/unsubscribe_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								app/workers/pubsubhubbub/unsubscribe_worker.rb
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,15 @@ | ||||
| # frozen_string_literal: true | ||||
|  | ||||
| class Pubsubhubbub::UnsubscribeWorker | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false | ||||
|  | ||||
|   def perform(account_id) | ||||
|     account = Account.find(account_id) | ||||
|     logger.debug "PuSH unsubscribing from #{account.acct}" | ||||
|     ::UnsubscribeService.new.call(account) | ||||
|   rescue ActiveRecord::RecordNotFound | ||||
|     true | ||||
|   end | ||||
| end | ||||
| @@ -14,6 +14,6 @@ class Scheduler::SubscriptionsScheduler | ||||
|   private | ||||
|  | ||||
|   def expiring_accounts | ||||
|     Account.where(protocol: :ostatus).expiring(1.day.from_now).partitioned | ||||
|     Account.expiring(1.day.from_now).partitioned | ||||
|   end | ||||
| end | ||||
|   | ||||
| @@ -111,10 +111,7 @@ namespace :mastodon do | ||||
|   namespace :push do | ||||
|     desc 'Unsubscribes from PuSH updates of feeds nobody follows locally' | ||||
|     task clear: :environment do | ||||
|       Account.remote.without_followers.where.not(subscription_expires_at: nil).find_each do |a| | ||||
|         Rails.logger.debug "PuSH unsubscribing from #{a.acct}" | ||||
|         UnsubscribeService.new.call(a) | ||||
|       end | ||||
|       Pubsubhubbub::UnsubscribeWorker.push_bulk(Account.remote.without_followers.where.not(subscription_expires_at: nil).pluck(:id)) | ||||
|     end | ||||
|  | ||||
|     desc 'Re-subscribes to soon expiring PuSH subscriptions (deprecated)' | ||||
|   | ||||
		Reference in New Issue
	
	Block a user