Merge upstream 2.0ish #165
This commit is contained in:
@@ -5,14 +5,18 @@ class ActivityPub::FetchRemoteAccountService < BaseService
|
||||
|
||||
# Should be called when uri has already been checked for locality
|
||||
# Does a WebFinger roundtrip on each call
|
||||
def call(uri, prefetched_json = nil)
|
||||
@json = body_to_json(prefetched_json) || fetch_resource(uri)
|
||||
def call(uri, id: true, prefetched_body: nil)
|
||||
@json = if prefetched_body.nil?
|
||||
fetch_resource(uri, id)
|
||||
else
|
||||
body_to_json(prefetched_body)
|
||||
end
|
||||
|
||||
return unless supported_context? && expected_type?
|
||||
|
||||
@uri = @json['id']
|
||||
@username = @json['preferredUsername']
|
||||
@domain = Addressable::URI.parse(uri).normalized_host
|
||||
@domain = Addressable::URI.parse(@uri).normalized_host
|
||||
|
||||
return unless verified_webfinger?
|
||||
|
||||
@@ -27,17 +31,15 @@ class ActivityPub::FetchRemoteAccountService < BaseService
|
||||
webfinger = Goldfinger.finger("acct:#{@username}@#{@domain}")
|
||||
confirmed_username, confirmed_domain = split_acct(webfinger.subject)
|
||||
|
||||
return true if @username.casecmp(confirmed_username).zero? && @domain.casecmp(confirmed_domain).zero?
|
||||
return webfinger.link('self')&.href == @uri if @username.casecmp(confirmed_username).zero? && @domain.casecmp(confirmed_domain).zero?
|
||||
|
||||
webfinger = Goldfinger.finger("acct:#{confirmed_username}@#{confirmed_domain}")
|
||||
confirmed_username, confirmed_domain = split_acct(webfinger.subject)
|
||||
@username, @domain = split_acct(webfinger.subject)
|
||||
self_reference = webfinger.link('self')
|
||||
|
||||
return false unless @username.casecmp(confirmed_username).zero? && @domain.casecmp(confirmed_domain).zero?
|
||||
return false if self_reference&.href != @uri
|
||||
|
||||
@username = confirmed_username
|
||||
@domain = confirmed_domain
|
||||
|
||||
true
|
||||
rescue Goldfinger::Error
|
||||
false
|
||||
|
@@ -4,13 +4,26 @@ class ActivityPub::FetchRemoteKeyService < BaseService
|
||||
include JsonLdHelper
|
||||
|
||||
# Returns account that owns the key
|
||||
def call(uri, prefetched_json = nil)
|
||||
@json = body_to_json(prefetched_json) || fetch_resource(uri)
|
||||
def call(uri, id: true, prefetched_body: nil)
|
||||
if prefetched_body.nil?
|
||||
if id
|
||||
@json = fetch_resource_without_id_validation(uri)
|
||||
if person?
|
||||
@json = fetch_resource(@json['id'], true)
|
||||
elsif uri != @json['id']
|
||||
return
|
||||
end
|
||||
else
|
||||
@json = fetch_resource(uri, id)
|
||||
end
|
||||
else
|
||||
@json = body_to_json(prefetched_body)
|
||||
end
|
||||
|
||||
return unless supported_context?(@json) && expected_type?
|
||||
return find_account(uri, @json) if person?
|
||||
return find_account(@json['id'], @json) if person?
|
||||
|
||||
@owner = fetch_resource(owner_uri)
|
||||
@owner = fetch_resource(owner_uri, true)
|
||||
|
||||
return unless supported_context?(@owner) && confirmed_owner?
|
||||
|
||||
@@ -19,9 +32,9 @@ class ActivityPub::FetchRemoteKeyService < BaseService
|
||||
|
||||
private
|
||||
|
||||
def find_account(uri, prefetched_json)
|
||||
def find_account(uri, prefetched_body)
|
||||
account = ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
|
||||
account ||= ActivityPub::FetchRemoteAccountService.new.call(uri, prefetched_json)
|
||||
account ||= ActivityPub::FetchRemoteAccountService.new.call(uri, prefetched_body: prefetched_body)
|
||||
account
|
||||
end
|
||||
|
||||
|
@@ -4,36 +4,33 @@ class ActivityPub::FetchRemoteStatusService < BaseService
|
||||
include JsonLdHelper
|
||||
|
||||
# Should be called when uri has already been checked for locality
|
||||
def call(uri, prefetched_json = nil)
|
||||
@json = body_to_json(prefetched_json) || fetch_resource(uri)
|
||||
def call(uri, id: true, prefetched_body: nil)
|
||||
@json = if prefetched_body.nil?
|
||||
fetch_resource(uri, id)
|
||||
else
|
||||
body_to_json(prefetched_body)
|
||||
end
|
||||
|
||||
return unless supported_context?
|
||||
return unless supported_context? && expected_type?
|
||||
|
||||
activity = activity_json
|
||||
actor_id = value_or_id(activity['actor'])
|
||||
|
||||
return unless expected_type?(activity) && trustworthy_attribution?(uri, actor_id)
|
||||
return if actor_id.nil? || !trustworthy_attribution?(@json['id'], actor_id)
|
||||
|
||||
actor = ActivityPub::TagManager.instance.uri_to_resource(actor_id, Account)
|
||||
actor = ActivityPub::FetchRemoteAccountService.new.call(actor_id) if actor.nil?
|
||||
actor = ActivityPub::FetchRemoteAccountService.new.call(actor_id, id: true) if actor.nil?
|
||||
|
||||
return if actor.suspended?
|
||||
|
||||
ActivityPub::Activity.factory(activity, actor).perform
|
||||
ActivityPub::Activity.factory(activity_json, actor).perform
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def activity_json
|
||||
if %w(Note Article).include? @json['type']
|
||||
{
|
||||
'type' => 'Create',
|
||||
'actor' => first_of_value(@json['attributedTo']),
|
||||
'object' => @json,
|
||||
}
|
||||
else
|
||||
@json
|
||||
end
|
||||
{ 'type' => 'Create', 'actor' => actor_id, 'object' => @json }
|
||||
end
|
||||
|
||||
def actor_id
|
||||
first_of_value(@json['attributedTo'])
|
||||
end
|
||||
|
||||
def trustworthy_attribution?(uri, attributed_to)
|
||||
@@ -44,7 +41,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService
|
||||
super(@json)
|
||||
end
|
||||
|
||||
def expected_type?(json)
|
||||
%w(Create Announce).include? json['type']
|
||||
def expected_type?
|
||||
%w(Note Article).include? @json['type']
|
||||
end
|
||||
end
|
||||
|
@@ -90,7 +90,7 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||
return if value.nil?
|
||||
return value['url'] if value.is_a?(Hash)
|
||||
|
||||
image = fetch_resource(value)
|
||||
image = fetch_resource_without_id_validation(value)
|
||||
image['url'] if image
|
||||
end
|
||||
|
||||
@@ -100,7 +100,7 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||
return if value.nil?
|
||||
return value['publicKeyPem'] if value.is_a?(Hash)
|
||||
|
||||
key = fetch_resource(value)
|
||||
key = fetch_resource_without_id_validation(value)
|
||||
key['publicKeyPem'] if key
|
||||
end
|
||||
|
||||
@@ -130,7 +130,7 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||
return if @json[type].blank?
|
||||
return @collections[type] if @collections.key?(type)
|
||||
|
||||
collection = fetch_resource(@json[type])
|
||||
collection = fetch_resource_without_id_validation(@json[type])
|
||||
|
||||
@collections[type] = collection.is_a?(Hash) && collection['totalItems'].present? && collection['totalItems'].is_a?(Numeric) ? collection['totalItems'] : nil
|
||||
rescue HTTP::Error, OpenSSL::SSL::SSLError
|
||||
|
@@ -3,9 +3,10 @@
|
||||
class ActivityPub::ProcessCollectionService < BaseService
|
||||
include JsonLdHelper
|
||||
|
||||
def call(body, account)
|
||||
def call(body, account, options = {})
|
||||
@account = account
|
||||
@json = Oj.load(body, mode: :strict)
|
||||
@options = options
|
||||
|
||||
return unless supported_context?
|
||||
return if different_actor? && verify_account!.nil?
|
||||
@@ -38,7 +39,7 @@ class ActivityPub::ProcessCollectionService < BaseService
|
||||
end
|
||||
|
||||
def process_item(item)
|
||||
activity = ActivityPub::Activity.factory(item, @account)
|
||||
activity = ActivityPub::Activity.factory(item, @account, @options)
|
||||
activity&.perform
|
||||
end
|
||||
|
||||
|
@@ -29,7 +29,7 @@ class BatchedRemoveStatusService < BaseService
|
||||
statuses.group_by(&:account_id).each do |_, account_statuses|
|
||||
account = account_statuses.first.account
|
||||
|
||||
unpush_from_home_timelines(account_statuses)
|
||||
unpush_from_home_timelines(account, account_statuses)
|
||||
|
||||
if account.local?
|
||||
batch_stream_entries(account, account_statuses)
|
||||
@@ -72,14 +72,15 @@ class BatchedRemoveStatusService < BaseService
|
||||
end
|
||||
end
|
||||
|
||||
def unpush_from_home_timelines(statuses)
|
||||
account = statuses.first.account
|
||||
recipients = account.followers.local.pluck(:id)
|
||||
def unpush_from_home_timelines(account, statuses)
|
||||
recipients = account.followers.local.to_a
|
||||
|
||||
recipients << account.id if account.local?
|
||||
recipients << account if account.local?
|
||||
|
||||
recipients.each do |follower_id|
|
||||
unpush(follower_id, statuses)
|
||||
recipients.each do |follower|
|
||||
statuses.each do |status|
|
||||
FeedManager.instance.unpush(:home, follower, status)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -109,28 +110,6 @@ class BatchedRemoveStatusService < BaseService
|
||||
end
|
||||
end
|
||||
|
||||
def unpush(follower_id, statuses)
|
||||
key = FeedManager.instance.key(:home, follower_id)
|
||||
|
||||
originals = statuses.reject(&:reblog?)
|
||||
reblogs = statuses.select(&:reblog?)
|
||||
|
||||
# Quickly remove all originals
|
||||
redis.pipelined do
|
||||
originals.each do |status|
|
||||
redis.zremrangebyscore(key, status.id, status.id)
|
||||
redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
|
||||
end
|
||||
end
|
||||
|
||||
# For reblogs, re-add original status to feed, unless the reblog
|
||||
# was not in the feed in the first place
|
||||
reblogs.each do |status|
|
||||
redis.zadd(key, status.reblog_of_id, status.reblog_of_id) unless redis.zscore(key, status.reblog_of_id).nil?
|
||||
redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
|
||||
end
|
||||
end
|
||||
|
||||
def redis
|
||||
Redis.current
|
||||
end
|
||||
|
@@ -41,10 +41,11 @@ class FetchAtomService < BaseService
|
||||
return nil if @response.code != 200
|
||||
|
||||
if @response.mime_type == 'application/atom+xml'
|
||||
[@url, @response.to_s, :ostatus]
|
||||
[@url, { prefetched_body: @response.to_s }, :ostatus]
|
||||
elsif ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(@response.mime_type)
|
||||
if supported_activity?(@response.to_s)
|
||||
[@url, @response.to_s, :activitypub]
|
||||
json = body_to_json(@response.to_s)
|
||||
if supported_context?(json) && json['type'] == 'Person' && json['inbox'].present?
|
||||
[json['id'], { prefetched_body: @response.to_s, id: true }, :activitypub]
|
||||
else
|
||||
@unsupported_activity = true
|
||||
nil
|
||||
@@ -79,10 +80,4 @@ class FetchAtomService < BaseService
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
def supported_activity?(body)
|
||||
json = body_to_json(body)
|
||||
return false unless supported_context?(json)
|
||||
json['type'] == 'Person' ? json['inbox'].present? : true
|
||||
end
|
||||
end
|
||||
|
@@ -27,7 +27,8 @@ class FetchLinkCardService < BaseService
|
||||
end
|
||||
|
||||
attach_card if @card&.persisted?
|
||||
rescue HTTP::ConnectionError, OpenSSL::SSL::SSLError
|
||||
rescue HTTP::Error, Addressable::URI::InvalidURIError => e
|
||||
Rails.logger.debug "Error fetching link #{@url}: #{e}"
|
||||
nil
|
||||
end
|
||||
|
||||
|
@@ -5,24 +5,24 @@ class FetchRemoteAccountService < BaseService
|
||||
|
||||
def call(url, prefetched_body = nil, protocol = :ostatus)
|
||||
if prefetched_body.nil?
|
||||
resource_url, body, protocol = FetchAtomService.new.call(url)
|
||||
resource_url, resource_options, protocol = FetchAtomService.new.call(url)
|
||||
else
|
||||
resource_url = url
|
||||
body = prefetched_body
|
||||
resource_url = url
|
||||
resource_options = { prefetched_body: prefetched_body }
|
||||
end
|
||||
|
||||
case protocol
|
||||
when :ostatus
|
||||
process_atom(resource_url, body)
|
||||
process_atom(resource_url, **resource_options)
|
||||
when :activitypub
|
||||
ActivityPub::FetchRemoteAccountService.new.call(resource_url, body)
|
||||
ActivityPub::FetchRemoteAccountService.new.call(resource_url, **resource_options)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_atom(url, body)
|
||||
xml = Nokogiri::XML(body)
|
||||
def process_atom(url, prefetched_body:)
|
||||
xml = Nokogiri::XML(prefetched_body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
account = author_from_xml(xml.at_xpath('/xmlns:feed', xmlns: OStatus::TagManager::XMLNS), false)
|
||||
|
@@ -33,7 +33,7 @@ class FetchRemoteResourceService < BaseService
|
||||
end
|
||||
|
||||
def body
|
||||
fetched_atom_feed.second
|
||||
fetched_atom_feed.second[:prefetched_body]
|
||||
end
|
||||
|
||||
def protocol
|
||||
|
@@ -5,26 +5,26 @@ class FetchRemoteStatusService < BaseService
|
||||
|
||||
def call(url, prefetched_body = nil, protocol = :ostatus)
|
||||
if prefetched_body.nil?
|
||||
resource_url, body, protocol = FetchAtomService.new.call(url)
|
||||
resource_url, resource_options, protocol = FetchAtomService.new.call(url)
|
||||
else
|
||||
resource_url = url
|
||||
body = prefetched_body
|
||||
resource_url = url
|
||||
resource_options = { prefetched_body: prefetched_body }
|
||||
end
|
||||
|
||||
case protocol
|
||||
when :ostatus
|
||||
process_atom(resource_url, body)
|
||||
process_atom(resource_url, **resource_options)
|
||||
when :activitypub
|
||||
ActivityPub::FetchRemoteStatusService.new.call(resource_url, body)
|
||||
ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_atom(url, body)
|
||||
def process_atom(url, prefetched_body:)
|
||||
Rails.logger.debug "Processing Atom for remote status at #{url}"
|
||||
|
||||
xml = Nokogiri::XML(body)
|
||||
xml = Nokogiri::XML(prefetched_body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
account = author_from_xml(xml.at_xpath('/xmlns:entry', xmlns: OStatus::TagManager::XMLNS))
|
||||
@@ -32,7 +32,7 @@ class FetchRemoteStatusService < BaseService
|
||||
|
||||
return nil unless !account.nil? && confirmed_domain?(domain, account)
|
||||
|
||||
statuses = ProcessFeedService.new.call(body, account)
|
||||
statuses = ProcessFeedService.new.call(prefetched_body, account)
|
||||
statuses.first
|
||||
rescue Nokogiri::XML::XPath::SyntaxError
|
||||
Rails.logger.debug 'Invalid XML or missing namespace'
|
||||
|
@@ -3,7 +3,8 @@
|
||||
class MuteService < BaseService
|
||||
def call(account, target_account, notifications: nil)
|
||||
return if account.id == target_account.id
|
||||
FeedManager.instance.clear_from_timeline(account, target_account)
|
||||
account.mute!(target_account, notifications: notifications)
|
||||
BlockWorker.perform_async(account.id, target_account.id)
|
||||
mute
|
||||
end
|
||||
end
|
||||
|
@@ -1,43 +1,7 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class PrecomputeFeedService < BaseService
|
||||
LIMIT = FeedManager::MAX_ITEMS / 4
|
||||
|
||||
def call(account)
|
||||
@account = account
|
||||
populate_feed
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
attr_reader :account
|
||||
|
||||
def populate_feed
|
||||
pairs = statuses.reverse_each.lazy.reject(&method(:status_filtered?)).map(&method(:process_status)).to_a
|
||||
|
||||
redis.pipelined do
|
||||
redis.zadd(account_home_key, pairs) if pairs.any?
|
||||
redis.del("account:#{@account.id}:regeneration")
|
||||
end
|
||||
end
|
||||
|
||||
def process_status(status)
|
||||
[status.id, status.reblog? ? status.reblog_of_id : status.id]
|
||||
end
|
||||
|
||||
def status_filtered?(status)
|
||||
FeedManager.instance.filter?(:home, status, account.id)
|
||||
end
|
||||
|
||||
def account_home_key
|
||||
FeedManager.instance.key(:home, account.id)
|
||||
end
|
||||
|
||||
def statuses
|
||||
Status.as_home_timeline(account).order(account_id: :desc).limit(LIMIT)
|
||||
end
|
||||
|
||||
def redis
|
||||
Redis.current
|
||||
FeedManager.instance.populate_feed(account)
|
||||
end
|
||||
end
|
||||
|
@@ -1,7 +1,9 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ProcessFeedService < BaseService
|
||||
def call(body, account)
|
||||
def call(body, account, options = {})
|
||||
@options = options
|
||||
|
||||
xml = Nokogiri::XML(body)
|
||||
xml.encoding = 'utf-8'
|
||||
|
||||
@@ -20,7 +22,7 @@ class ProcessFeedService < BaseService
|
||||
end
|
||||
|
||||
def process_entry(xml, account)
|
||||
activity = OStatus::Activity::General.new(xml, account)
|
||||
activity = OStatus::Activity::General.new(xml, account, @options)
|
||||
activity.specialize&.perform if activity.status?
|
||||
rescue ActiveRecord::RecordInvalid => e
|
||||
Rails.logger.debug "Nothing was saved for #{activity.id} because: #{e}"
|
||||
|
@@ -102,13 +102,7 @@ class RemoveStatusService < BaseService
|
||||
end
|
||||
|
||||
def unpush(type, receiver, status)
|
||||
if status.reblog? && !redis.zscore(FeedManager.instance.key(type, receiver.id), status.reblog_of_id).nil?
|
||||
redis.zadd(FeedManager.instance.key(type, receiver.id), status.reblog_of_id, status.reblog_of_id)
|
||||
else
|
||||
redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id)
|
||||
end
|
||||
|
||||
Redis.current.publish("timeline:#{receiver.id}", @payload)
|
||||
FeedManager.instance.unpush(type, receiver, status)
|
||||
end
|
||||
|
||||
def remove_from_hashtags
|
||||
|
@@ -74,7 +74,7 @@ class ResolveRemoteAccountService < BaseService
|
||||
end
|
||||
|
||||
def webfinger_update_due?
|
||||
@account.nil? || @account.last_webfingered_at.nil? || @account.last_webfingered_at <= 1.day.ago
|
||||
@account.nil? || @account.possibly_stale?
|
||||
end
|
||||
|
||||
def activitypub_ready?
|
||||
@@ -189,7 +189,7 @@ class ResolveRemoteAccountService < BaseService
|
||||
def actor_json
|
||||
return @actor_json if defined?(@actor_json)
|
||||
|
||||
json = fetch_resource(actor_url)
|
||||
json = fetch_resource(actor_url, false)
|
||||
@actor_json = supported_context?(json) && json['type'] == 'Person' ? json : nil
|
||||
end
|
||||
|
||||
|
@@ -12,7 +12,7 @@ class SendInteractionService < BaseService
|
||||
|
||||
return if !target_account.ostatus? || block_notification?
|
||||
|
||||
delivery = build_request.perform
|
||||
delivery = build_request.perform.flush
|
||||
|
||||
raise Mastodon::UnexpectedResponseError, delivery unless delivery.code > 199 && delivery.code < 300
|
||||
end
|
||||
|
@@ -6,7 +6,7 @@ class SubscribeService < BaseService
|
||||
|
||||
@account = account
|
||||
@account.secret = SecureRandom.hex
|
||||
@response = build_request.perform
|
||||
@response = build_request.perform.flush
|
||||
|
||||
if response_failed_permanently?
|
||||
# We're not allowed to subscribe. Fail and move on.
|
||||
|
@@ -7,7 +7,7 @@ class UnsubscribeService < BaseService
|
||||
@account = account
|
||||
|
||||
begin
|
||||
@response = build_request.perform
|
||||
@response = build_request.perform.flush
|
||||
|
||||
Rails.logger.debug "PuSH unsubscribe for #{@account.acct} failed: #{@response.status}" unless @response.status.success?
|
||||
rescue HTTP::Error, OpenSSL::SSL::SSLError => e
|
||||
|
Reference in New Issue
Block a user