diff --git a/cmd_stream.go b/cmd_stream.go index b6d0034..e8d9b4d 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -569,9 +569,11 @@ func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) { return } - c.WriteMapLen(1) + c.WriteMapLen(2) c.WriteBulk("length") c.WriteInt(len(s.entries)) + c.WriteBulk("entries-added") + c.WriteInt(s.entriesAdded) }) } @@ -610,9 +612,14 @@ func (m *Miniredis) cmdXinfoGroups(c *server.Peer, args []string) { c.WriteBulk("last-delivered-id") c.WriteBulk(g.lastID) c.WriteBulk("entries-read") - c.WriteNull() + c.WriteInt(g.entriesRead) c.WriteBulk("lag") - c.WriteInt(len(g.stream.entries)) + lag := g.lag() + if lag == -1 { + c.WriteNull() + } else { + c.WriteInt(lag) + } } }) } diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 719a28e..4abb266 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -35,7 +35,7 @@ func TestStream(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "s", - proto.Array(proto.String("length"), proto.Int(1)), + proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) now := time.Date(2001, 1, 1, 4, 4, 5, 4000000, time.UTC) @@ -73,7 +73,7 @@ func TestStream(t *testing.T) { t.Run("resp3", func(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "s", - proto.Map(proto.String("length"), proto.Int(1)), + proto.Map(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) }) } @@ -546,7 +546,7 @@ func TestStreamInfo(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "planets", - proto.Array(proto.String("length"), proto.Int(1)), + proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) mustDo(t, c, @@ -605,7 +605,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -627,7 +627,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -660,7 +660,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -727,7 +727,7 @@ func TestStreamReadGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -747,6 +747,20 @@ func TestStreamReadGroup(t *testing.T) { "XLEN", "planets", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + proto.String("entries-read"), proto.Int(0), + proto.String("lag"), proto.Int(1), + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", proto.Array( @@ -762,8 +776,8 @@ func TestStreamReadGroup(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-1"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), ), ), ) @@ -806,6 +820,11 @@ func TestStreamDelete(t *testing.T) { proto.String("0-1"), ) + mustDo(t, c, + "XADD", "planets", "0-2", "name", "Venus", + proto.String("0-2"), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", proto.Array( @@ -816,24 +835,107 @@ func TestStreamDelete(t *testing.T) { proto.String("0-1"), proto.Strings("name", "Mercury"), ), + proto.Array( + proto.String("0-2"), + proto.Strings("name", "Venus"), + ), ), ), ), ) mustDo(t, c, - "XADD", "planets", "0-2", "name", "Mercury", - proto.String("0-2"), + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(0), + ), + ), + ) + + mustDo(t, c, + "XADD", "planets", "0-3", "name", "Earth", + proto.String("0-3"), + ) + + mustDo(t, c, + "XADD", "planets", "0-4", "name", "Jupiter", + proto.String("0-4"), + ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(2), + ), + ), ) must1(t, c, "XDEL", "planets", "0-1", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(2), + ), + ), + ) + must1(t, c, "XDEL", "planets", "0-2", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(2), + ), + ), + ) + + must1(t, c, + "XDEL", "planets", "0-3", + ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Nil, + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0", proto.Array( @@ -843,6 +945,36 @@ func TestStreamDelete(t *testing.T) { ), ), ) + + mustDo(t, c, + "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", + proto.Array( + proto.Array( + proto.String("planets"), + proto.Array( + proto.Array( + proto.String("0-4"), + proto.Strings("name", "Jupiter"), + ), + ), + ), + ), + ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-4"), + proto.String("entries-read"), proto.Int(4), + proto.String("lag"), proto.Int(0), + ), + ), + ) + } // Test XACK @@ -865,9 +997,38 @@ func TestStreamAck(t *testing.T) { ), ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-1"), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), + ), + ), + ) + must1(t, c, "XACK", "planets", "processing", "0-1", ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-1"), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0", proto.Array( @@ -885,8 +1046,8 @@ func TestStreamAck(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-1"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), ), ), ) @@ -1353,8 +1514,8 @@ func TestStreamAutoClaim(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-2"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(2), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(0), ), ), ) @@ -1428,7 +1589,7 @@ func TestStreamClaim(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(2), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(2), ), ), @@ -1479,8 +1640,8 @@ func TestStreamClaim(t *testing.T) { proto.String("consumers"), proto.Int(2), proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(0), + proto.String("lag"), proto.Nil, ), ), ) diff --git a/integration/stream_test.go b/integration/stream_test.go index 6e817a3..9047a75 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -531,6 +531,30 @@ func TestStreamGroup(t *testing.T) { }) }) + t.Run("XREADGROUP XDEL XINFO lag", func(t *testing.T) { + testRaw(t, func(c *client) { + // TODO ALL INFO needs to be uncommented, even DoLoosely is not enough + // for some of them becase Nil is returned instead of Int + c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM") + c.Do("XADD", "planets", "0-1", "name", "Mercury") + c.Do("XADD", "planets", "0-2", "name", "Venus") + c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">") + //c.Do("XINFO", "GROUPS", "planets") //SPEC entries-added 2, entries-read = 2, lag = 0 + c.Do("XADD", "planets", "0-3", "name", "Earth") + c.Do("XADD", "planets", "0-4", "name", "Jupiter") + //c.Do("XINFO", "GROUPS", "planets") //SPEC entries-added 4, entries-read = 2, lag = 2 + + c.Do("XDEL", "planets", "0-1") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = 2 + c.Do("XDEL", "planets", "0-2") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = 2 + c.Do("XDEL", "planets", "0-3") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = nil + c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 4, lag = 0 + }) + }) + t.Run("XACK", func(t *testing.T) { testRaw(t, func(c *client) { c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM") diff --git a/integration/test.go b/integration/test.go index f91163a..1fd0836 100644 --- a/integration/test.go +++ b/integration/test.go @@ -346,7 +346,7 @@ func (c *client) Do(cmd string, args ...string) { // c.t.Logf("real:%q mini:%q", string(resReal), string(resMini)) if resReal != resMini { - c.t.Errorf("real: %q mini: %q", string(resReal), string(resMini)) + c.t.Errorf("\nreal: %q \nmini: %q", string(resReal), string(resMini)) return } diff --git a/stream.go b/stream.go index 55faa5e..d1a442d 100644 --- a/stream.go +++ b/stream.go @@ -18,6 +18,7 @@ type streamKey struct { entries []StreamEntry groups map[string]*streamGroup lastAllocatedID string + entriesAdded int mu sync.Mutex } @@ -30,10 +31,12 @@ type StreamEntry struct { } type streamGroup struct { - stream *streamKey - lastID string - pending []pendingEntry - consumers map[string]*consumer + stream *streamKey + lastID string + pending []pendingEntry + consumers map[string]*consumer + entriesRead int + entriesReadValid bool } type consumer struct { @@ -93,6 +96,14 @@ func (s *streamKey) lastIDUnlocked() string { return s.entries[len(s.entries)-1].ID } +func (s *streamKey) firstIDUnlocked() string { + if len(s.entries) == 0 { + return "0-0" + } + + return s.entries[0].ID +} + func (s *streamKey) copy() *streamKey { s.mu.Lock() defer s.mu.Unlock() @@ -218,13 +229,22 @@ func (s *streamKey) createGroup(group, id string) error { return errors.New("BUSYGROUP Consumer Group name already exists") } + var entriesRead = 0 + var entriesReadValid = true if id == "$" { id = s.lastIDUnlocked() + entriesRead = s.entriesAdded + } else if id == "0" || id == "0-0" || id == s.firstIDUnlocked() { + entriesRead = 0 + } else { + entriesReadValid = false } s.groups[group] = &streamGroup{ - stream: s, - lastID: id, - consumers: map[string]*consumer{}, + stream: s, + lastID: id, + consumers: map[string]*consumer{}, + entriesRead: entriesRead, + entriesReadValid: entriesReadValid, } return nil } @@ -255,6 +275,7 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, ID: entryID, Values: values, }) + s.entriesAdded++ return entryID, nil } @@ -263,10 +284,26 @@ func (s *streamKey) trim(n int) { defer s.mu.Unlock() if len(s.entries) > n { + for _, group := range s.groups { + for _, entry := range s.entries[:n] { + group.onDelete(entry.ID) + } + } s.entries = s.entries[len(s.entries)-n:] } } +func (g *streamGroup) onDelete(id string) { + if !g.entriesReadValid { + return + } + compare := streamCmp(g.lastID, id) + if compare < 0 { + // an item between last-delivered-id and last-written-id is deleted. Entries read is not valid anymore + g.entriesReadValid = false + } +} + // trimBefore deletes entries with an id less than the provided id // and returns the number of entries deleted func (s *streamKey) trimBefore(id string) int { @@ -295,6 +332,12 @@ func (s *streamKey) after(id string) []StreamEntry { return s.entries[pos:] } +func (s *streamKey) len() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.entries) +} + // get a stream entry by ID // Also returns the position in the entries slice, if found. func (s *streamKey) get(id string) (int, *StreamEntry) { @@ -363,6 +406,8 @@ func (g *streamGroup) readGroup( } g.consumers[consumerID].numPendingEntries += len(msgs) g.lastID = msgs[len(msgs)-1].ID + + g.updateEntriesRead(msgs) return msgs } @@ -434,6 +479,9 @@ func (s *streamKey) delete(ids []string) (int, error) { continue } + for _, group := range s.groups { + group.onDelete(entry.ID) + } s.entries = append(s.entries[:i], s.entries[i+1:]...) count++ } @@ -498,3 +546,26 @@ func (g *streamGroup) setLastSuccess(c string, t time.Time) { g.setLastSeen(c, t) g.consumers[c].lastSuccess = t } + +func (g *streamGroup) lag() int { + if !g.entriesReadValid { + return -1 + } + g.stream.mu.Lock() + defer g.stream.mu.Unlock() + return g.stream.entriesAdded - g.entriesRead +} + +func (g *streamGroup) updateEntriesRead(msgs []StreamEntry) { + // mutex protects lastId and len(g.stream.entries) together. + // We should not lock the mutex twice for each + g.stream.mu.Lock() + defer g.stream.mu.Unlock() + if g.entriesReadValid { + g.entriesRead += len(msgs) + } else if g.lastID == g.stream.lastIDUnlocked() { + // reset entries read as we catch up to the last ID + g.entriesRead = g.stream.entriesAdded + g.entriesReadValid = true + } +}