Fix trend calculation working on too many items at a time (#25835)
This commit is contained in:
		@@ -3,6 +3,8 @@
 | 
				
			|||||||
class Trends::Links < Trends::Base
 | 
					class Trends::Links < Trends::Base
 | 
				
			||||||
  PREFIX = 'trending_links'
 | 
					  PREFIX = 'trending_links'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  BATCH_SIZE = 100
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  self.default_options = {
 | 
					  self.default_options = {
 | 
				
			||||||
    threshold: 5,
 | 
					    threshold: 5,
 | 
				
			||||||
    review_threshold: 3,
 | 
					    review_threshold: 3,
 | 
				
			||||||
@@ -67,10 +69,23 @@ class Trends::Links < Trends::Base
 | 
				
			|||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def refresh(at_time = Time.now.utc)
 | 
					  def refresh(at_time = Time.now.utc)
 | 
				
			||||||
    preview_cards = PreviewCard.where(id: (recently_used_ids(at_time) + PreviewCardTrend.pluck(:preview_card_id)).uniq)
 | 
					    # First, recalculate scores for links that were trending previously. We split the queries
 | 
				
			||||||
 | 
					    # to avoid having to load all of the IDs into Ruby just to send them back into Postgres
 | 
				
			||||||
 | 
					    PreviewCard.where(id: PreviewCardTrend.select(:preview_card_id)).find_in_batches(batch_size: BATCH_SIZE) do |preview_cards|
 | 
				
			||||||
      calculate_scores(preview_cards, at_time)
 | 
					      calculate_scores(preview_cards, at_time)
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Then, calculate scores for links that were used today. There are potentially some
 | 
				
			||||||
 | 
					    # duplicate items here that we might process one more time, but that should be fine
 | 
				
			||||||
 | 
					    PreviewCard.where(id: recently_used_ids(at_time)).find_in_batches(batch_size: BATCH_SIZE) do |preview_cards|
 | 
				
			||||||
 | 
					      calculate_scores(preview_cards, at_time)
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Now that all trends have up-to-date scores, and all the ones below the threshold have
 | 
				
			||||||
 | 
					    # been removed, we can recalculate their positions
 | 
				
			||||||
 | 
					    PreviewCardTrend.connection.exec_update('UPDATE preview_card_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM preview_card_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE preview_card_trends.id = t0.id')
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def request_review
 | 
					  def request_review
 | 
				
			||||||
    PreviewCardTrend.pluck('distinct language').flat_map do |language|
 | 
					    PreviewCardTrend.pluck('distinct language').flat_map do |language|
 | 
				
			||||||
      score_at_threshold  = PreviewCardTrend.where(language: language, allowed: true).order(rank: :desc).where('rank <= ?', options[:review_threshold]).first&.score || 0
 | 
					      score_at_threshold  = PreviewCardTrend.where(language: language, allowed: true).order(rank: :desc).where('rank <= ?', options[:review_threshold]).first&.score || 0
 | 
				
			||||||
@@ -139,10 +154,7 @@ class Trends::Links < Trends::Base
 | 
				
			|||||||
    to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
 | 
					    to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
 | 
				
			||||||
    to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
 | 
					    to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    PreviewCardTrend.transaction do
 | 
					 | 
				
			||||||
    PreviewCardTrend.upsert_all(to_insert.map { |(score, preview_card)| { preview_card_id: preview_card.id, score: score, language: preview_card.language, allowed: preview_card.trendable? || false } }, unique_by: :preview_card_id) if to_insert.any?
 | 
					    PreviewCardTrend.upsert_all(to_insert.map { |(score, preview_card)| { preview_card_id: preview_card.id, score: score, language: preview_card.language, allowed: preview_card.trendable? || false } }, unique_by: :preview_card_id) if to_insert.any?
 | 
				
			||||||
    PreviewCardTrend.where(preview_card_id: to_delete.map { |(_, preview_card)| preview_card.id }).delete_all if to_delete.any?
 | 
					    PreviewCardTrend.where(preview_card_id: to_delete.map { |(_, preview_card)| preview_card.id }).delete_all if to_delete.any?
 | 
				
			||||||
      PreviewCardTrend.connection.exec_update('UPDATE preview_card_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM preview_card_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE preview_card_trends.id = t0.id')
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,6 +3,8 @@
 | 
				
			|||||||
class Trends::Statuses < Trends::Base
 | 
					class Trends::Statuses < Trends::Base
 | 
				
			||||||
  PREFIX = 'trending_statuses'
 | 
					  PREFIX = 'trending_statuses'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  BATCH_SIZE = 100
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  self.default_options = {
 | 
					  self.default_options = {
 | 
				
			||||||
    threshold: 5,
 | 
					    threshold: 5,
 | 
				
			||||||
    review_threshold: 3,
 | 
					    review_threshold: 3,
 | 
				
			||||||
@@ -58,10 +60,23 @@ class Trends::Statuses < Trends::Base
 | 
				
			|||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def refresh(at_time = Time.now.utc)
 | 
					  def refresh(at_time = Time.now.utc)
 | 
				
			||||||
    statuses = Status.where(id: (recently_used_ids(at_time) + StatusTrend.pluck(:status_id)).uniq).includes(:status_stat, :account)
 | 
					    # First, recalculate scores for statuses that were trending previously. We split the queries
 | 
				
			||||||
 | 
					    # to avoid having to load all of the IDs into Ruby just to send them back into Postgres
 | 
				
			||||||
 | 
					    Status.where(id: StatusTrend.select(:status_id)).includes(:status_stat, :account).find_in_batches(batch_size: BATCH_SIZE) do |statuses|
 | 
				
			||||||
      calculate_scores(statuses, at_time)
 | 
					      calculate_scores(statuses, at_time)
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Then, calculate scores for statuses that were used today. There are potentially some
 | 
				
			||||||
 | 
					    # duplicate items here that we might process one more time, but that should be fine
 | 
				
			||||||
 | 
					    Status.where(id: recently_used_ids(at_time)).includes(:status_stat, :account).find_in_batches(batch_size: BATCH_SIZE) do |statuses|
 | 
				
			||||||
 | 
					      calculate_scores(statuses, at_time)
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Now that all trends have up-to-date scores, and all the ones below the threshold have
 | 
				
			||||||
 | 
					    # been removed, we can recalculate their positions
 | 
				
			||||||
 | 
					    StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id')
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def request_review
 | 
					  def request_review
 | 
				
			||||||
    StatusTrend.pluck('distinct language').flat_map do |language|
 | 
					    StatusTrend.pluck('distinct language').flat_map do |language|
 | 
				
			||||||
      score_at_threshold = StatusTrend.where(language: language, allowed: true).order(rank: :desc).where('rank <= ?', options[:review_threshold]).first&.score || 0
 | 
					      score_at_threshold = StatusTrend.where(language: language, allowed: true).order(rank: :desc).where('rank <= ?', options[:review_threshold]).first&.score || 0
 | 
				
			||||||
@@ -117,10 +132,7 @@ class Trends::Statuses < Trends::Base
 | 
				
			|||||||
    to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
 | 
					    to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
 | 
				
			||||||
    to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
 | 
					    to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    StatusTrend.transaction do
 | 
					 | 
				
			||||||
    StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any?
 | 
					    StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any?
 | 
				
			||||||
    StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any?
 | 
					    StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any?
 | 
				
			||||||
      StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id')
 | 
					 | 
				
			||||||
    end
 | 
					 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user