Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: migrate xadd options #987

Merged
merged 1 commit into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package stream
import (
"context"
"fmt"
"math/rand"
"strconv"
"testing"

"github.com/apache/incubator-kvrocks/tests/gocase/util"
Expand Down Expand Up @@ -124,6 +126,101 @@ func TestStream(t *testing.T) {
x := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "0-*", Values: []string{"a", "b"}}).Val()
require.Equal(t, "0-1", x)
})

t.Run("XADD with MAXLEN option", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
for i := 0; i < 1000; i++ {
if rand.Float64() < 0.9 {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", MaxLen: 5, Values: []string{"xitem", strconv.Itoa(i)}}).Err())
} else {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", MaxLen: 5, Values: []string{"yitem", strconv.Itoa(i)}}).Err())
}
}
require.EqualValues(t, 5, rdb.XLen(ctx, "mystream").Val())
items := rdb.XRange(ctx, "mystream", "-", "+").Val()
expected := 995
for _, item := range items {
require.Subset(t, map[string]interface{}{"xitem": strconv.Itoa(expected), "yitem": strconv.Itoa(expected)}, item.Values)
expected++
}
})

t.Run("XADD with MAXLEN option and the '=' argument", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
for i := 0; i < 1000; i++ {
if rand.Float64() < 0.9 {
require.NoError(t, rdb.Do(ctx, "XADD", "mystream", "MAXLEN", "=", "5", "*", "xitem", "i").Err())
} else {
require.NoError(t, rdb.Do(ctx, "XADD", "mystream", "MAXLEN", "=", "5", "*", "yitem", "i").Err())
}
}
require.EqualValues(t, 5, rdb.XLen(ctx, "mystream").Val())
})

t.Run("XADD with NOMKSTREAM option", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.Empty(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", NoMkStream: true, Values: []string{"item", "1", "value", "a"}}).Val())
require.Zero(t, rdb.Exists(ctx, "mystream").Val())
require.NotEmpty(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Val())
require.NotEmpty(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", NoMkStream: true, Values: []string{"item", "2", "value", "b"}}).Val())
require.EqualValues(t, 2, rdb.XLen(ctx, "mystream").Val())
items := rdb.XRange(ctx, "mystream", "-", "+").Val()
require.Len(t, items, 2)
require.EqualValues(t, map[string]interface{}{"item": "1", "value": "a"}, items[0].Values)
require.EqualValues(t, map[string]interface{}{"item": "2", "value": "b"}, items[1].Values)
})

t.Run("XADD with MINID option", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
buildXAddArgs := func(id int, tag string) *redis.XAddArgs {
c := id - 5
if c < 0 {
c = 1000
}
return &redis.XAddArgs{Stream: "mystream", MinID: strconv.Itoa(c), ID: strconv.Itoa(id), Values: []string{"xitem", strconv.Itoa(id)}}
}
for i := 0; i < 1000; i++ {
if rand.Float64() < 0.9 {
require.NoError(t, rdb.XAdd(ctx, buildXAddArgs(i+1, "xitem")).Err())
} else {
require.NoError(t, rdb.XAdd(ctx, buildXAddArgs(i+1, "yitem")).Err())
}
}
require.EqualValues(t, 6, rdb.XLen(ctx, "mystream").Val())
items := rdb.XRange(ctx, "mystream", "-", "+").Val()
expected := 995
for _, item := range items {
require.Subset(t, map[string]interface{}{"xitem": strconv.Itoa(expected), "yitem": strconv.Itoa(expected)}, item.Values)
expected++
}
})

t.Run("XTRIM with MINID option", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "2-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "3-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "4-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "5-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XTrimMinID(ctx, "mystream", "3-0").Err())
items := rdb.XRange(ctx, "mystream", "-", "+").Val()
require.Len(t, items, 3)
require.EqualValues(t, "3-0", items[0].ID)
require.EqualValues(t, "4-0", items[1].ID)
require.EqualValues(t, "5-0", items[2].ID)
})

t.Run("XTRIM with MINID option, big delta from master record", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1641544570597-0", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1641544570597-1", Values: []string{"f", "v"}}).Err())
require.NoError(t, rdb.XTrimMinID(ctx, "mystream", "1641544570597-0").Err())
items := rdb.XRange(ctx, "mystream", "-", "+").Val()
require.Len(t, items, 2)
require.EqualValues(t, "1641544570597-0", items[0].ID)
require.EqualValues(t, "1641544570597-1", items[1].ID)
})
}

func TestStreamOffset(t *testing.T) {
Expand Down
24 changes: 9 additions & 15 deletions tests/gocase/util/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,31 @@ import (
"strings"
)

func RandPath[T any](funcs ...func() T) T {
index := rand.Int31n(int32(len(funcs)))
return funcs[index]()
func RandPath[T any](f ...func() T) T {
index := rand.Int31n(int32(len(f)))
return f[index]()
}

func RandPathNoResult(funcs ...func()) {
index := rand.Int31n(int32(len(funcs)))
funcs[index]()
func RandPathNoResult(f ...func()) {
index := rand.Int31n(int32(len(f)))
f[index]()
}

// Random signed integer in (-max, max)
// RandomSignedInt returns an integer in (-max, max)
func RandomSignedInt(max int32) int64 {
return rand.Int63n(int64(max)*2-1) - int64(max) + 1
}

// Random integer in [0, max)
// RandomInt return an integer in [0, max)
func RandomInt(max int64) int64 {
return rand.Int63() % int64(max)
return rand.Int63() % max
}

type RandStringType int

const (
Alpha RandStringType = iota
Binary
Compr
)

func RandString(min, max int, typ RandStringType) string {
Expand All @@ -62,8 +61,6 @@ func RandString(min, max int, typ RandStringType) string {
minVal, maxVal = 0, 255
case Alpha:
minVal, maxVal = 48, 122
case Compr:
minVal, maxVal = 48, 52
}

var sb strings.Builder
Expand Down Expand Up @@ -101,9 +98,6 @@ func RandomValue() string {
func() string {
return RandString(0, 256, Alpha)
},
func() string {
return RandString(0, 256, Compr)
},
func() string {
return RandString(0, 256, Binary)
},
Expand Down
84 changes: 0 additions & 84 deletions tests/tcl/tests/unit/type/stream.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -75,90 +75,6 @@ set content {} ;# Will be populated with Tcl side copy of the stream content.
start_server {
tags {"stream"}
} {
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {
if {rand() < 0.9} {
r XADD mystream MAXLEN 5 * xitem $j
} else {
r XADD mystream MAXLEN 5 * yitem $j
}
}
assert {[r xlen mystream] == 5}
set res [r xrange mystream - +]
set expected 995
foreach r $res {
assert {[lindex $r 1 1] == $expected}
incr expected
}
}

test {XADD with MAXLEN option and the '=' argument} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {
if {rand() < 0.9} {
r XADD mystream MAXLEN = 5 * xitem $j
} else {
r XADD mystream MAXLEN = 5 * yitem $j
}
}
assert {[r XLEN mystream] == 5}
}

test {XADD with NOMKSTREAM option} {
r DEL mystream
assert_equal "" [r XADD mystream NOMKSTREAM * item 1 value a]
assert_equal 0 [r EXISTS mystream]
r XADD mystream * item 1 value a
r XADD mystream NOMKSTREAM * item 2 value b
assert_equal 2 [r XLEN mystream]
set items [r XRANGE mystream - +]
assert_equal [lindex $items 0 1] {item 1 value a}
assert_equal [lindex $items 1 1] {item 2 value b}
}

test {XADD with MINID option} {
r DEL mystream
for {set j 1} {$j < 1001} {incr j} {
set minid 1000
if {$j >= 5} {
set minid [expr {$j-5}]
}
if {rand() < 0.9} {
r XADD mystream MINID $minid $j xitem $j
} else {
r XADD mystream MINID $minid $j yitem $j
}
}
assert {[r xlen mystream] == 6}
set res [r xrange mystream - +]
set expected 995
foreach r $res {
assert {[lindex $r 1 1] == $expected}
incr expected
}
}

test {XTRIM with MINID option} {
r DEL mystream
r XADD mystream 1-0 f v
r XADD mystream 2-0 f v
r XADD mystream 3-0 f v
r XADD mystream 4-0 f v
r XADD mystream 5-0 f v
r XTRIM mystream MINID = 3-0
assert_equal [r XRANGE mystream - +] {{3-0 {f v}} {4-0 {f v}} {5-0 {f v}}}
}

test {XTRIM with MINID option, big delta from master record} {
r DEL mystream
r XADD mystream 1-0 f v
r XADD mystream 1641544570597-0 f v
r XADD mystream 1641544570597-1 f v
r XTRIM mystream MINID 1641544570597-0
assert_equal [r XRANGE mystream - +] {{1641544570597-0 {f v}} {1641544570597-1 {f v}}}
}

proc insert_into_stream_key {key {count 10000}} {
r multi
for {set j 0} {$j < $count} {incr j} {
Expand Down