Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG: empty snapshots dir if memory WAL #6169

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,20 +424,20 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
n.vote = vote
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

// Can't recover snapshots if memory based since wal will be reset.
// We will inherit from the current leader.
if _, ok := n.wal.(*memStore); ok {
os.Remove(filepath.Join(n.sd, snapshotsDir, "*"))
_ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir))
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
} else {
// See if we have any snapshots and if so load and process on startup.
n.setupLastSnapshot()
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

truncateAndErr := func(index uint64) {
if err := n.wal.Truncate(index); err != nil {
n.setWriteErr(err)
Expand Down
14 changes: 10 additions & 4 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine {
}

func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
t.Helper()
n, c := initSingleMemRaftNodeWithCluster(t)
cleanup := func() {
c.shutdown()
}
return n, cleanup
}

func initSingleMemRaftNodeWithCluster(t *testing.T) (*raft, *cluster) {
t.Helper()
c := createJetStreamClusterExplicit(t, "R3S", 3)
s := c.servers[0] // RunBasicJetStreamServer not available
Expand All @@ -332,10 +341,7 @@ func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

cleanup := func() {
c.shutdown()
}
return n, cleanup
return n, c
}

// Encode an AppendEntry.
Expand Down
45 changes: 45 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,3 +1654,48 @@ func TestNRGForwardProposalResponse(t *testing.T) {

rg.waitOnTotal(t, 123)
}

func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) {
n, c := initSingleMemRaftNodeWithCluster(t)
defer c.shutdown()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Simply receive first message.
n.processAppendEntry(aeMsg, n.aesub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.commit, 0)

// Heartbeat moves commit up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 1)

// Manually call back down to applied, and then snapshot.
n.Applied(1)
err := n.InstallSnapshot(nil)
require_NoError(t, err)

// Stop current node and restart it.
n.Stop()
n.WaitForStop()

s := c.servers[0]
ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage})
require_NoError(t, err)
cfg := &RaftConfig{Name: "TEST", Store: n.sd, Log: ms}
n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

// Since the WAL is in-memory, the snapshots dir should've been emptied upon restart.
files, err := os.ReadDir(filepath.Join(n.sd, snapshotsDir))
require_NoError(t, err)
require_Len(t, len(files), 0)
}