Skip to content

Commit

Permalink
test: migrate xstream ids (#986)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored Oct 13, 2022
1 parent 287ed1a commit 84b8c28
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 85 deletions.
97 changes: 97 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,103 @@ import (
"github.com/stretchr/testify/require"
)

func TestStream(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("XADD wrong number of args", func(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream").Err(), "wrong number of arguments")
require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream", "*").Err(), "wrong number of arguments")
require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream", "*", "field").Err(), "wrong number of arguments")
})

t.Run("XADD can add entries into a stream that XRANGE can fetch", func(t *testing.T) {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Err())

require.EqualValues(t, 2, rdb.XLen(ctx, "mystream").Val())

items := rdb.XRange(ctx, "mystream", "-", "+").Val()
require.Len(t, items, 2)
require.EqualValues(t, map[string]interface{}{"item": "1", "value": "a"}, items[0].Values)
require.EqualValues(t, map[string]interface{}{"item": "2", "value": "b"}, items[1].Values)
})

t.Run("XADD stores entry value with respect to case sensitivity", func(t *testing.T) {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"ItEm", "2", "VaLUe", "B"}}).Err())

require.EqualValues(t, 2, rdb.XLen(ctx, "myStream").Val())

items := rdb.XRange(ctx, "myStream", "-", "+").Val()
require.Len(t, items, 2)
require.EqualValues(t, map[string]interface{}{"iTeM": "1", "vAluE": "a"}, items[0].Values)
require.EqualValues(t, map[string]interface{}{"ItEm": "2", "VaLUe": "B"}, items[1].Values)
})

t.Run("XADD IDs are incremental", func(t *testing.T) {
x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Val()
x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Val()
x3 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "3", "value", "c"}}).Val()
require.Less(t, x1, x2)
require.Less(t, x2, x3)
})

t.Run("XADD IDs are incremental when ms is the same as well", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "MULTI").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "3", "value", "c"}}).Err())
v := rdb.Do(ctx, "EXEC").Val().([]interface{})
require.Len(t, v, 3)
require.Less(t, v[0], v[1])
require.Less(t, v[1], v[2])
})

t.Run("XADD IDs correctly report an error when overflowing", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "18446744073709551615-18446744073709551615", Values: []string{"a", "b"}}).Err())
require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "*", Values: []string{"c", "d"}}).Err(), "ERR")
})

t.Run("XADD auto-generated sequence is incremented for last ID", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Val()
x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-*", Values: []string{"item", "2", "value", "b"}}).Val()
require.Equal(t, "123-457", x2)
require.Less(t, x1, x2)
})

t.Run("XADD auto-generated sequence is zero for future timestamp ID", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Val()
x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "789-*", Values: []string{"item", "2", "value", "b"}}).Val()
require.Equal(t, "789-0", x2)
require.Less(t, x1, x2)
})

t.Run("XADD auto-generated sequence can't be smaller than last ID", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Err())
require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "42-*", Values: []string{"item", "2", "value", "b"}}).Err(), "ERR")
})

t.Run("XADD auto-generated sequence can't overflow", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-18446744073709551615", Values: []string{"a", "b"}}).Err())
require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-*", Values: []string{"c", "d"}}).Err(), "ERR")
})

t.Run("XADD 0-* should succeed", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
x := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "0-*", Values: []string{"a", "b"}}).Val()
require.Equal(t, "0-1", x)
})
}

func TestStreamOffset(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
Expand Down
85 changes: 0 additions & 85 deletions tests/tcl/tests/unit/type/stream.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -75,91 +75,6 @@ set content {} ;# Will be populated with Tcl side copy of the stream content.
start_server {
tags {"stream"}
} {
test "XADD wrong number of args" {
assert_error {*wrong number of arguments*} {r XADD mystream}
assert_error {*wrong number of arguments*} {r XADD mystream *}
assert_error {*wrong number of arguments*} {r XADD mystream * field}
}

test {XADD can add entries into a stream that XRANGE can fetch} {
r XADD mystream * item 1 value a
r XADD mystream * item 2 value b
assert_equal 2 [r XLEN mystream]
set items [r XRANGE mystream - +]
assert_equal [lindex $items 0 1] {item 1 value a}
assert_equal [lindex $items 1 1] {item 2 value b}
}

test {XADD stores entry value with respect to case sensitivity } {
r XADD myStream * iTeM 1 vAluE a
r XADD myStream * ItEm 2 VaLUe B
assert_equal 2 [r XLEN myStream]
set items [r XRANGE myStream - +]
assert_equal [lindex $items 0 1] {iTeM 1 vAluE a}
assert_equal [lindex $items 1 1] {ItEm 2 VaLUe B}
}

test {XADD IDs are incremental} {
set id1 [r XADD mystream * item 1 value a]
set id2 [r XADD mystream * item 2 value b]
set id3 [r XADD mystream * item 3 value c]
assert {[streamCompareID $id1 $id2] == -1}
assert {[streamCompareID $id2 $id3] == -1}
}

test {XADD IDs are incremental when ms is the same as well} {
r multi
r XADD mystream * item 1 value a
r XADD mystream * item 2 value b
r XADD mystream * item 3 value c
lassign [r exec] id1 id2 id3
assert {[streamCompareID $id1 $id2] == -1}
assert {[streamCompareID $id2 $id3] == -1}
}

test {XADD IDs correctly report an error when overflowing} {
r DEL mystream
r xadd mystream 18446744073709551615-18446744073709551615 a b
assert_error ERR* {r xadd mystream * c d}
}

test {XADD auto-generated sequence is incremented for last ID} {
r DEL mystream
set id1 [r XADD mystream 123-456 item 1 value a]
set id2 [r XADD mystream 123-* item 2 value b]
lassign [split $id2 -] _ seq
assert {$seq == 457}
assert {[streamCompareID $id1 $id2] == -1}
}

test {XADD auto-generated sequence is zero for future timestamp ID} {
r DEL mystream
set id1 [r XADD mystream 123-456 item 1 value a]
set id2 [r XADD mystream 789-* item 2 value b]
lassign [split $id2 -] _ seq
assert {$seq == 0}
assert {[streamCompareID $id1 $id2] == -1}
}

test {XADD auto-generated sequence can't be smaller than last ID} {
r DEL mystream
r XADD mystream 123-456 item 1 value a
assert_error ERR* {r XADD mystream 42-* item 2 value b}
}

test {XADD auto-generated sequence can't overflow} {
r DEL mystream
r xadd mystream 1-18446744073709551615 a b
assert_error ERR* {r xadd mystream 1-* c d}
}

test {XADD 0-* should succeed} {
r DEL mystream
set id [r xadd mystream 0-* a b]
lassign [split $id -] _ seq
assert {$seq == 1}
}

test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {
Expand Down

0 comments on commit 84b8c28

Please sign in to comment.