Skip to content

Commit

Permalink
[FIXED] Invalidate stream state on recovery if tracking states mismatch
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Oct 29, 2024
1 parent dc6d45a commit 6241569
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
9 changes: 9 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
var matched bool
mb := fs.lmb
if mb == nil || mb.index != blkIndex {
os.Remove(fn)
fs.warn("Stream state block does not exist or index mismatch")
return errCorruptState
}
Expand Down Expand Up @@ -1777,6 +1778,14 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}
}

// We check first and last seq and number of msgs and bytes. If there is a difference,
// return and error so we rebuild from the message block state on disk.
if !trackingStatesEqual(&fs.state, &mstate) {
os.Remove(fn)
fs.warn("Stream state encountered internal inconsistency on recover")
return errCorruptState
}

return nil
}

Expand Down
33 changes: 33 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/hmac"
crand "crypto/rand"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -8111,3 +8112,35 @@ func TestFileStoreWriteFullStateDetectCorruptState(t *testing.T) {
require_Equal(t, ss.LastSeq, 10)
require_Equal(t, ss.Msgs, 9)
}

func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("abc")
for i := 1; i <= 10; i++ {
_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
require_NoError(t, err)
}

err = fs.writeFullState()
require_NoError(t, err)

sfile := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(sfile)
require_NoError(t, err)
// Update to an incorrect message count.
binary.PutUvarint(buf[2:], 0)
// Just append a corrected checksum to the end to make it pass the checks.
fs.hh.Reset()
fs.hh.Write(buf)
buf = fs.hh.Sum(buf)
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)

err = fs.recoverFullState()
require_Error(t, err, errCorruptState)
}

0 comments on commit 6241569

Please sign in to comment.