Distrubute statuses as a fan-out-on-write system, with optional precomputing
This commit is contained in:
		
							
								
								
									
										1
									
								
								Gemfile
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								Gemfile
									
									
									
									
									
								
							@@ -30,6 +30,7 @@ gem 'rails_autolink'
 | 
			
		||||
gem 'doorkeeper'
 | 
			
		||||
gem 'rabl'
 | 
			
		||||
gem 'oj'
 | 
			
		||||
gem 'redis', '~>3.2'
 | 
			
		||||
 | 
			
		||||
group :development, :test do
 | 
			
		||||
  gem 'rspec-rails'
 | 
			
		||||
 
 | 
			
		||||
@@ -208,6 +208,7 @@ GEM
 | 
			
		||||
    rake (10.5.0)
 | 
			
		||||
    rdoc (4.2.2)
 | 
			
		||||
      json (~> 1.4)
 | 
			
		||||
    redis (3.2.2)
 | 
			
		||||
    ref (2.0.0)
 | 
			
		||||
    responders (2.1.1)
 | 
			
		||||
      railties (>= 4.2.0, < 5.1)
 | 
			
		||||
@@ -328,6 +329,7 @@ DEPENDENCIES
 | 
			
		||||
  rails (= 4.2.5.1)
 | 
			
		||||
  rails_12factor
 | 
			
		||||
  rails_autolink
 | 
			
		||||
  redis (~> 3.2)
 | 
			
		||||
  rspec-rails
 | 
			
		||||
  rubocop
 | 
			
		||||
  sass-rails (~> 5.0)
 | 
			
		||||
 
 | 
			
		||||
@@ -22,10 +22,12 @@ class Api::StatusesController < ApiController
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def home
 | 
			
		||||
    @statuses = Status.where(account: [current_user.account] + current_user.account.following).order('created_at desc')
 | 
			
		||||
    feed      = Feed.new(:home, current_user.account)
 | 
			
		||||
    @statuses = feed.get(20, (params[:offset] || 0).to_i)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def mentions
 | 
			
		||||
    @statuses = Status.where(id: Mention.where(account: current_user.account).pluck(:status_id)).order('created_at desc')
 | 
			
		||||
    feed      = Feed.new(:mentions, current_user.account)
 | 
			
		||||
    @statuses = feed.get(20, (params[:offset] || 0).to_i)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
@@ -2,6 +2,7 @@ class HomeController < ApplicationController
 | 
			
		||||
  before_action :authenticate_user!
 | 
			
		||||
 | 
			
		||||
  def index
 | 
			
		||||
    @statuses = Status.where(account: ([current_user.account] + current_user.account.following)).where('reblog_of_id IS NULL OR account_id != ?', current_user.account.id).order('created_at desc')
 | 
			
		||||
    feed      = Feed.new(:home, current_user.account)
 | 
			
		||||
    @statuses = feed.get(20, (params[:offset] || 0).to_i)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										27
									
								
								app/models/feed.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								app/models/feed.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,27 @@
 | 
			
		||||
class Feed
 | 
			
		||||
  def initialize(type, account)
 | 
			
		||||
    @type    = type
 | 
			
		||||
    @account = account
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def get(limit, offset = 0)
 | 
			
		||||
    unhydrated = redis.zrevrange(key, offset, limit)
 | 
			
		||||
    status_map = Hash.new
 | 
			
		||||
 | 
			
		||||
    # If we're after most recent items and none are there, we need to precompute the feed
 | 
			
		||||
    return PrecomputeFeedService.new.(@type, @account).take(limit) if unhydrated.empty? && offset == 0
 | 
			
		||||
 | 
			
		||||
    Status.where(id: unhydrated).each { |status| status_map[status.id.to_s] = status }
 | 
			
		||||
    return unhydrated.map { |id| status_map[id] }
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def key
 | 
			
		||||
    "feed:#{@type}:#{@account.id}"
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def redis
 | 
			
		||||
    $redis
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
@@ -67,5 +67,6 @@ class Status < ActiveRecord::Base
 | 
			
		||||
 | 
			
		||||
  after_create do
 | 
			
		||||
    self.account.stream_entries.create!(activity: self)
 | 
			
		||||
    FanOutOnWriteService.new.(self)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										46
									
								
								app/services/fan_out_on_write_service.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								app/services/fan_out_on_write_service.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,46 @@
 | 
			
		||||
class FanOutOnWriteService < BaseService
 | 
			
		||||
  MAX_FEED_SIZE = 800
 | 
			
		||||
 | 
			
		||||
  # Push a status into home and mentions feeds
 | 
			
		||||
  # @param [Status] status
 | 
			
		||||
  def call(status)
 | 
			
		||||
    replied_to_user = status.reply? ? status.thread.account : nil
 | 
			
		||||
 | 
			
		||||
    # Deliver to local self
 | 
			
		||||
    push(:home, status.account.id, status) if status.account.local?
 | 
			
		||||
 | 
			
		||||
    # Deliver to local followers
 | 
			
		||||
    status.account.followers.each do |follower|
 | 
			
		||||
      next if (status.reply? && !follower.following?(replied_to_user)) || !follower.local?
 | 
			
		||||
      push(:home, follower.id, status)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    # Deliver to local mentioned
 | 
			
		||||
    status.mentions.each do |mentioned_account|
 | 
			
		||||
      next unless mentioned_account.local?
 | 
			
		||||
      push(:mentions, mentioned_account.id, status)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def push(type, receiver_id, status)
 | 
			
		||||
    redis.zadd(key(type, receiver_id), status.created_at.to_i, status.id)
 | 
			
		||||
    trim(type, receiver_id)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def trim(type, receiver_id)
 | 
			
		||||
    return unless redis.zcard(key(type, receiver_id)) > MAX_FEED_SIZE
 | 
			
		||||
 | 
			
		||||
    last = redis.zrevrange(key(type, receiver_id), MAX_FEED_SIZE - 1, MAX_FEED_SIZE - 1)
 | 
			
		||||
    redis.zremrangebyscore(key(type, receiver_id), '-inf', "(#{last.last}")
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def key(type, id)
 | 
			
		||||
    "feed:#{type}:#{id}"
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def redis
 | 
			
		||||
    $redis
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
							
								
								
									
										35
									
								
								app/services/precompute_feed_service.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								app/services/precompute_feed_service.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,35 @@
 | 
			
		||||
class PrecomputeFeedService < BaseService
 | 
			
		||||
  MAX_FEED_SIZE = 800
 | 
			
		||||
 | 
			
		||||
  # Fill up a user's home/mentions feed from DB and return it
 | 
			
		||||
  # @param [Symbol] type :home or :mentions
 | 
			
		||||
  # @param [Account] account
 | 
			
		||||
  # @return [Array]
 | 
			
		||||
  def call(type, account)
 | 
			
		||||
    statuses = send(type.to_s, account).order('created_at desc').limit(MAX_FEED_SIZE)
 | 
			
		||||
    statuses.each { |status| push(type, account.id, status) }
 | 
			
		||||
    statuses
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def push(type, receiver_id, status)
 | 
			
		||||
    redis.zadd(key(type, receiver_id), status.created_at.to_i, status.id)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def home(account)
 | 
			
		||||
    Status.where(account: [account] + account.following)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def mentions(account)
 | 
			
		||||
    Status.where(id: Mention.where(account: account).pluck(:status_id))
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def key(type, id)
 | 
			
		||||
    "feed:#{type}:#{id}"
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def redis
 | 
			
		||||
    $redis
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
							
								
								
									
										1
									
								
								config/initializers/redis.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								config/initializers/redis.rb
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1 @@
 | 
			
		||||
$redis = Redis.new(host: ENV['REDIS_HOST'] || 'localhost', port: ENV['REDIS_PORT'] || 6379)
 | 
			
		||||
		Reference in New Issue
	
	Block a user