Change trending statuses to only show one status from each account (#18181)
Calculate trends in temporary sets to avoid having to manage items that go below the decay threshold while not having any moments where a half-processed set is accessible to end-users
This commit is contained in:
		@@ -64,33 +64,38 @@ class Trends::Base
 | 
			
		||||
    redis.expire(used_key(at_time), 1.day.seconds)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def trim_older_items
 | 
			
		||||
    redis.zremrangebyscore("#{key_prefix}:all", '-inf', '(0.3')
 | 
			
		||||
    redis.zremrangebyscore("#{key_prefix}:allowed", '-inf', '(0.3')
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def score_at_rank(rank)
 | 
			
		||||
    redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  # @param [Integer] id
 | 
			
		||||
  # @param [Float] score
 | 
			
		||||
  # @param [Hash<String, Boolean>] subsets
 | 
			
		||||
  def add_to_and_remove_from_subsets(id, score, subsets = {})
 | 
			
		||||
    subsets.each_key do |subset|
 | 
			
		||||
      key = [key_prefix, subset].compact.join(':')
 | 
			
		||||
  def replace_items(suffix, items)
 | 
			
		||||
    tmp_prefix    = "#{key_prefix}:tmp:#{SecureRandom.alphanumeric(6)}#{suffix}"
 | 
			
		||||
    allowed_items = filter_for_allowed_items(items)
 | 
			
		||||
 | 
			
		||||
      if score.positive? && subsets[subset]
 | 
			
		||||
        redis.zadd(key, score, id)
 | 
			
		||||
      else
 | 
			
		||||
        redis.zrem(key, id)
 | 
			
		||||
      end
 | 
			
		||||
    redis.pipelined do |pipeline|
 | 
			
		||||
      items.each { |item| pipeline.zadd("#{tmp_prefix}:all", item[:score], item[:item].id) }
 | 
			
		||||
      allowed_items.each { |item| pipeline.zadd("#{tmp_prefix}:allowed", item[:score], item[:item].id) }
 | 
			
		||||
 | 
			
		||||
      rename_set(pipeline, "#{tmp_prefix}:all", "#{key_prefix}:all#{suffix}", items)
 | 
			
		||||
      rename_set(pipeline, "#{tmp_prefix}:allowed", "#{key_prefix}:allowed#{suffix}", allowed_items)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def filter_for_allowed_items(items)
 | 
			
		||||
    raise NotImplementedError
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def used_key(at_time)
 | 
			
		||||
    "#{key_prefix}:used:#{at_time.beginning_of_day.to_i}"
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def rename_set(pipeline, from_key, to_key, set_items)
 | 
			
		||||
    if set_items.empty?
 | 
			
		||||
      pipeline.del(to_key)
 | 
			
		||||
    else
 | 
			
		||||
      pipeline.rename(from_key, to_key)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -8,14 +8,15 @@ class Trends::Links < Trends::Base
 | 
			
		||||
    review_threshold: 3,
 | 
			
		||||
    max_score_cooldown: 2.days.freeze,
 | 
			
		||||
    max_score_halflife: 8.hours.freeze,
 | 
			
		||||
    decay_threshold: 1,
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  def register(status, at_time = Time.now.utc)
 | 
			
		||||
    original_status = status.reblog? ? status.reblog : status
 | 
			
		||||
    original_status = status.proper
 | 
			
		||||
 | 
			
		||||
    return unless original_status.public_visibility? && status.public_visibility? &&
 | 
			
		||||
                  !original_status.account.silenced? && !status.account.silenced? &&
 | 
			
		||||
                  !original_status.spoiler_text?
 | 
			
		||||
    return unless (original_status.public_visibility? && status.public_visibility?) &&
 | 
			
		||||
                  !(original_status.account.silenced? || status.account.silenced?) &&
 | 
			
		||||
                  !(original_status.spoiler_text? || original_status.sensitive?)
 | 
			
		||||
 | 
			
		||||
    original_status.preview_cards.each do |preview_card|
 | 
			
		||||
      add(preview_card, status.account_id, at_time) if preview_card.appropriate_for_trends?
 | 
			
		||||
@@ -61,6 +62,9 @@ class Trends::Links < Trends::Base
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def calculate_scores(preview_cards, at_time)
 | 
			
		||||
    global_items = []
 | 
			
		||||
    locale_items = Hash.new { |h, key| h[key] = [] }
 | 
			
		||||
 | 
			
		||||
    preview_cards.each do |preview_card|
 | 
			
		||||
      expected  = preview_card.history.get(at_time - 1.day).accounts.to_f
 | 
			
		||||
      expected  = 1.0 if expected.zero?
 | 
			
		||||
@@ -87,33 +91,23 @@ class Trends::Links < Trends::Base
 | 
			
		||||
 | 
			
		||||
      decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
 | 
			
		||||
 | 
			
		||||
      add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
 | 
			
		||||
        all: true,
 | 
			
		||||
        allowed: preview_card.trendable?,
 | 
			
		||||
      })
 | 
			
		||||
      next unless decaying_score >= options[:decay_threshold]
 | 
			
		||||
 | 
			
		||||
      next unless valid_locale?(preview_card.language)
 | 
			
		||||
 | 
			
		||||
      add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
 | 
			
		||||
        "all:#{preview_card.language}" => true,
 | 
			
		||||
        "allowed:#{preview_card.language}" => preview_card.trendable?,
 | 
			
		||||
      })
 | 
			
		||||
      global_items << { score: decaying_score, item:  preview_card }
 | 
			
		||||
      locale_items[preview_card.language] << { score: decaying_score, item: preview_card } if valid_locale?(preview_card.language)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    trim_older_items
 | 
			
		||||
    replace_items('', global_items)
 | 
			
		||||
 | 
			
		||||
    # Clean up localized sets by calculating the intersection with the main
 | 
			
		||||
    # set. We do this instead of just deleting the localized sets to avoid
 | 
			
		||||
    # having moments where the API returns empty results
 | 
			
		||||
 | 
			
		||||
    redis.pipelined do
 | 
			
		||||
      Trends.available_locales.each do |locale|
 | 
			
		||||
        redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
 | 
			
		||||
        redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:allowed"], aggregate: 'max')
 | 
			
		||||
      end
 | 
			
		||||
    Trends.available_locales.each do |locale|
 | 
			
		||||
      replace_items(":#{locale}", locale_items[locale])
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def filter_for_allowed_items(items)
 | 
			
		||||
    items.select { |item| item[:item].trendable? }
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def would_be_trending?(id)
 | 
			
		||||
    score(id) > score_at_rank(options[:review_threshold] - 1)
 | 
			
		||||
  end
 | 
			
		||||
 
 | 
			
		||||
@@ -7,6 +7,7 @@ class Trends::Statuses < Trends::Base
 | 
			
		||||
    threshold: 5,
 | 
			
		||||
    review_threshold: 3,
 | 
			
		||||
    score_halflife: 2.hours.freeze,
 | 
			
		||||
    decay_threshold: 0.3,
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  class Query < Trends::Query
 | 
			
		||||
@@ -31,7 +32,7 @@ class Trends::Statuses < Trends::Base
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def register(status, at_time = Time.now.utc)
 | 
			
		||||
    add(status.proper, status.account_id, at_time) if eligible?(status)
 | 
			
		||||
    add(status.proper, status.account_id, at_time) if eligible?(status.proper)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def add(status, _account_id, at_time = Time.now.utc)
 | 
			
		||||
@@ -74,53 +75,45 @@ class Trends::Statuses < Trends::Base
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def eligible?(status)
 | 
			
		||||
    original_status = status.proper
 | 
			
		||||
 | 
			
		||||
    original_status.public_visibility? &&
 | 
			
		||||
      original_status.account.discoverable? && !original_status.account.silenced? &&
 | 
			
		||||
      original_status.spoiler_text.blank? && !original_status.sensitive? && !original_status.reply?
 | 
			
		||||
    status.public_visibility? && status.account.discoverable? && !status.account.silenced? && status.spoiler_text.blank? && !status.sensitive? && !status.reply?
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def calculate_scores(statuses, at_time)
 | 
			
		||||
    redis.pipelined do
 | 
			
		||||
      statuses.each do |status|
 | 
			
		||||
        expected  = 1.0
 | 
			
		||||
        observed  = (status.reblogs_count + status.favourites_count).to_f
 | 
			
		||||
    global_items = []
 | 
			
		||||
    locale_items = Hash.new { |h, key| h[key] = [] }
 | 
			
		||||
 | 
			
		||||
        score = begin
 | 
			
		||||
          if expected > observed || observed < options[:threshold]
 | 
			
		||||
            0
 | 
			
		||||
          else
 | 
			
		||||
            ((observed - expected)**2) / expected
 | 
			
		||||
          end
 | 
			
		||||
    statuses.each do |status|
 | 
			
		||||
      expected  = 1.0
 | 
			
		||||
      observed  = (status.reblogs_count + status.favourites_count).to_f
 | 
			
		||||
 | 
			
		||||
      score = begin
 | 
			
		||||
        if expected > observed || observed < options[:threshold]
 | 
			
		||||
          0
 | 
			
		||||
        else
 | 
			
		||||
          ((observed - expected)**2) / expected
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
        decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
 | 
			
		||||
 | 
			
		||||
        add_to_and_remove_from_subsets(status.id, decaying_score, {
 | 
			
		||||
          all: true,
 | 
			
		||||
          allowed: status.trendable? && status.account.discoverable?,
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
        next unless valid_locale?(status.language)
 | 
			
		||||
 | 
			
		||||
        add_to_and_remove_from_subsets(status.id, decaying_score, {
 | 
			
		||||
          "all:#{status.language}" => true,
 | 
			
		||||
          "allowed:#{status.language}" => status.trendable? && status.account.discoverable?,
 | 
			
		||||
        })
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      trim_older_items
 | 
			
		||||
      decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
 | 
			
		||||
 | 
			
		||||
      # Clean up localized sets by calculating the intersection with the main
 | 
			
		||||
      # set. We do this instead of just deleting the localized sets to avoid
 | 
			
		||||
      # having moments where the API returns empty results
 | 
			
		||||
      next unless decaying_score >= options[:decay_threshold]
 | 
			
		||||
 | 
			
		||||
      Trends.available_locales.each do |locale|
 | 
			
		||||
        redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
 | 
			
		||||
        redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:allowed"], aggregate: 'max')
 | 
			
		||||
      end
 | 
			
		||||
      global_items << { score: decaying_score, item: status }
 | 
			
		||||
      locale_items[status.language] << { account_id: status.account_id, score: decaying_score, item: status } if valid_locale?(status.language)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    replace_items('', global_items)
 | 
			
		||||
 | 
			
		||||
    Trends.available_locales.each do |locale|
 | 
			
		||||
      replace_items(":#{locale}", locale_items[locale])
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def filter_for_allowed_items(items)
 | 
			
		||||
    # Show only one status per account, pick the one with the highest score
 | 
			
		||||
    # that's also eligible to trend
 | 
			
		||||
 | 
			
		||||
    items.group_by { |item| item[:account_id] }.values.filter_map { |account_items| account_items.select { |item| item[:item].trendable? && item[:item].account.discoverable? }.max_by { |item| item[:score] } }
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def would_be_trending?(id)
 | 
			
		||||
 
 | 
			
		||||
@@ -8,6 +8,7 @@ class Trends::Tags < Trends::Base
 | 
			
		||||
    review_threshold: 3,
 | 
			
		||||
    max_score_cooldown: 2.days.freeze,
 | 
			
		||||
    max_score_halflife: 4.hours.freeze,
 | 
			
		||||
    decay_threshold: 1,
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  def register(status, at_time = Time.now.utc)
 | 
			
		||||
@@ -26,7 +27,6 @@ class Trends::Tags < Trends::Base
 | 
			
		||||
  def refresh(at_time = Time.now.utc)
 | 
			
		||||
    tags = Tag.where(id: (recently_used_ids(at_time) + currently_trending_ids(false, -1)).uniq)
 | 
			
		||||
    calculate_scores(tags, at_time)
 | 
			
		||||
    trim_older_items
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def request_review
 | 
			
		||||
@@ -53,6 +53,8 @@ class Trends::Tags < Trends::Base
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def calculate_scores(tags, at_time)
 | 
			
		||||
    items = []
 | 
			
		||||
 | 
			
		||||
    tags.each do |tag|
 | 
			
		||||
      expected  = tag.history.get(at_time - 1.day).accounts.to_f
 | 
			
		||||
      expected  = 1.0 if expected.zero?
 | 
			
		||||
@@ -79,11 +81,16 @@ class Trends::Tags < Trends::Base
 | 
			
		||||
 | 
			
		||||
      decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
 | 
			
		||||
 | 
			
		||||
      add_to_and_remove_from_subsets(tag.id, decaying_score, {
 | 
			
		||||
        all: true,
 | 
			
		||||
        allowed: tag.trendable?,
 | 
			
		||||
      })
 | 
			
		||||
      next unless decaying_score >= options[:decay_threshold]
 | 
			
		||||
 | 
			
		||||
      items << { score: decaying_score, item: tag }
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    replace_items('', items)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def filter_for_allowed_items(items)
 | 
			
		||||
    items.select { |item| item[:item].trendable? }
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def would_be_trending?(id)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user