Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG: empty snapshots dir if memory WAL #6169

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
@@ -424,20 +424,20 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
n.vote = vote
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

// Can't recover snapshots if memory based since wal will be reset.
// We will inherit from the current leader.
if _, ok := n.wal.(*memStore); ok {
os.Remove(filepath.Join(n.sd, snapshotsDir, "*"))
_ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir))
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
} else {
// See if we have any snapshots and if so load and process on startup.
n.setupLastSnapshot()
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

truncateAndErr := func(index uint64) {
if err := n.wal.Truncate(index); err != nil {
n.setWriteErr(err)
14 changes: 10 additions & 4 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
@@ -319,6 +319,15 @@ func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine {
}

func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
t.Helper()
n, c := initSingleMemRaftNodeWithCluster(t)
cleanup := func() {
c.shutdown()
}
return n, cleanup
}

func initSingleMemRaftNodeWithCluster(t *testing.T) (*raft, *cluster) {
t.Helper()
c := createJetStreamClusterExplicit(t, "R3S", 3)
s := c.servers[0] // RunBasicJetStreamServer not available
@@ -332,10 +341,7 @@ func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

cleanup := func() {
c.shutdown()
}
return n, cleanup
return n, c
}

// Encode an AppendEntry.
45 changes: 45 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
@@ -1654,3 +1654,48 @@ func TestNRGForwardProposalResponse(t *testing.T) {

rg.waitOnTotal(t, 123)
}

func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) {
n, c := initSingleMemRaftNodeWithCluster(t)
defer c.shutdown()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Simply receive first message.
n.processAppendEntry(aeMsg, n.aesub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.commit, 0)

// Heartbeat moves commit up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 1)

// Manually call back down to applied, and then snapshot.
n.Applied(1)
err := n.InstallSnapshot(nil)
require_NoError(t, err)

// Stop current node and restart it.
n.Stop()
n.WaitForStop()

s := c.servers[0]
ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage})
require_NoError(t, err)
cfg := &RaftConfig{Name: "TEST", Store: n.sd, Log: ms}
n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

// Since the WAL is in-memory, the snapshots dir should've been emptied upon restart.
files, err := os.ReadDir(filepath.Join(n.sd, snapshotsDir))
require_NoError(t, err)
require_Len(t, len(files), 0)
}