Skip to content

Commit

Permalink
[FIXED] Memory stream compact should clear subjects state & hold writ…
Browse files Browse the repository at this point in the history
…e lock (#6187)

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Nov 28, 2024
2 parents cfaad68 + 9042ba3 commit 9c2061e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
33 changes: 33 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24688,3 +24688,36 @@ func TestJetStreamDelayedAPIResponses(t *testing.T) {
s.sendDelayedAPIErrResponse(nil, acc, "I", _EMPTY_, "request9", "response9", nil, 100*time.Millisecond)
check("request9", "response9")
}

func TestJetStreamMemoryPurgeClearsSubjectsState(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.MemoryStorage,
})
require_NoError(t, err)

pa, err := js.Publish("foo", nil)
require_NoError(t, err)
require_Equal(t, pa.Sequence, 1)

// When requesting stream info, we expect one subject foo with one entry.
si, err := js.StreamInfo("TEST", &nats.StreamInfoRequest{SubjectsFilter: ">"})
require_NoError(t, err)
require_Len(t, len(si.State.Subjects), 1)
require_Equal(t, si.State.Subjects["foo"], 1)

// After purging, moving the sequence up, the subjects state should be cleared.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100})
require_NoError(t, err)

si, err = js.StreamInfo("TEST", &nats.StreamInfoRequest{SubjectsFilter: ">"})
require_NoError(t, err)
require_Len(t, len(si.State.Subjects), 0)
}
7 changes: 5 additions & 2 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,9 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje

// SubjectsState returns a map of SimpleState for all matching subjects.
func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as we can mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()

if ms.fss.Size() == 0 {
return nil
Expand Down Expand Up @@ -1076,7 +1077,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Time{}
ms.state.LastSeq = seq - 1
// Reset msgs and fss.
ms.msgs = make(map[uint64]*StoreMsg)
ms.fss = stree.NewSubjectTree[SimpleState]()
}
ms.mu.Unlock()

Expand Down

0 comments on commit 9c2061e

Please sign in to comment.