From 01aa053cea127aec82bccb48b173fb08cc06188b Mon Sep 17 00:00:00 2001 From: "Thomas, Jason" Date: Thu, 18 Jun 2020 20:01:05 -0600 Subject: [PATCH 1/3] Fix trim and add xread --- lib/mock_redis/stream.rb | 13 +++++++++++-- lib/mock_redis/stream_methods.rb | 12 ++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/lib/mock_redis/stream.rb b/lib/mock_redis/stream.rb index 4579ef0a..4d03236b 100644 --- a/lib/mock_redis/stream.rb +++ b/lib/mock_redis/stream.rb @@ -29,8 +29,12 @@ def add(id, values) def trim(count) deleted = @members.size - count - @members = @members.to_a[-count..-1].to_set - deleted + if deleted > 0 + @members = @members.to_a[-count..-1].to_set + deleted + else + 0 + end end def range(start, finish, reversed, *opts_in) @@ -45,6 +49,11 @@ def range(start, finish, reversed, *opts_in) items end + def read(id) + stream_id = MockRedis::Stream::Id.new(id) + members.select { |m| (stream_id <= m[0]) }.map { |m| [m[0].to_s, m[1]] } + end + def each members.each { |m| yield m } end diff --git a/lib/mock_redis/stream_methods.rb b/lib/mock_redis/stream_methods.rb index 2f4d2498..ab65dcf4 100644 --- a/lib/mock_redis/stream_methods.rb +++ b/lib/mock_redis/stream_methods.rb @@ -67,6 +67,18 @@ def xrevrange(key, last = '+', first = '-', count: nil) end end + def xread(keys, ids, count: nil, block: nil) + result = {} + keys = keys.is_a?(Array) ? keys : [keys] + ids = ids.is_a?(Array) ? ids : [ids] + keys.each_with_index do |key, index| + with_stream_at(key) do |stream| + result[key] = stream.read(ids[index]) + end + end + result + end + private def with_stream_at(key, &blk) From fd57c415c2c3912dc9e935c3d3f81e1238a8a45b Mon Sep 17 00:00:00 2001 From: "Thomas, Jason" Date: Fri, 19 Jun 2020 09:07:46 -0600 Subject: [PATCH 2/3] Initial xread implementation --- lib/mock_redis/stream.rb | 9 +++++- lib/mock_redis/stream/id.rb | 7 ----- lib/mock_redis/stream_methods.rb | 7 +++-- spec/commands/xrange_spec.rb | 13 +++++++++ spec/commands/xread_spec.rb | 50 ++++++++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 11 deletions(-) create mode 100644 spec/commands/xread_spec.rb diff --git a/lib/mock_redis/stream.rb b/lib/mock_redis/stream.rb index 4d03236b..05b3f269 100644 --- a/lib/mock_redis/stream.rb +++ b/lib/mock_redis/stream.rb @@ -23,6 +23,13 @@ def last_id def add(id, values) @last_id = MockRedis::Stream::Id.new(id, min: @last_id) + if @last_id.to_s == '0-0' + raise Redis::CommandError, + 'ERR The ID specified in XADD is equal or smaller than ' \ + 'the target stream top item' + # TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message: + # 'ERR The ID specified in XADD must be greater than 0-0' + end members.add [@last_id, Hash[values.map { |k, v| [k.to_s, v.to_s] }]] @last_id.to_s end @@ -51,7 +58,7 @@ def range(start, finish, reversed, *opts_in) def read(id) stream_id = MockRedis::Stream::Id.new(id) - members.select { |m| (stream_id <= m[0]) }.map { |m| [m[0].to_s, m[1]] } + members.select { |m| (stream_id < m[0]) }.map { |m| [m[0].to_s, m[1]] } end def each diff --git a/lib/mock_redis/stream/id.rb b/lib/mock_redis/stream/id.rb index 864717de..c70ea2a1 100644 --- a/lib/mock_redis/stream/id.rb +++ b/lib/mock_redis/stream/id.rb @@ -31,13 +31,6 @@ def initialize(id, min: nil, sequence: 0) @timestamp = id end @sequence = @sequence.nil? ? sequence : @sequence.to_i - if @timestamp == 0 && @sequence == 0 - raise Redis::CommandError, - 'ERR The ID specified in XADD is equal or smaller than ' \ - 'the target stream top item' - # TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message: - # 'ERR The ID specified in XADD must be greater than 0-0' - end if self <= min raise Redis::CommandError, 'ERR The ID specified in XADD is equal or smaller than ' \ diff --git a/lib/mock_redis/stream_methods.rb b/lib/mock_redis/stream_methods.rb index ab65dcf4..bb346a74 100644 --- a/lib/mock_redis/stream_methods.rb +++ b/lib/mock_redis/stream_methods.rb @@ -4,7 +4,6 @@ # TODO: Implement the following commands # -# * xread # * xgroup # * xreadgroup # * xack @@ -67,13 +66,15 @@ def xrevrange(key, last = '+', first = '-', count: nil) end end - def xread(keys, ids, count: nil, block: nil) + # TODO: Implement count and block parameters + def xread(keys, ids) result = {} keys = keys.is_a?(Array) ? keys : [keys] ids = ids.is_a?(Array) ? ids : [ids] keys.each_with_index do |key, index| with_stream_at(key) do |stream| - result[key] = stream.read(ids[index]) + data = stream.read(ids[index]) + result[key] = data unless data.empty? end end result diff --git a/spec/commands/xrange_spec.rb b/spec/commands/xrange_spec.rb index 4659a9d7..a949294e 100644 --- a/spec/commands/xrange_spec.rb +++ b/spec/commands/xrange_spec.rb @@ -54,6 +54,19 @@ ) end + it 'returns all entries with a lower limit of 0-0' do + expect(@redises.xrange(@key, '0-0', '+')).to eq( + [ + ['1234567891234-0', { 'key1' => 'value1' }], + ['1234567891245-0', { 'key2' => 'value2' }], + ['1234567891245-1', { 'key3' => 'value3' }], + ['1234567891278-0', { 'key4' => 'value4' }], + ['1234567891278-1', { 'key5' => 'value5' }], + ['1234567891299-0', { 'key6' => 'value6' }] + ] + ) + end + it 'returns entries with an upper limit' do expect(@redises.xrange(@key, '-', '1234567891285-0')).to eq( [ diff --git a/spec/commands/xread_spec.rb b/spec/commands/xread_spec.rb new file mode 100644 index 00000000..77376b81 --- /dev/null +++ b/spec/commands/xread_spec.rb @@ -0,0 +1,50 @@ +require 'spec_helper' + +describe '#xread(keys, ids)' do + before :all do + sleep 1 - (Time.now.to_f % 1) + @key = 'mock-redis-test:xread' + @key1 = 'mock-redis-test:xread1' + end + + it 'reads a single entry' do + @redises.xadd(@key, { key: 'value' }, id: '1234567891234-0') + expect(@redises.xread(@key, '0-0')) + .to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]] }) + end + + it 'reads multiple entries from the beginning of the stream' do + @redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1') + expect(@redises.xread(@key, '0-0')) + .to eq({ @key => [['1234567891234-0', { 'key0' => 'value0' }], + ['1234567891234-1', { 'key1' => 'value1' }]] }) + end + + it 'reads entries greater than the ID passed' do + @redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1') + expect(@redises.xread(@key, '1234567891234-0')) + .to eq({ @key => [['1234567891234-1', { 'key1' => 'value1' }]] }) + end + + it 'reads from multiple streams' do + @redises.xadd(@key, { key: 'value' }, id: '1234567891234-0') + @redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-0') + expect(@redises.xread([@key, @key1], %w[0-0 0-0])) + .to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]], + @key1 => [['1234567891234-0', { 'key1' => 'value1' }]] }) + end + + it 'reads from multiple streams at the given IDs' do + @redises.xadd(@key, { key: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key, { key: 'value1' }, id: '1234567891234-1') + @redises.xadd(@key, { key: 'value2' }, id: '1234567891234-2') + @redises.xadd(@key1, { key1: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-1') + @redises.xadd(@key1, { key1: 'value2' }, id: '1234567891234-2') + # The first stream won't return anything since we specify the last ID + expect(@redises.xread([@key, @key1], %w[1234567891234-2 1234567891234-1])) + .to eq({ @key1 => [['1234567891234-2', { 'key1' => 'value2' }]] }) + end +end From 4b98cb2f84108677533eb82acdec2b68a6bd0023 Mon Sep 17 00:00:00 2001 From: "Thomas, Jason" Date: Fri, 19 Jun 2020 09:26:07 -0600 Subject: [PATCH 3/3] Update specs for trim bug --- spec/commands/xadd_spec.rb | 11 +++++++++++ spec/commands/xtrim_spec.rb | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/spec/commands/xadd_spec.rb b/spec/commands/xadd_spec.rb index 901d1a62..14f2336e 100644 --- a/spec/commands/xadd_spec.rb +++ b/spec/commands/xadd_spec.rb @@ -101,4 +101,15 @@ ] ) end + + it 'supports a maxlen greater than the current size' do + @redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-0') + @redises.xadd(@key, { key2: 'value2' }, id: '1234567891245-0', maxlen: 1000) + expect(@redises.xrange(@key, '-', '+')).to eq( + [ + ['1234567891234-0', { 'key1' => 'value1' }], + ['1234567891245-0', { 'key2' => 'value2' }], + ] + ) + end end diff --git a/spec/commands/xtrim_spec.rb b/spec/commands/xtrim_spec.rb index 6d291493..b1a9a08b 100644 --- a/spec/commands/xtrim_spec.rb +++ b/spec/commands/xtrim_spec.rb @@ -16,6 +16,12 @@ expect(@redises.xtrim(@key, 4)).to eq 2 end + it 'returns 0 if count is greater than size' do + initial = @redises.xrange(@key, '-', '+') + expect(@redises.xtrim(@key, 1000)).to eq 0 + expect(@redises.xrange(@key, '-', '+')).to eql(initial) + end + it 'deletes the oldes elements' do @redises.xtrim(@key, 4) expect(@redises.xrange(@key, '-', '+')).to eq(