Skip to content

Commit

Permalink
NRG: empty snapshots dir if memory WAL (#6169)
Browse files Browse the repository at this point in the history
`os.Remove(filepath.Join(n.sd, snapshotsDir, "*"))` doesn't remove all
files in the snapshots directory. This PR ensures the snapshots dir is
cleaned up for in-memory WALs.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Nov 25, 2024
2 parents 1f1d8c6 + 3edc0d1 commit 3ba828c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
12 changes: 6 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,20 +424,20 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
n.vote = vote
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

// Can't recover snapshots if memory based since wal will be reset.
// We will inherit from the current leader.
if _, ok := n.wal.(*memStore); ok {
os.Remove(filepath.Join(n.sd, snapshotsDir, "*"))
_ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir))
} else {
// See if we have any snapshots and if so load and process on startup.
n.setupLastSnapshot()
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

truncateAndErr := func(index uint64) {
if err := n.wal.Truncate(index); err != nil {
n.setWriteErr(err)
Expand Down
14 changes: 10 additions & 4 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine {
}

func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
t.Helper()
n, c := initSingleMemRaftNodeWithCluster(t)
cleanup := func() {
c.shutdown()
}
return n, cleanup
}

func initSingleMemRaftNodeWithCluster(t *testing.T) (*raft, *cluster) {
t.Helper()
c := createJetStreamClusterExplicit(t, "R3S", 3)
s := c.servers[0] // RunBasicJetStreamServer not available
Expand All @@ -332,10 +341,7 @@ func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

cleanup := func() {
c.shutdown()
}
return n, cleanup
return n, c
}

// Encode an AppendEntry.
Expand Down
45 changes: 45 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,3 +1654,48 @@ func TestNRGForwardProposalResponse(t *testing.T) {

rg.waitOnTotal(t, 123)
}

func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) {
n, c := initSingleMemRaftNodeWithCluster(t)
defer c.shutdown()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Simply receive first message.
n.processAppendEntry(aeMsg, n.aesub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.commit, 0)

// Heartbeat moves commit up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 1)

// Manually call back down to applied, and then snapshot.
n.Applied(1)
err := n.InstallSnapshot(nil)
require_NoError(t, err)

// Stop current node and restart it.
n.Stop()
n.WaitForStop()

s := c.servers[0]
ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage})
require_NoError(t, err)
cfg := &RaftConfig{Name: "TEST", Store: n.sd, Log: ms}
n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

// Since the WAL is in-memory, the snapshots dir should've been emptied upon restart.
files, err := os.ReadDir(filepath.Join(n.sd, snapshotsDir))
require_NoError(t, err)
require_Len(t, len(files), 0)
}

0 comments on commit 3ba828c

Please sign in to comment.