diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c6434a82bd..c3f1ecd50c 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24935,43 +24935,47 @@ func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) { } func TestJetStreamMessageTTL(t *testing.T) { - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - jsStreamCreate(t, nc, &StreamConfig{ - Name: "TEST", - Storage: FileStorage, - Subjects: []string{"test"}, - AllowMsgTTL: true, - }) + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: storage, + Subjects: []string{"test"}, + AllowMsgTTL: true, + }) - msg := &nats.Msg{ - Subject: "test", - Header: nats.Header{}, - } + msg := &nats.Msg{ + Subject: "test", + Header: nats.Header{}, + } - for i := 1; i <= 10; i++ { - msg.Header.Set(JSMessageTTL, "1s") - _, err := js.PublishMsg(msg) - require_NoError(t, err) - } + for i := 1; i <= 10; i++ { + msg.Header.Set(JSMessageTTL, "1s") + _, err := js.PublishMsg(msg) + require_NoError(t, err) + } - si, err := js.StreamInfo("TEST") - require_NoError(t, err) - require_Equal(t, si.State.Msgs, 10) - require_Equal(t, si.State.FirstSeq, 1) - require_Equal(t, si.State.LastSeq, 10) + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 10) + require_Equal(t, si.State.FirstSeq, 1) + require_Equal(t, si.State.LastSeq, 10) - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 2) - si, err = js.StreamInfo("TEST") - require_NoError(t, err) - require_Equal(t, si.State.Msgs, 0) - require_Equal(t, si.State.FirstSeq, 11) - require_Equal(t, si.State.LastSeq, 10) + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 0) + require_Equal(t, si.State.FirstSeq, 11) + require_Equal(t, si.State.LastSeq, 10) + }) + } } func TestJetStreamMessageTTLRestart(t *testing.T) { @@ -25088,31 +25092,35 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) { } func TestJetStreamMessageTTLInvalid(t *testing.T) { - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - jsStreamCreate(t, nc, &StreamConfig{ - Name: "TEST", - Storage: FileStorage, - Subjects: []string{"test"}, - AllowMsgTTL: true, - }) + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: storage, + Subjects: []string{"test"}, + AllowMsgTTL: true, + }) - msg := &nats.Msg{ - Subject: "test", - Header: nats.Header{}, - } + msg := &nats.Msg{ + Subject: "test", + Header: nats.Header{}, + } - msg.Header.Set(JSMessageTTL, "500ms") - _, err := js.PublishMsg(msg) - require_Error(t, err) + msg.Header.Set(JSMessageTTL, "500ms") + _, err := js.PublishMsg(msg) + require_Error(t, err) - msg.Header.Set(JSMessageTTL, "something") - _, err = js.PublishMsg(msg) - require_Error(t, err) + msg.Header.Set(JSMessageTTL, "something") + _, err = js.PublishMsg(msg) + require_Error(t, err) + }) + } } func TestJetStreamMessageTTLNotUpdatable(t *testing.T) { @@ -25139,74 +25147,82 @@ func TestJetStreamMessageTTLNotUpdatable(t *testing.T) { } func TestJetStreamMessageTTLNeverExpire(t *testing.T) { - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - jsStreamCreate(t, nc, &StreamConfig{ - Name: "TEST", - Storage: FileStorage, - Subjects: []string{"test"}, - AllowMsgTTL: true, - MaxAge: time.Second, - }) + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: storage, + Subjects: []string{"test"}, + AllowMsgTTL: true, + MaxAge: time.Second, + }) - msg := &nats.Msg{ - Subject: "test", - Header: nats.Header{}, - } + msg := &nats.Msg{ + Subject: "test", + Header: nats.Header{}, + } - // The first message we publish is set to "never expire", therefore it - // won't age out with the MaxAge policy. - msg.Header.Set(JSMessageTTL, "never") - _, err := js.PublishMsg(msg) - require_NoError(t, err) + // The first message we publish is set to "never expire", therefore it + // won't age out with the MaxAge policy. + msg.Header.Set(JSMessageTTL, "never") + _, err := js.PublishMsg(msg) + require_NoError(t, err) - // Following messages will be published as normal and will age out. - msg.Header.Del(JSMessageTTL) - for i := 1; i <= 10; i++ { - _, err := js.PublishMsg(msg) - require_NoError(t, err) - } + // Following messages will be published as normal and will age out. + msg.Header.Del(JSMessageTTL) + for i := 1; i <= 10; i++ { + _, err := js.PublishMsg(msg) + require_NoError(t, err) + } - si, err := js.StreamInfo("TEST") - require_NoError(t, err) - require_Equal(t, si.State.Msgs, 11) - require_Equal(t, si.State.FirstSeq, 1) - require_Equal(t, si.State.LastSeq, 11) + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 11) + require_Equal(t, si.State.FirstSeq, 1) + require_Equal(t, si.State.LastSeq, 11) - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 2) - si, err = js.StreamInfo("TEST") - require_NoError(t, err) - require_Equal(t, si.State.Msgs, 1) - require_Equal(t, si.State.FirstSeq, 1) - require_Equal(t, si.State.LastSeq, 11) + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 1) + require_Equal(t, si.State.FirstSeq, 1) + require_Equal(t, si.State.LastSeq, 11) + }) + } } func TestJetStreamMessageTTLDisabled(t *testing.T) { - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - jsStreamCreate(t, nc, &StreamConfig{ - Name: "TEST", - Storage: FileStorage, - Subjects: []string{"test"}, - }) + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: storage, + Subjects: []string{"test"}, + }) - msg := &nats.Msg{ - Subject: "test", - Header: nats.Header{}, - } + msg := &nats.Msg{ + Subject: "test", + Header: nats.Header{}, + } - msg.Header.Set(JSMessageTTL, "1s") - _, err := js.PublishMsg(msg) - require_Error(t, err) + msg.Header.Set(JSMessageTTL, "1s") + _, err := js.PublishMsg(msg) + require_Error(t, err) + }) + } } func TestJetStreamMessageTTLWhenSourcing(t *testing.T) { @@ -25338,40 +25354,44 @@ func TestJetStreamMessageTTLWhenMirroring(t *testing.T) { } func TestJetStreamSubjectDeleteMarkers(t *testing.T) { - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - jsStreamCreate(t, nc, &StreamConfig{ - Name: "TEST", - Storage: FileStorage, - Subjects: []string{"test"}, - MaxAge: time.Second, - AllowMsgTTL: true, - SubjectDeleteMarkers: true, - SubjectDeleteMarkerTTL: "1s", - }) + jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Storage: storage, + Subjects: []string{"test"}, + MaxAge: time.Second, + AllowMsgTTL: true, + SubjectDeleteMarkers: true, + SubjectDeleteMarkerTTL: "1s", + }) - sub, err := js.SubscribeSync("test") - require_NoError(t, err) + sub, err := js.SubscribeSync("test") + require_NoError(t, err) - for i := 0; i < 3; i++ { - _, err = js.Publish("test", nil) - require_NoError(t, err) - } + for i := 0; i < 3; i++ { + _, err = js.Publish("test", nil) + require_NoError(t, err) + } - for i := 0; i < 3; i++ { - msg, err := sub.NextMsg(time.Second) - require_NoError(t, err) - require_NoError(t, msg.AckSync()) - } + for i := 0; i < 3; i++ { + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_NoError(t, msg.AckSync()) + } - msg, err := sub.NextMsg(time.Second * 10) - require_NoError(t, err) - require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge") - require_Equal(t, msg.Header.Get(JSMessageTTL), "1s") + msg, err := sub.NextMsg(time.Second * 10) + require_NoError(t, err) + require_Equal(t, msg.Header.Get(JSAppliedLimit), "MaxAge") + require_Equal(t, msg.Header.Get(JSMessageTTL), "1s") + }) + } } func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) { @@ -25402,20 +25422,24 @@ func TestJetStreamSubjectDeleteMarkersWithMirror(t *testing.T) { } func TestJetStreamSubjectDeleteMarkersDefaultTTL(t *testing.T) { - s := RunBasicJetStreamServer(t) - defer s.Shutdown() + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() - nc, _ := jsClientConnect(t, s) - defer nc.Close() + nc, _ := jsClientConnect(t, s) + defer nc.Close() - sc, err := jsStreamCreate(t, nc, &StreamConfig{ - Name: "Origin", - Storage: FileStorage, - Subjects: []string{"test"}, - AllowMsgTTL: true, - SubjectDeleteMarkers: true, - }) - require_NoError(t, err) + sc, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "Origin", + Storage: storage, + Subjects: []string{"test"}, + AllowMsgTTL: true, + SubjectDeleteMarkers: true, + }) + require_NoError(t, err) - require_Equal(t, sc.SubjectDeleteMarkerTTL, subjectDeleteMarkerDefaultTTL) + require_Equal(t, sc.SubjectDeleteMarkerTTL, subjectDeleteMarkerDefaultTTL) + }) + } } diff --git a/server/memstore.go b/server/memstore.go index b9217c0fd1..0bc2841f58 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -17,6 +17,7 @@ import ( crand "crypto/rand" "encoding/binary" "fmt" + "math" "slices" "sort" "sync" @@ -24,6 +25,7 @@ import ( "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nats-server/v2/server/stree" + "github.com/nats-io/nats-server/v2/server/thw" ) // TODO(dlc) - This is a fairly simplistic approach but should do for now. @@ -36,10 +38,11 @@ type memStore struct { dmap avl.SequenceSet maxp int64 scb StorageUpdateHandler - tcb SubjectDeleteMarkerUpdateHandler + sdmcb SubjectDeleteMarkerUpdateHandler ageChk *time.Timer consumers int receivedAny bool + ttls *thw.HashWheel } func newMemStore(cfg *StreamConfig) (*memStore, error) { @@ -55,6 +58,10 @@ func newMemStore(cfg *StreamConfig) (*memStore, error) { maxp: cfg.MaxMsgsPer, cfg: *cfg, } + // Only create a THW if we're going to allow TTLs. + if cfg.AllowMsgTTL { + ms.ttls = thw.NewHashWheel() + } if cfg.FirstSeq > 0 { if _, err := ms.purge(cfg.FirstSeq); err != nil { return nil, err @@ -211,9 +218,18 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt ms.enforceMsgLimit() ms.enforceBytesLimit() + // Per-message TTL. + if ms.ttls != nil && ttl > 0 { + expires := time.Duration(ts) + (time.Second * time.Duration(ttl)) + ms.ttls.Add(seq, int64(expires)) + } + // Check if we have and need the age expiration timer running. - if ms.ageChk == nil && ms.cfg.MaxAge != 0 { + switch { + case ms.ageChk == nil && (ms.cfg.MaxAge > 0 || ms.ttls != nil): ms.startAgeChk() + case ms.ageChk != nil && ms.ttls != nil && ttl > 0: + ms.resetAgeChk(0) } return nil } @@ -316,7 +332,7 @@ func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) { // RegisterSubjectDeleteMarkerUpdates registers a callback for updates to new subject delete markers. func (ms *memStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUpdateHandler) { ms.mu.Lock() - ms.tcb = cb + ms.sdmcb = cb ms.mu.Unlock() } @@ -890,25 +906,52 @@ func (ms *memStore) enforceBytesLimit() { // Will start the age check timer. // Lock should be held. func (ms *memStore) startAgeChk() { - if ms.ageChk == nil && ms.cfg.MaxAge != 0 { + if ms.ageChk != nil { + return + } + if ms.cfg.MaxAge != 0 || ms.ttls != nil { ms.ageChk = time.AfterFunc(ms.cfg.MaxAge, ms.expireMsgs) } } // Lock should be held. func (ms *memStore) resetAgeChk(delta int64) { - if ms.cfg.MaxAge == 0 { + var next int64 = math.MaxInt64 + if ms.ttls != nil { + next = ms.ttls.GetNextExpiration(next) + } + + // If there's no MaxAge and there's nothing waiting to be expired then + // don't bother continuing. The next storeRawMsg() will wake us up if + // needs be. + if ms.cfg.MaxAge <= 0 && next == math.MaxInt64 { + clearTimer(&ms.ageChk) return } + // Check to see if we should be firing sooner than MaxAge for an expiring TTL. fireIn := ms.cfg.MaxAge - if delta > 0 && time.Duration(delta) < fireIn { - if fireIn = time.Duration(delta); fireIn < 250*time.Millisecond { - // Only fire at most once every 250ms. - // Excessive firing can effect ingest performance. - fireIn = time.Second + if next < math.MaxInt64 { + // Looks like there's a next expiration, use it either if there's no + // MaxAge set or if it looks to be sooner than MaxAge is. + if until := time.Until(time.Unix(0, next)); fireIn == 0 || until < fireIn { + fireIn = until + } + } + + // If not then look at the delta provided (usually gap to next age expiry). + if delta > 0 { + if fireIn == 0 || time.Duration(delta) < fireIn { + fireIn = time.Duration(delta) } } + + // Make sure we aren't firing too often either way, otherwise we can + // negatively impact stream ingest performance. + if fireIn < 250*time.Millisecond { + fireIn = 250 * time.Millisecond + } + if ms.ageChk != nil { ms.ageChk.Reset(fireIn) } else { @@ -916,43 +959,117 @@ func (ms *memStore) resetAgeChk(delta int64) { } } +// Lock should be held. +func (ms *memStore) cancelAgeChk() { + if ms.ageChk != nil { + ms.ageChk.Stop() + ms.ageChk = nil + } +} + +// Lock must be held so that nothing else can interleave and write a +// new message on this subject before we get the chance to write the +// delete marker. If the delete marker is written successfully then +// this function returns a callback func to call scb and sdmcb after +// the lock has been released. +func (ms *memStore) subjectDeleteMarkerIfNeeded(sm *StoreMsg, reason string) func() { + // If the deleted message was itself a delete marker then + // don't write out more of them or we'll churn endlessly. + if len(getHeader(JSAppliedLimit, sm.hdr)) != 0 { + return nil + } + if !ms.cfg.SubjectDeleteMarkers { + return nil + } + if _, ok := ms.fss.Find(stringToBytes(sm.subj)); ok { + // There are still messages left with this subject, + // therefore it wasn't the last message deleted. + return nil + } + // Build the subject delete marker. If no TTL is specified then + // we'll default to 15 minutes — by that time every possible condition + // should have cleared (i.e. ordered consumer timeout, client timeouts, + // route/gateway interruptions, even device/client restarts etc). + ttl, _ := parseMessageTTL(ms.cfg.SubjectDeleteMarkerTTL) + if ttl <= 0 { + return nil + } + var _hdr [128]byte + hdr := fmt.Appendf(_hdr[:0], "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n", JSAppliedLimit, reason, JSMessageTTL, time.Duration(ttl)*time.Second) + seq, ts := ms.state.LastSeq+1, time.Now().UnixNano() + // Store it in the stream and then prepare the callbacks + // to return to the caller. + if err := ms.storeRawMsg(sm.subj, hdr, nil, seq, ts, ttl); err != nil { + return nil + } + cb, tcb := ms.scb, ms.sdmcb + return func() { + if cb != nil { + cb(1, int64(fileStoreMsgSize(sm.subj, hdr, nil)), seq, sm.subj) + } + if tcb != nil { + tcb(seq, sm.subj) + } + } +} + // Will expire msgs that are too old. func (ms *memStore) expireMsgs() { + var smv StoreMsg + var sm *StoreMsg ms.mu.RLock() - now := time.Now().UnixNano() - minAge := now - int64(ms.cfg.MaxAge) + maxAge := int64(ms.cfg.MaxAge) + minAge := time.Now().UnixNano() - maxAge ms.mu.RUnlock() - for { - ms.mu.Lock() - if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge { - ms.deleteFirstMsgOrPanic() - // Recalculate in case we are expiring a bunch. - now = time.Now().UnixNano() - minAge = now - int64(ms.cfg.MaxAge) - ms.mu.Unlock() - } else { - // We will exit here - if len(ms.msgs) == 0 { - if ms.ageChk != nil { - ms.ageChk.Stop() - ms.ageChk = nil - } - } else { - var fireIn time.Duration - if sm == nil { - fireIn = ms.cfg.MaxAge - } else { - fireIn = time.Duration(sm.ts - minAge) - } - if ms.ageChk != nil { - ms.ageChk.Reset(fireIn) - } else { - ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs) + if maxAge > 0 { + var seq uint64 + for sm, seq, _ = ms.LoadNextMsg(fwcs, true, 0, &smv); sm != nil && sm.ts <= minAge; sm, seq, _ = ms.LoadNextMsg(fwcs, true, seq+1, &smv) { + if len(sm.hdr) > 0 { + if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 { + // The message has a negative TTL, therefore it must "never expire". + minAge = time.Now().UnixNano() - maxAge + continue } } + ms.mu.Lock() + ms.removeMsg(seq, false) + cbs := ms.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge) ms.mu.Unlock() - break + if cbs != nil { + cbs() + } + // Recalculate in case we are expiring a bunch. + minAge = time.Now().UnixNano() - maxAge + } + } + + ms.mu.Lock() + defer ms.mu.Unlock() + + // TODO: Not great that we're holding the lock here, but the timed hash wheel isn't thread-safe. + nextTTL := int64(math.MaxInt64) + if ms.ttls != nil { + ms.ttls.ExpireTasks(func(seq uint64, ts int64) { + ms.removeMsg(seq, false) + }) + if maxAge > 0 { + // Only check if we're expiring something in the next MaxAge interval, saves us a bit + // of work if MaxAge will beat us to the next expiry anyway. + nextTTL = ms.ttls.GetNextExpiration(time.Now().Add(time.Duration(maxAge)).UnixNano()) + } else { + nextTTL = ms.ttls.GetNextExpiration(math.MaxInt64) + } + } + + // Only cancel if no message left, not on potential lookup error that would result in sm == nil. + if ms.state.Msgs == 0 && nextTTL == math.MaxInt64 { + ms.cancelAgeChk() + } else { + if sm == nil { + ms.resetAgeChk(0) + } else { + ms.resetAgeChk(sm.ts - minAge) } } } diff --git a/server/memstore_test.go b/server/memstore_test.go index 5458a31b80..830dce5a54 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1161,6 +1161,76 @@ func TestMemStoreNumPendingBug(t *testing.T) { require_Equal(t, total, checkTotal) } +func TestMemStoreMessageTTL(t *testing.T) { + fs, err := newMemStore( + &StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: MemoryStorage, AllowMsgTTL: true}, + ) + require_NoError(t, err) + defer fs.Stop() + + ttl := int64(1) // 1 second + + for i := 1; i <= 10; i++ { + _, _, err = fs.StoreMsg("test", nil, nil, ttl) + require_NoError(t, err) + } + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 10) + require_Equal(t, ss.Msgs, 10) + + time.Sleep(time.Second * 2) + + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 11) + require_Equal(t, ss.LastSeq, 10) + require_Equal(t, ss.Msgs, 0) +} + +func TestMemStoreSubjectDeleteMarkers(t *testing.T) { + fs, err := newMemStore( + &StreamConfig{ + Name: "zzz", Subjects: []string{"test"}, Storage: MemoryStorage, + MaxAge: time.Second, AllowMsgTTL: true, + SubjectDeleteMarkers: true, SubjectDeleteMarkerTTL: "1s", + }, + ) + require_NoError(t, err) + defer fs.Stop() + + // Store three messages that will expire because of MaxAge. + var seq uint64 + for i := 0; i < 3; i++ { + seq, _, err = fs.StoreMsg("test", nil, nil, 0) + require_NoError(t, err) + } + + // The last message should be gone after MaxAge has passed. + time.Sleep(time.Second * 2) + sm, err := fs.LoadMsg(seq, nil) + require_Error(t, err) + require_Equal(t, sm, nil) + + // We should have replaced it with a tombstone. + sm, err = fs.LoadMsg(seq+1, nil) + require_NoError(t, err) + require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge) + require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") + + time.Sleep(time.Second * 2) + + // The tombstone itself only has a TTL of 1 second so that should + // also be gone by now too. No more tombstones should have been + // published. + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, sm.seq+1) + require_Equal(t, ss.LastSeq, sm.seq) + require_Equal(t, ss.Msgs, 0) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////