Skip to content

Commit

Permalink
Merge pull request #2 from powerhome/redis-streams-subscribe
Browse files Browse the repository at this point in the history
Eliminates Redis PubSub from Streams implementation
  • Loading branch information
benlangfeld authored Nov 8, 2018
2 parents baf8e39 + e8adbec commit 9a47c0a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 80 deletions.
131 changes: 52 additions & 79 deletions lib/message_bus/backends/redis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ def after_fork
pub_redis.disconnect!
end

def redis_channel_name
db = @redis_config[:db] || 0
"_message_bus_#{db}"
end

# redis connection used for publishing messages
def pub_redis
@pub_redis ||= new_redis_connection
Expand All @@ -78,6 +73,14 @@ def global_backlog_key
"__mb_global_backlogstream_n"
end

def subscription_key(id)
"__mb_subscription_n_#{id}"
end

def unsubscribe_key
"__mb_unsubscribe_n"
end

# use with extreme care, will nuke all of the data
def reset!
pub_redis.keys("__mb_*").each do |k|
Expand All @@ -104,7 +107,6 @@ def expire_all_backlogs!
local backlog_id_key = KEYS[2]
local backlog_key = KEYS[3]
local global_backlog_key = KEYS[4]
local redis_channel_name = KEYS[5]
local global_id = redis.call("INCR", global_id_key)
local backlog_id = redis.call("INCR", backlog_id_key)
Expand All @@ -117,8 +119,6 @@ def expire_all_backlogs!
redis.call("XADD", global_backlog_key, "MAXLEN", max_global_backlog_size, string.format("0-%i", global_id), "payload", global_backlog_message)
redis.call("EXPIRE", global_backlog_key, max_backlog_age)
redis.call("PUBLISH", redis_channel_name, payload)
return backlog_id
LUA
Expand Down Expand Up @@ -152,8 +152,7 @@ def publish(channel, data, opts = nil)
global_id_key,
backlog_id_key,
backlog_key,
global_backlog_key,
redis_channel_name
global_backlog_key
]
)
rescue Redis::CommandError => e
Expand Down Expand Up @@ -249,11 +248,7 @@ def global_backlog(last_id = nil)
items = redis.xrange global_backlog_key, start, "+"

items.map! do |_id, (_, payload)|
pipe = payload.index "|"
message_id = payload[0..pipe].to_i
channel = payload[pipe + 1..-1]
m = get_message(channel, message_id)
m
message_from_global_backlog(payload)
end

items.compact!
Expand Down Expand Up @@ -292,103 +287,81 @@ def subscribe(channel, last_id = nil)
end
end

def process_global_backlog(highest_id, raise_error)
if highest_id > pub_redis.get(global_id_key).to_i
highest_id = 0
end

global_backlog(highest_id).each do |old|
if highest_id + 1 == old.global_id
yield old
highest_id = old.global_id
else
raise BackLogOutOfOrder.new(highest_id) if raise_error

if old.global_id > highest_id
yield old
highest_id = old.global_id
end
end
end

highest_id
end

def global_unsubscribe
if @redis_global
# new connection to avoid deadlock
new_redis_connection.publish(redis_channel_name, UNSUB_MESSAGE)
@redis_global.disconnect
@redis_global = nil
new_redis_connection.xadd(unsubscribe_key, "*", UNSUB_MESSAGE, true)
@redis_global.quit
end
end

def global_subscribe(last_id = nil, &blk)
def global_subscribe(last_id = nil)
raise ArgumentError unless block_given?

highest_id = last_id

clear_backlog = lambda do
retries = 4
begin
highest_id = process_global_backlog(highest_id, retries > 0, &blk)
rescue BackLogOutOfOrder => e
highest_id = e.highest_id
retries -= 1
sleep(rand(50) / 1000.0)
retry
end
end

begin
@redis_global = new_redis_connection

if highest_id
clear_backlog.call(&blk)
end
last_unsubscribe_seen, = @redis_global.xrevrange(unsubscribe_key, "+", "-", "COUNT", "1").last

@redis_global.subscribe(redis_channel_name) do |on|
on.subscribe do
if highest_id
clear_backlog.call(&blk)
end
@subscribed = true
unless highest_id
last_message_id, = @redis_global.xrevrange(global_backlog_key, "+", "-", "COUNT", "1").last
if last_message_id
highest_id = last_message_id.split("-").last.to_i
end
end

subscription_id = SecureRandom.uuid
@redis_global.setex(subscription_key(subscription_id), 5, true)

on.unsubscribe do
@subscribed = false
@subscribed = true

loop do
# If Redis doesn't know about our subscription any more, the stream might also have been reset.
# Assume that our stream pointer is no good and start reading the stream from the beginning again.
unless @redis_global.get(subscription_key(subscription_id)) == "true"
@logger.warn "Subscription #{subscription_id} expired. Reading full backlog."
highest_id = nil
end
@redis_global.setex(subscription_key(subscription_id), 5, true)

on.message do |_c, m|
if m == UNSUB_MESSAGE
@redis_global.unsubscribe
return
end
m = MessageBus::Message.decode m
start = highest_id ? "0-#{highest_id}" : 0

response = @redis_global.xread("BLOCK", 1000, "STREAMS", global_backlog_key, unsubscribe_key, start || 0, last_unsubscribe_seen || 0)
next unless response

stream, messages = response.first

# we have 3 options
#
# 1. message came in the correct order GREAT, just deal with it
# 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
# 3. message came in the incorrect order and is lowest than current highest id, reset
return if stream == unsubscribe_key

if highest_id.nil? || m.global_id == highest_id + 1
messages.each do |_id, (_, payload)|
m = message_from_global_backlog(payload)

if m && (highest_id.nil? || m.global_id > highest_id)
highest_id = m.global_id
yield m
else
clear_backlog.call(&blk)
end
end
end
rescue => error
@logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}"
@logger.warn "#{error.class}: #{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}"
sleep 1
retry
ensure
@subscribed = false
end
end

private

def message_from_global_backlog(payload)
pipe = payload.index "|"
message_id = payload[0..pipe].to_i
channel = payload[pipe + 1..-1]
get_message(channel, message_id)
end

def cached_eval(redis, script, script_sha1, params)
begin
redis.evalsha script_sha1, params
Expand Down
4 changes: 3 additions & 1 deletion spec/lib/message_bus/backend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,13 @@ def new_test_bus

@bus.publish("/foo", "two")

wait_for(100) { got.length == 1 }

@bus.reset!

@bus.publish("/foo", "three")

wait_for(100) do
wait_for(2000) do
got.length == 2
end

Expand Down

0 comments on commit 9a47c0a

Please sign in to comment.