Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis streams native ids #3

Open
wants to merge 8 commits into
base: redis-streams
Choose a base branch
from
4 changes: 0 additions & 4 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ Layout/MultilineMethodCallIndentation:
Bundler/OrderedGems:
Enabled: false

# Redis backend is quite big
Metrics/ClassLength:
Max: 310

Metrics/BlockLength:
Exclude:
- '**/spec_helper.rb'
Expand Down
36 changes: 17 additions & 19 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This configuration was generated by
# `rubocop --auto-gen-config`
# on 2018-12-03 15:46:19 +0000 using RuboCop version 0.60.0.
# on 2018-12-04 22:25:14 +0000 using RuboCop version 0.60.0.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
Expand Down Expand Up @@ -81,26 +81,31 @@ Metrics/AbcSize:
Metrics/BlockLength:
Max: 29

# Offense count: 4
# Offense count: 3
# Configuration parameters: CountBlocks.
Metrics/BlockNesting:
Max: 4

# Offense count: 21
# Offense count: 10
# Configuration parameters: CountComments.
Metrics/ClassLength:
Max: 315

# Offense count: 20
Metrics/CyclomaticComplexity:
Max: 30

# Offense count: 44
# Offense count: 46
# Configuration parameters: CountComments, ExcludedMethods.
Metrics/MethodLength:
Max: 105

# Offense count: 1
# Configuration parameters: CountComments.
Metrics/ModuleLength:
Max: 419
Max: 395

# Offense count: 21
# Offense count: 20
Metrics/PerceivedComplexity:
Max: 34

Expand Down Expand Up @@ -324,12 +329,12 @@ Style/GuardClause:
- 'lib/message_bus/client.rb'
- 'lib/message_bus/rack/middleware.rb'

# Offense count: 2
# Offense count: 1
Style/IfInsideElse:
Exclude:
- 'lib/message_bus.rb'

# Offense count: 28
# Offense count: 26
# Cop supports --auto-correct.
Style/IfUnlessModifier:
Exclude:
Expand Down Expand Up @@ -404,7 +409,7 @@ Style/NilComparison:
Style/NumericLiterals:
MinDigits: 15

# Offense count: 16
# Offense count: 17
# Cop supports --auto-correct.
# Configuration parameters: AutoCorrect, EnforcedStyle, IgnoredMethods.
# SupportedStyles: predicate, comparison
Expand All @@ -420,13 +425,6 @@ Style/NumericPredicate:
- 'lib/message_bus/rack/middleware.rb'
- 'lib/message_bus/timer_thread.rb'

# Offense count: 1
# Cop supports --auto-correct.
# Configuration parameters: AllowSafeAssignment, AllowInMultilineConditions.
Style/ParenthesesAroundCondition:
Exclude:
- 'lib/message_bus.rb'

# Offense count: 3
# Cop supports --auto-correct.
# Configuration parameters: PreferredDelimiters.
Expand Down Expand Up @@ -553,7 +551,7 @@ Style/SpecialGlobalVars:
- 'message_bus.gemspec'
- 'spec/spec_helper.rb'

# Offense count: 682
# Offense count: 732
# Cop supports --auto-correct.
# Configuration parameters: EnforcedStyle, ConsistentQuotesInMultiline.
# SupportedStyles: single_quotes, double_quotes
Expand Down Expand Up @@ -591,7 +589,7 @@ Style/SymbolProc:
- 'spec/lib/message_bus/backend_spec.rb'
- 'spec/lib/message_bus_spec.rb'

# Offense count: 2
# Offense count: 4
# Cop supports --auto-correct.
# Configuration parameters: EnforcedStyleForMultiline.
# SupportedStylesForMultiline: comma, consistent_comma, no_comma
Expand Down Expand Up @@ -662,7 +660,7 @@ Style/ZeroLengthPredicate:
- 'lib/message_bus/rack/middleware.rb'
- 'spec/lib/message_bus_spec.rb'

# Offense count: 231
# Offense count: 241
# Configuration parameters: AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns.
# URISchemes: http, https
Metrics/LineLength:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Unreleased

- CHANGE: No longer reports messages' `global_id` to Javascript subscribers
- CHANGE: Stops providing usable `global_id` unless messages come from global backlog. Will now report -1 any time a message comes directly from a specific channel.

30-11-2018

- Version 2.2.0.pre.1
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -533,21 +533,19 @@ The format for delivered messages is a JSON array of message objects like so:
```json
[
{
"global_id": 12,
"message_id": 1,
"channel": "/some/channel/name",
"data": [the message as published]
}
]
```

The `global_id` field here indicates the ID of the message in the global backlog, while the `message_id` is the ID of the message in the channel-specific backlog. The ID used for subscriptions is always the channel-specific one.
The `message_id` is the ID of the message in the channel-specific backlog, which is used for subscriptions.

In certain conditions, a status message will be delivered and look like this:

```json
{
"global_id": -1,
"message_id": -1,
"channel": "/__status",
"data": {
Expand Down
2 changes: 1 addition & 1 deletion assets/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
if (callback.channel === message.channel) {
callback.last_id = message.message_id;
try {
callback.func(message.data, message.global_id, message.message_id);
callback.func(message.data, message.message_id);
}
catch(e){
if(console.log) {
Expand Down
51 changes: 13 additions & 38 deletions lib/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,15 @@ def global_backlog(last_id = nil)
# @param [String] channel the name of the channel in question
# @param [#to_i] last_id the channel-specific ID of the last message that the caller received on the specified channel
# @param [String] site_id the ID of the site by which to filter
# @param [Boolean] inclusive whether or not to incluse the message with specified `last_id` in the results
#
# @return [Array<MessageBus::Message>] all messages published to the specified channel since the specified last ID
def backlog(channel = nil, last_id = nil, site_id = nil)
def backlog(channel = nil, last_id = nil, site_id = nil, inclusive: false)
old =
if channel
reliable_pub_sub.backlog(encode_channel_name(channel, site_id), last_id)
reliable_pub_sub.backlog(encode_channel_name(channel, site_id), last_id, inclusive: inclusive)
else
reliable_pub_sub.global_backlog(last_id)
reliable_pub_sub.global_backlog(last_id, inclusive: inclusive)
end

old.each do |m|
Expand All @@ -479,10 +480,9 @@ def last_id(channel, site_id = nil)
# @return [MessageBus::Message] the last message published to the given channel
def last_message(channel)
if last_id = last_id(channel)
messages = backlog(channel, last_id - 1)
if messages
messages[0]
end
message = reliable_pub_sub.get_message(encode_channel_name(channel, nil), last_id)
decode_message!(message) if message
message
end
end

Expand Down Expand Up @@ -588,41 +588,16 @@ def decode_message!(msg)
msg.client_ids = parsed["client_ids"]
end

def replay_backlog(channel, last_id, site_id)
id = nil

backlog(channel, last_id, site_id).each do |m|
yield m
id = m.message_id
end

id
end

def subscribe_impl(channel, site_id, last_id, &blk)
raise MessageBus::BusDestroyed if @destroyed

if last_id >= 0
# this gets a bit tricky, but we got to ensure ordering so we wrap the block
original_blk = blk
current_id = replay_backlog(channel, last_id, site_id, &blk)
just_yield = false

# we double check to ensure no messages snuck through while we were subscribing
blk = proc do |m|
if just_yield
original_blk.call m
else
if current_id && current_id == (m.message_id - 1)
original_blk.call m
just_yield = true
else
current_id = replay_backlog(channel, current_id, site_id, &original_blk)
if (current_id == m.message_id)
just_yield = true
end
end
end
existing_messages = backlog(channel, last_id, site_id, inclusive: true)
if existing_messages.empty?
raise ArgumentError, "You tried to subscribe starting from a message that doesn't exist yet."
end
existing_messages.each do |m|
yield m
end
end

Expand Down
15 changes: 10 additions & 5 deletions lib/message_bus/backends/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,21 @@ def last_id(channel)
#
# @param [String] channel the name of the channel in question
# @param [#to_i] last_id the channel-specific ID of the last message that the caller received on the specified channel
# @param [Boolean] inclusive whether or not to incluse the message with specified `last_id` in the results
#
# @return [Array<MessageBus::Message>] all messages published to the specified channel since the specified last ID
def backlog(channel, last_id = 0)
# @return [Array<MessageBus::Message>] all messages published to the specified channel since the specified last ID.
# The messages will have a `global_id` of `-1` to indicate that they came directly from a channel.
def backlog(channel, last_id = 0, inclusive: false)
raise ConcreteClassMustImplementError
end

# Get messages from the global backlog
#
# @param [#to_i] last_id the global ID of the last message that the caller received
# @param [Boolean] inclusive whether or not to incluse the message with specified `last_id` in the results
#
# @return [Array<MessageBus::Message>] all messages published on any channel since the specified last ID
def global_backlog(last_id = 0)
def global_backlog(last_id = 0, inclusive: false)
raise ConcreteClassMustImplementError
end

Expand All @@ -139,7 +142,8 @@ def global_backlog(last_id = 0)
# @param [String] channel the name of the channel in question
# @param [Integer] message_id the channel-specific ID of the message required
#
# @return [MessageBus::Message, nil] the requested message, or nil if it does not exist
# @return [MessageBus::Message, nil] the requested message, or nil if it does not exist.
# The message will have a `global_id` of `-1` to indicate that it came directly from a channel.
def get_message(channel, message_id)
raise ConcreteClassMustImplementError
end
Expand All @@ -152,7 +156,8 @@ def get_message(channel, message_id)
# @param [#to_i] last_id the channel-specific ID of the last message that the caller received on the specified channel
#
# @yield [message] a message-handler block
# @yieldparam [MessageBus::Message] message each message as it is delivered
# @yieldparam [MessageBus::Message] message each message as it is delivered.
# The message will have a `global_id` of `-1` to indicate that it came directly from a channel.
#
# @return [nil]
def subscribe(channel, last_id = nil)
Expand Down
45 changes: 33 additions & 12 deletions lib/message_bus/backends/memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,38 @@ def clear_channel_backlog(channel, backlog_id, num_to_keep)
nil
end

def backlog(channel, backlog_id)
sync { chan(channel).backlog.select { |id, _| id > backlog_id } }
def backlog(channel, backlog_id, inclusive:)
sync do
chan(channel).backlog.select do |id, _|
if inclusive
id >= backlog_id
else
id > backlog_id
end
end
end
end

def global_backlog(backlog_id)
def global_backlog(backlog_id, inclusive:)
sync do
@channels.dup.flat_map do |channel_name, channel|
channel.backlog.select { |id, _| id > backlog_id }.map { |id, value| [id, channel_name, value] }
relevant_backlog = channel.backlog.select do |id, _|
if inclusive
id >= backlog_id
else
id > backlog_id
end
end
relevant_backlog.map { |id, value| [id, channel_name, value] }
end.sort
end
end

def get_value(channel, id)
sync { chan(channel).backlog.find { |i, _| i == id }[1] }
sync do
id, value = chan(channel).backlog.find { |i, _| i == id }
value
end
end

# Dangerous, drops the message_bus table containing the backlog if it exists.
Expand Down Expand Up @@ -241,17 +259,17 @@ def last_id(channel)
end

# (see Base#backlog)
def backlog(channel, last_id = 0)
items = client.backlog channel, last_id.to_i
def backlog(channel, last_id = 0, inclusive: false)
items = client.backlog channel, last_id.to_i, inclusive: inclusive

items.map! do |id, data|
MessageBus::Message.new id, id, channel, data
MessageBus::Message.new(-1, id, channel, data)
end
end

# (see Base#global_backlog)
def global_backlog(last_id = 0)
items = client.global_backlog last_id.to_i
def global_backlog(last_id = 0, inclusive: false)
items = client.global_backlog last_id.to_i, inclusive: inclusive

items.map! do |id, channel, data|
MessageBus::Message.new id, id, channel, data
Expand All @@ -261,7 +279,7 @@ def global_backlog(last_id = 0)
# (see Base#get_message)
def get_message(channel, message_id)
if data = client.get_value(channel, message_id)
MessageBus::Message.new message_id, message_id, channel, data
MessageBus::Message.new(-1, message_id, channel, data)
else
nil
end
Expand All @@ -274,7 +292,10 @@ def subscribe(channel, last_id = nil)
raise ArgumentError unless block_given?

global_subscribe(last_id) do |m|
yield m if m.channel == channel
if m.channel == channel
m.global_id = -1
yield m
end
end
end

Expand Down
Loading