Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix trim add xread #190

Merged
merged 3 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions lib/mock_redis/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,25 @@ def last_id

def add(id, values)
@last_id = MockRedis::Stream::Id.new(id, min: @last_id)
if @last_id.to_s == '0-0'
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
'the target stream top item'
# TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message:
# 'ERR The ID specified in XADD must be greater than 0-0'
end
members.add [@last_id, Hash[values.map { |k, v| [k.to_s, v.to_s] }]]
@last_id.to_s
end

def trim(count)
deleted = @members.size - count
@members = @members.to_a[-count..-1].to_set
deleted
if deleted > 0
@members = @members.to_a[-count..-1].to_set
deleted
else
0
end
end

def range(start, finish, reversed, *opts_in)
Expand All @@ -45,6 +56,11 @@ def range(start, finish, reversed, *opts_in)
items
end

def read(id)
stream_id = MockRedis::Stream::Id.new(id)
members.select { |m| (stream_id < m[0]) }.map { |m| [m[0].to_s, m[1]] }
end

def each
members.each { |m| yield m }
end
Expand Down
7 changes: 0 additions & 7 deletions lib/mock_redis/stream/id.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ def initialize(id, min: nil, sequence: 0)
@timestamp = id
end
@sequence = @sequence.nil? ? sequence : @sequence.to_i
if @timestamp == 0 && @sequence == 0
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
'the target stream top item'
# TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message:
# 'ERR The ID specified in XADD must be greater than 0-0'
end
if self <= min
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
Expand Down
15 changes: 14 additions & 1 deletion lib/mock_redis/stream_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

# TODO: Implement the following commands
#
# * xread
# * xgroup
# * xreadgroup
# * xack
Expand Down Expand Up @@ -67,6 +66,20 @@ def xrevrange(key, last = '+', first = '-', count: nil)
end
end

# TODO: Implement count and block parameters
def xread(keys, ids)
result = {}
keys = keys.is_a?(Array) ? keys : [keys]
ids = ids.is_a?(Array) ? ids : [ids]
keys.each_with_index do |key, index|
with_stream_at(key) do |stream|
data = stream.read(ids[index])
result[key] = data unless data.empty?
end
end
result
end

private

def with_stream_at(key, &blk)
Expand Down
11 changes: 11 additions & 0 deletions spec/commands/xadd_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,15 @@
]
)
end

it 'supports a maxlen greater than the current size' do
@redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-0')
@redises.xadd(@key, { key2: 'value2' }, id: '1234567891245-0', maxlen: 1000)
expect(@redises.xrange(@key, '-', '+')).to eq(
[
['1234567891234-0', { 'key1' => 'value1' }],
['1234567891245-0', { 'key2' => 'value2' }],
]
)
end
end
13 changes: 13 additions & 0 deletions spec/commands/xrange_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@
)
end

it 'returns all entries with a lower limit of 0-0' do
expect(@redises.xrange(@key, '0-0', '+')).to eq(
[
['1234567891234-0', { 'key1' => 'value1' }],
['1234567891245-0', { 'key2' => 'value2' }],
['1234567891245-1', { 'key3' => 'value3' }],
['1234567891278-0', { 'key4' => 'value4' }],
['1234567891278-1', { 'key5' => 'value5' }],
['1234567891299-0', { 'key6' => 'value6' }]
]
)
end

it 'returns entries with an upper limit' do
expect(@redises.xrange(@key, '-', '1234567891285-0')).to eq(
[
Expand Down
50 changes: 50 additions & 0 deletions spec/commands/xread_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
require 'spec_helper'

describe '#xread(keys, ids)' do
before :all do
sleep 1 - (Time.now.to_f % 1)
@key = 'mock-redis-test:xread'
@key1 = 'mock-redis-test:xread1'
end

it 'reads a single entry' do
@redises.xadd(@key, { key: 'value' }, id: '1234567891234-0')
expect(@redises.xread(@key, '0-0'))
.to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]] })
end

it 'reads multiple entries from the beginning of the stream' do
@redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1')
expect(@redises.xread(@key, '0-0'))
.to eq({ @key => [['1234567891234-0', { 'key0' => 'value0' }],
['1234567891234-1', { 'key1' => 'value1' }]] })
end

it 'reads entries greater than the ID passed' do
@redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1')
expect(@redises.xread(@key, '1234567891234-0'))
.to eq({ @key => [['1234567891234-1', { 'key1' => 'value1' }]] })
end

it 'reads from multiple streams' do
@redises.xadd(@key, { key: 'value' }, id: '1234567891234-0')
@redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-0')
expect(@redises.xread([@key, @key1], %w[0-0 0-0]))
.to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]],
@key1 => [['1234567891234-0', { 'key1' => 'value1' }]] })
end

it 'reads from multiple streams at the given IDs' do
@redises.xadd(@key, { key: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key, { key: 'value1' }, id: '1234567891234-1')
@redises.xadd(@key, { key: 'value2' }, id: '1234567891234-2')
@redises.xadd(@key1, { key1: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-1')
@redises.xadd(@key1, { key1: 'value2' }, id: '1234567891234-2')
# The first stream won't return anything since we specify the last ID
expect(@redises.xread([@key, @key1], %w[1234567891234-2 1234567891234-1]))
.to eq({ @key1 => [['1234567891234-2', { 'key1' => 'value2' }]] })
end
end
6 changes: 6 additions & 0 deletions spec/commands/xtrim_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
expect(@redises.xtrim(@key, 4)).to eq 2
end

it 'returns 0 if count is greater than size' do
initial = @redises.xrange(@key, '-', '+')
expect(@redises.xtrim(@key, 1000)).to eq 0
expect(@redises.xrange(@key, '-', '+')).to eql(initial)
end

it 'deletes the oldes elements' do
@redises.xtrim(@key, 4)
expect(@redises.xrange(@key, '-', '+')).to eq(
Expand Down