diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 996cd96805e..9061bbc678f 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -29,6 +29,103 @@ import ( "github.com/stretchr/testify/require" ) +func TestStream(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + t.Run("XADD wrong number of args", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream").Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream", "*").Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream", "*", "field").Err(), "wrong number of arguments") + }) + + t.Run("XADD can add entries into a stream that XRANGE can fetch", func(t *testing.T) { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Err()) + + require.EqualValues(t, 2, rdb.XLen(ctx, "mystream").Val()) + + items := rdb.XRange(ctx, "mystream", "-", "+").Val() + require.Len(t, items, 2) + require.EqualValues(t, map[string]interface{}{"item": "1", "value": "a"}, items[0].Values) + require.EqualValues(t, map[string]interface{}{"item": "2", "value": "b"}, items[1].Values) + }) + + t.Run("XADD stores entry value with respect to case sensitivity", func(t *testing.T) { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"ItEm", "2", "VaLUe", "B"}}).Err()) + + require.EqualValues(t, 2, rdb.XLen(ctx, "myStream").Val()) + + items := rdb.XRange(ctx, "myStream", "-", "+").Val() + require.Len(t, items, 2) + require.EqualValues(t, map[string]interface{}{"iTeM": "1", "vAluE": "a"}, items[0].Values) + require.EqualValues(t, map[string]interface{}{"ItEm": "2", "VaLUe": "B"}, items[1].Values) + }) + + t.Run("XADD IDs are incremental", func(t *testing.T) { + x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Val() + x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Val() + x3 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "3", "value", "c"}}).Val() + require.Less(t, x1, x2) + require.Less(t, x2, x3) + }) + + t.Run("XADD IDs are incremental when ms is the same as well", func(t *testing.T) { + require.NoError(t, rdb.Do(ctx, "MULTI").Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "3", "value", "c"}}).Err()) + v := rdb.Do(ctx, "EXEC").Val().([]interface{}) + require.Len(t, v, 3) + require.Less(t, v[0], v[1]) + require.Less(t, v[1], v[2]) + }) + + t.Run("XADD IDs correctly report an error when overflowing", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "mystream").Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "18446744073709551615-18446744073709551615", Values: []string{"a", "b"}}).Err()) + require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "*", Values: []string{"c", "d"}}).Err(), "ERR") + }) + + t.Run("XADD auto-generated sequence is incremented for last ID", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "mystream").Err()) + x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Val() + x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-*", Values: []string{"item", "2", "value", "b"}}).Val() + require.Equal(t, "123-457", x2) + require.Less(t, x1, x2) + }) + + t.Run("XADD auto-generated sequence is zero for future timestamp ID", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "mystream").Err()) + x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Val() + x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "789-*", Values: []string{"item", "2", "value", "b"}}).Val() + require.Equal(t, "789-0", x2) + require.Less(t, x1, x2) + }) + + t.Run("XADD auto-generated sequence can't be smaller than last ID", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "mystream").Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Err()) + require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "42-*", Values: []string{"item", "2", "value", "b"}}).Err(), "ERR") + }) + + t.Run("XADD auto-generated sequence can't overflow", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "mystream").Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-18446744073709551615", Values: []string{"a", "b"}}).Err()) + require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-*", Values: []string{"c", "d"}}).Err(), "ERR") + }) + + t.Run("XADD 0-* should succeed", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "mystream").Err()) + x := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "0-*", Values: []string{"a", "b"}}).Val() + require.Equal(t, "0-1", x) + }) +} + func TestStreamOffset(t *testing.T) { srv := util.StartServer(t, map[string]string{}) defer srv.Close() diff --git a/tests/tcl/tests/unit/type/stream.tcl b/tests/tcl/tests/unit/type/stream.tcl index a6c121cdf4b..629f65fd4c8 100644 --- a/tests/tcl/tests/unit/type/stream.tcl +++ b/tests/tcl/tests/unit/type/stream.tcl @@ -75,91 +75,6 @@ set content {} ;# Will be populated with Tcl side copy of the stream content. start_server { tags {"stream"} } { - test "XADD wrong number of args" { - assert_error {*wrong number of arguments*} {r XADD mystream} - assert_error {*wrong number of arguments*} {r XADD mystream *} - assert_error {*wrong number of arguments*} {r XADD mystream * field} - } - - test {XADD can add entries into a stream that XRANGE can fetch} { - r XADD mystream * item 1 value a - r XADD mystream * item 2 value b - assert_equal 2 [r XLEN mystream] - set items [r XRANGE mystream - +] - assert_equal [lindex $items 0 1] {item 1 value a} - assert_equal [lindex $items 1 1] {item 2 value b} - } - - test {XADD stores entry value with respect to case sensitivity } { - r XADD myStream * iTeM 1 vAluE a - r XADD myStream * ItEm 2 VaLUe B - assert_equal 2 [r XLEN myStream] - set items [r XRANGE myStream - +] - assert_equal [lindex $items 0 1] {iTeM 1 vAluE a} - assert_equal [lindex $items 1 1] {ItEm 2 VaLUe B} - } - - test {XADD IDs are incremental} { - set id1 [r XADD mystream * item 1 value a] - set id2 [r XADD mystream * item 2 value b] - set id3 [r XADD mystream * item 3 value c] - assert {[streamCompareID $id1 $id2] == -1} - assert {[streamCompareID $id2 $id3] == -1} - } - - test {XADD IDs are incremental when ms is the same as well} { - r multi - r XADD mystream * item 1 value a - r XADD mystream * item 2 value b - r XADD mystream * item 3 value c - lassign [r exec] id1 id2 id3 - assert {[streamCompareID $id1 $id2] == -1} - assert {[streamCompareID $id2 $id3] == -1} - } - - test {XADD IDs correctly report an error when overflowing} { - r DEL mystream - r xadd mystream 18446744073709551615-18446744073709551615 a b - assert_error ERR* {r xadd mystream * c d} - } - - test {XADD auto-generated sequence is incremented for last ID} { - r DEL mystream - set id1 [r XADD mystream 123-456 item 1 value a] - set id2 [r XADD mystream 123-* item 2 value b] - lassign [split $id2 -] _ seq - assert {$seq == 457} - assert {[streamCompareID $id1 $id2] == -1} - } - - test {XADD auto-generated sequence is zero for future timestamp ID} { - r DEL mystream - set id1 [r XADD mystream 123-456 item 1 value a] - set id2 [r XADD mystream 789-* item 2 value b] - lassign [split $id2 -] _ seq - assert {$seq == 0} - assert {[streamCompareID $id1 $id2] == -1} - } - - test {XADD auto-generated sequence can't be smaller than last ID} { - r DEL mystream - r XADD mystream 123-456 item 1 value a - assert_error ERR* {r XADD mystream 42-* item 2 value b} - } - - test {XADD auto-generated sequence can't overflow} { - r DEL mystream - r xadd mystream 1-18446744073709551615 a b - assert_error ERR* {r xadd mystream 1-* c d} - } - - test {XADD 0-* should succeed} { - r DEL mystream - set id [r xadd mystream 0-* a b] - lassign [split $id -] _ seq - assert {$seq == 1} - } - test {XADD with MAXLEN option} { r DEL mystream for {set j 0} {$j < 1000} {incr j} {