Skip to content

Commit

Permalink
[FIXED] Invalidate stream state before write if tracking states mismatch
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Oct 29, 2024
1 parent f1fca62 commit dc6d45a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8037,7 +8037,7 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Snapshot prior dirty count.
priorDirty := fs.dirty

statesEqual := trackingStatesEqual(&fs.state, &mstate) || len(fs.blks) > 0
statesEqual := trackingStatesEqual(&fs.state, &mstate)
// Release lock.
fs.mu.Unlock()

Expand Down
35 changes: 35 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8076,3 +8076,38 @@ func Benchmark_FileStoreCreateConsumerStores(b *testing.B) {
})
}
}

func TestFileStoreWriteFullStateDetectCorruptState(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("abc")
for i := 1; i <= 10; i++ {
_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
require_NoError(t, err)
}

// Simulate a change in a message block not being reflected in the fs.
mb := fs.selectMsgBlock(2)
mb.mu.Lock()
mb.msgs--
mb.mu.Unlock()

var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 10)
require_Equal(t, ss.Msgs, 10)

// Make sure we detect the corrupt state and rebuild.
err = fs.writeFullState()
require_Error(t, err, errCorruptState)

fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 10)
require_Equal(t, ss.Msgs, 9)
}

0 comments on commit dc6d45a

Please sign in to comment.