Clean up reblog tracking keys, related improvements (#5428)
* Clean up reblog-tracking sets from FeedManager Builds on #5419, with a few minor optimizations and cleanup of sets after they are no longer needed. * Update tests, fix multiply-reblogged case Previously, we would have lost the fact that a given status was reblogged if the displayed reblog of it was removed, now we don't. Also added tests to make sure FeedManager#trim cleans up our reblog tracking keys, fixed up FeedCleanupScheduler to use the right loop, and fixed the test for it.
This commit is contained in:
		| @@ -56,7 +56,17 @@ class FeedManager | ||||
|     falloff_rank = FeedManager::REBLOG_FALLOFF - 1 | ||||
|     falloff_range = redis.zrevrange(timeline_key, falloff_rank, falloff_rank, with_scores: true) | ||||
|     falloff_score = falloff_range&.first&.last&.to_i || 0 | ||||
|     redis.zremrangebyscore(reblog_key, 0, falloff_score) | ||||
|  | ||||
|     # Get any reblogs we might have to clean up after. | ||||
|     redis.zrangebyscore(reblog_key, 0, falloff_score).each do |reblogged_id| | ||||
|       # Remove it from the set of reblogs we're tracking *first* to avoid races. | ||||
|       redis.zrem(reblog_key, reblogged_id) | ||||
|       # Just drop any set we might have created to track additional reblogs. | ||||
|       # This means that if this reblog is deleted, we won't automatically insert | ||||
|       # another reblog, but also that any new reblog can be inserted into the | ||||
|       # feed. | ||||
|       redis.del(key(type, account_id, "reblogs:#{reblogged_id}")) | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   def push_update_required?(timeline_type, account_id) | ||||
| @@ -177,23 +187,28 @@ class FeedManager | ||||
|     reblog_key   = key(timeline_type, account.id, 'reblogs') | ||||
|  | ||||
|     if status.reblog? | ||||
|       reblog_set_key = key(timeline_type, account.id, "reblogs:#{status.reblog_of_id}") | ||||
|  | ||||
|       # If the original status or a reblog of it is within | ||||
|       # REBLOG_FALLOFF statuses from the top, do not re-insert it into | ||||
|       # the feed | ||||
|       rank = redis.zrevrank(timeline_key, status.reblog_of_id) | ||||
|  | ||||
|       redis.sadd(reblog_set_key, status.reblog_of_id) unless rank.nil? | ||||
|       redis.sadd(reblog_set_key, status.id) | ||||
|  | ||||
|       return false if !rank.nil? && rank < FeedManager::REBLOG_FALLOFF | ||||
|  | ||||
|       reblog_rank = redis.zrevrank(reblog_key, status.reblog_of_id) | ||||
|       return false unless reblog_rank.nil? | ||||
|  | ||||
|       redis.zadd(timeline_key, status.id, status.id) | ||||
|       redis.zadd(reblog_key, status.id, status.reblog_of_id) | ||||
|       if reblog_rank.nil? | ||||
|         # This is not something we've already seen reblogged, so we | ||||
|         # can just add it to the feed (and note that we're | ||||
|         # reblogging it). | ||||
|         redis.zadd(timeline_key, status.id, status.id) | ||||
|         redis.zadd(reblog_key, status.id, status.reblog_of_id) | ||||
|       else | ||||
|         # Another reblog of the same status was already in the | ||||
|         # REBLOG_FALLOFF most recent statuses, so we note that this | ||||
|         # is an "extra" reblog, by storing it in reblog_set_key. | ||||
|         reblog_set_key = key(timeline_type, account.id, "reblogs:#{status.reblog_of_id}") | ||||
|         redis.sadd(reblog_set_key, status.id) | ||||
|         return false | ||||
|       end | ||||
|     else | ||||
|       redis.zadd(timeline_key, status.id, status.id) | ||||
|     end | ||||
| @@ -207,23 +222,21 @@ class FeedManager | ||||
|   # do so if appropriate. | ||||
|   def remove_from_feed(timeline_type, account, status) | ||||
|     timeline_key = key(timeline_type, account.id) | ||||
|     reblog_key   = key(timeline_type, account.id, 'reblogs') | ||||
|  | ||||
|     if status.reblog? | ||||
|       # 1. If the reblogging status is not in the feed, stop. | ||||
|       status_rank = redis.zrevrank(timeline_key, status.id) | ||||
|       return false if status_rank.nil? | ||||
|  | ||||
|       # 2. Remove the reblogged status from the `:reblogs` zset. | ||||
|       redis.zrem(reblog_key, status.reblog_of_id) | ||||
|  | ||||
|       # 3. Remove reblog from set of this status's reblogs, and | ||||
|       # re-insert another reblog or original into the feed if | ||||
|       # one remains in the set | ||||
|       # 2. Remove reblog from set of this status's reblogs. | ||||
|       reblog_set_key = key(timeline_type, account.id, "reblogs:#{status.reblog_of_id}") | ||||
|  | ||||
|       redis.srem(reblog_set_key, status.id) | ||||
|       other_reblog = redis.srandmember(reblog_set_key) | ||||
|       # 3. Re-insert another reblog or original into the feed if one | ||||
|       # remains in the set. We could pick a random element, but this | ||||
|       # set should generally be small, and it seems ideal to show the | ||||
|       # oldest potential such reblog. | ||||
|       other_reblog = redis.smembers(reblog_set_key).map(&:to_i).sort.first | ||||
|  | ||||
|       redis.zadd(timeline_key, other_reblog, other_reblog) if other_reblog | ||||
|  | ||||
|   | ||||
| @@ -5,18 +5,36 @@ class Scheduler::FeedCleanupScheduler | ||||
|   include Sidekiq::Worker | ||||
|  | ||||
|   def perform | ||||
|     reblogged_id_sets = {} | ||||
|     feedmanager = FeedManager.instance | ||||
|  | ||||
|     redis.pipelined do | ||||
|       inactive_users.each do |account_id| | ||||
|         redis.del(FeedManager.instance.key(:home, account_id)) | ||||
|         redis.del(FeedManager.instance.key(:home, account_id, 'reblogs')) | ||||
|       inactive_user_ids.each do |account_id| | ||||
|         redis.del(feedmanager.key(:home, account_id)) | ||||
|         reblog_key = feedmanager.key(:home, account_id, 'reblogs') | ||||
|         # We collect a future for this: we don't block while getting | ||||
|         # it, but we can iterate over it later. | ||||
|         reblogged_id_sets[account_id] = redis.zrange(reblog_key, 0, -1) | ||||
|         redis.del(reblog_key) | ||||
|       end | ||||
|     end | ||||
|  | ||||
|     # Remove all of the reblog tracking keys we just removed the | ||||
|     # references to. | ||||
|     redis.pipelined do | ||||
|       reblogged_id_sets.each do |account_id, future| | ||||
|         future.value.each do |reblogged_id| | ||||
|           reblog_set_key = feedmanager.key(:home, account_id, "reblogs:#{reblogged_id}") | ||||
|           redis.del(reblog_set_key) | ||||
|         end | ||||
|       end | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   private | ||||
|  | ||||
|   def inactive_users | ||||
|     @inactive_users ||= User.confirmed.inactive.pluck(:account_id) | ||||
|   def inactive_user_ids | ||||
|     @inactive_user_ids ||= User.confirmed.inactive.pluck(:account_id) | ||||
|   end | ||||
|  | ||||
|   def redis | ||||
|   | ||||
| @@ -211,6 +211,22 @@ RSpec.describe FeedManager do | ||||
|         expect(FeedManager.instance.push('type', account, reblogs.last)).to be false | ||||
|       end | ||||
|  | ||||
|       it 'does not save a new reblog of a multiply-reblogged-then-unreblogged status' do | ||||
|         account   = Fabricate(:account) | ||||
|         reblogged = Fabricate(:status) | ||||
|         reblogs = 3.times.map { Fabricate(:status, reblog: reblogged) } | ||||
|  | ||||
|         # Accept the reblogs | ||||
|         FeedManager.instance.push('type', account, reblogs[0]) | ||||
|         FeedManager.instance.push('type', account, reblogs[1]) | ||||
|  | ||||
|         # Unreblog the first one | ||||
|         FeedManager.instance.unpush('type', account, reblogs[0]) | ||||
|  | ||||
|         # The last reblog should still be ignored | ||||
|         expect(FeedManager.instance.push('type', account, reblogs.last)).to be false | ||||
|       end | ||||
|  | ||||
|       it 'saves a new reblog of a long-ago-reblogged status' do | ||||
|         account = Fabricate(:account) | ||||
|         reblogged = Fabricate(:status) | ||||
| @@ -230,6 +246,38 @@ RSpec.describe FeedManager do | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   describe '#trim' do | ||||
|     let(:receiver) { Fabricate(:account) } | ||||
|  | ||||
|     it 'cleans up reblog tracking keys' do | ||||
|       reblogged      = Fabricate(:status) | ||||
|       status         = Fabricate(:status, reblog: reblogged) | ||||
|       another_status = Fabricate(:status, reblog: reblogged) | ||||
|       reblogs_key    = FeedManager.instance.key('type', receiver.id, 'reblogs') | ||||
|       reblog_set_key = FeedManager.instance.key('type', receiver.id, "reblogs:#{reblogged.id}") | ||||
|  | ||||
|       FeedManager.instance.push('type', receiver, status) | ||||
|       FeedManager.instance.push('type', receiver, another_status) | ||||
|  | ||||
|       # We should have a tracking set and an entry in reblogs. | ||||
|       expect(Redis.current.exists(reblog_set_key)).to be true | ||||
|       expect(Redis.current.zrange(reblogs_key, 0, -1)).to eq [reblogged.id.to_s] | ||||
|  | ||||
|       # Push everything off the end of the feed. | ||||
|       FeedManager::MAX_ITEMS.times do | ||||
|         FeedManager.instance.push('type', receiver, Fabricate(:status)) | ||||
|       end | ||||
|  | ||||
|       # `trim` should be called automatically, but do it anyway, as | ||||
|       # we're testing `trim`, not side effects of `push`. | ||||
|       FeedManager.instance.trim('type', receiver.id) | ||||
|  | ||||
|       # We should not have any reblog tracking data. | ||||
|       expect(Redis.current.exists(reblog_set_key)).to be false | ||||
|       expect(Redis.current.zrange(reblogs_key, 0, -1)).to be_empty | ||||
|     end | ||||
|   end | ||||
|  | ||||
|   describe '#unpush' do | ||||
|     let(:receiver) { Fabricate(:account) } | ||||
|  | ||||
| @@ -265,20 +313,22 @@ RSpec.describe FeedManager do | ||||
|       expect(Redis.current.zrange("feed:type:#{receiver.id}", 0, -1)).to be_empty | ||||
|     end | ||||
|  | ||||
|     it 'leaves a reblogged status if another reblog was in feed' do | ||||
|       reblogged      = Fabricate(:status) | ||||
|       status         = Fabricate(:status, reblog: reblogged) | ||||
|       another_status = Fabricate(:status, reblog: reblogged) | ||||
|     it 'leaves a multiply-reblogged status if another reblog was in feed' do | ||||
|       reblogged = Fabricate(:status) | ||||
|       reblogs   = 3.times.map { Fabricate(:status, reblog: reblogged) } | ||||
|  | ||||
|       FeedManager.instance.push('type', receiver, status) | ||||
|       FeedManager.instance.push('type', receiver, another_status) | ||||
|       reblogs.each do |reblog| | ||||
|         FeedManager.instance.push('type', receiver, reblog) | ||||
|       end | ||||
|  | ||||
|       # The reblogging status should show up under normal conditions. | ||||
|       expect(Redis.current.zrange("feed:type:#{receiver.id}", 0, -1)).to eq [status.id.to_s] | ||||
|       expect(Redis.current.zrange("feed:type:#{receiver.id}", 0, -1)).to eq [reblogs.first.id.to_s] | ||||
|  | ||||
|       FeedManager.instance.unpush('type', receiver, status) | ||||
|       reblogs[0...-1].each do |reblog| | ||||
|         FeedManager.instance.unpush('type', receiver, reblog) | ||||
|       end | ||||
|  | ||||
|       expect(Redis.current.zrange("feed:type:#{receiver.id}", 0, -1)).to eq [another_status.id.to_s] | ||||
|       expect(Redis.current.zrange("feed:type:#{receiver.id}", 0, -1)).to eq [reblogs.last.id.to_s] | ||||
|     end | ||||
|  | ||||
|     it 'sends push updates' do | ||||
|   | ||||
| @@ -9,14 +9,18 @@ describe Scheduler::FeedCleanupScheduler do | ||||
|   it 'clears feeds of inactives' do | ||||
|     Redis.current.zadd(feed_key_for(inactive_user), 1, 1) | ||||
|     Redis.current.zadd(feed_key_for(active_user), 1, 1) | ||||
|     Redis.current.zadd(feed_key_for(inactive_user, 'reblogs'), 2, 2) | ||||
|     Redis.current.sadd(feed_key_for(inactive_user, 'reblogs:2'), 3) | ||||
|  | ||||
|     subject.perform | ||||
|  | ||||
|     expect(Redis.current.zcard(feed_key_for(inactive_user))).to eq 0 | ||||
|     expect(Redis.current.zcard(feed_key_for(active_user))).to eq 1 | ||||
|     expect(Redis.current.exists(feed_key_for(inactive_user, 'reblogs'))).to be false | ||||
|     expect(Redis.current.exists(feed_key_for(inactive_user, 'reblogs:2'))).to be false | ||||
|   end | ||||
|  | ||||
|   def feed_key_for(user) | ||||
|     FeedManager.instance.key(:home, user.account_id) | ||||
|   def feed_key_for(user, subtype = nil) | ||||
|     FeedManager.instance.key(:home, user.account_id, subtype) | ||||
|   end | ||||
| end | ||||
|   | ||||
		Reference in New Issue
	
	Block a user