[FIXED] Clustering: restore snapshot with some messages no longer avail #835

Merged: 1 commit, May 14, 2019
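In short: when a follower restores from a snapshot that references messages the leader has since dropped because of channel limits, the restore used to stop at the first missing message. With this change the leader keeps answering restore requests and the follower keeps going, simply skipping the sequences that are gone. The sketch below is a minimal, self-contained model of that follower loop; the real exchange goes over internal NATS subjects, and the in-memory map plus the leaderLookup / restoreFromSnapshot names here are only illustrative, not the server's actual API.

package main

import "fmt"

// leaderLookup stands in for the leader answering a snapshot restore request.
// A nil result plays the role of an empty response: the message was dropped
// because of channel limits.
func leaderLookup(leaderStore map[uint64][]byte, seq uint64) []byte {
	return leaderStore[seq]
}

// restoreFromSnapshot mirrors the follower loop in snapshot.go after this
// change: an empty response no longer ends the restore, it only means that
// this particular sequence is gone on the leader.
func restoreFromSnapshot(leaderStore map[uint64][]byte, first, last uint64) map[uint64][]byte {
	restored := make(map[uint64][]byte)
	for seq := first; seq <= last; seq++ {
		data := leaderLookup(leaderStore, seq)
		if data == nil {
			continue // message no longer available, keep asking for the rest
		}
		restored[seq] = data
	}
	return restored
}

func main() {
	// The leader kept messages 3..12 (MaxMsgs=10 pushed out 1 and 2) while the
	// snapshot still references 1..10; 11 and 12 arrive later via raft logs.
	leaderStore := make(map[uint64][]byte)
	for seq := uint64(3); seq <= 12; seq++ {
		leaderStore[seq] = []byte("hello")
	}
	restored := restoreFromSnapshot(leaderStore, 1, 10)
	fmt.Printf("restored %d messages (sequences 3 to 10)\n", len(restored))
}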
104 changes: 98 additions & 6 deletions server/clustering_test.go
@@ -37,6 +37,7 @@ import (
"github.com/nats-io/go-nats-streaming"
"github.com/nats-io/go-nats-streaming/pb"
"github.com/nats-io/nats-streaming-server/stores"
"github.com/nats-io/nats-streaming-server/test"
)

var defaultRaftLog string
@@ -59,6 +60,20 @@ func cleanupRaftLog(t *testing.T) {
}
}

func shutdownAndCleanupState(t *testing.T, s *StanServer, nodeID string) {
t.Helper()
s.Shutdown()
switch persistentStoreType {
case stores.TypeFile:
os.RemoveAll(filepath.Join(defaultDataStore, nodeID))
os.RemoveAll(filepath.Join(defaultRaftLog, nodeID))
case stores.TypeSQL:
test.CleanupSQLDatastore(t, testSQLDriver, testSQLSource+nodeID)
default:
t.Fatalf("This test needs to be updated for store type: %v", persistentStoreType)
}
}

func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options {
opts := getTestDefaultOptsForPersistentStore()
if persistentStoreType == stores.TypeFile {
@@ -4707,15 +4722,10 @@ func TestClusteringSubDontStallDueToMsgExpiration(t *testing.T) {
}
checkReceived()

s2.Shutdown()
// We remove all state from node "b", but even if we didn't, on restart,
// since "b"'s store would be behind the rest, it would be emptied because
// the current first message is more than "b"'s last sequence.
// For DB, let it recover its state...
if persistentStoreType == stores.TypeFile {
os.RemoveAll(filepath.Join(defaultDataStore, "b"))
os.RemoveAll(filepath.Join(defaultRaftLog, "b"))
}
shutdownAndCleanupState(t, s2, "b")

for i := 0; i < secondBatch; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
@@ -5711,3 +5721,85 @@ func TestClusteringNumSubs(t *testing.T) {
sc.Close()
leader.Shutdown()
}

func TestClusteringRestoreSnapshotWithSomeMsgsNoLongerAvail(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.MaxMsgs = 10
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.MaxMsgs = 10
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.MaxMsgs = 10
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

getLeader(t, 10*time.Second, s1, s2, s3)

sc := NewDefaultConnection(t)
defer sc.Close()

for i := 0; i < 10; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}

// Create a snapshot that will indicate that there are messages from 1 to 10.
if err := s1.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}

// Shut down s3 and clean up its state so it has nothing in its stores
// and will restore from the snapshot.
shutdownAndCleanupState(t, s3, "c")

// Send 2 more messages, which will make messages 1 and 2 disappear.
for i := 0; i < 2; i++ {
if err := sc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
}
sc.Close()

// Start s3. It will restore from the snapshot that says the
// channel has messages 1 to 10, and then it should receive 2 raft
// logs with messages 11 and 12.
// When s3 asks the leader for messages 1 and 2, it should
// get empty responses indicating that these messages are gone,
// but it should still be able to get messages 3 to 10; 11 and
// 12 will then be replayed from the raft logs.
s3 = runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

waitFor(t, 5*time.Second, 15*time.Millisecond, func() error {
c := s3.channels.get("foo")
if c == nil {
return fmt.Errorf("Channel foo not recreated yet")
}
first, last, err := c.store.Msgs.FirstAndLastSequence()
if err != nil {
return fmt.Errorf("Error getting first/last seq: %v", err)
}
if first != 3 || last != 12 {
return fmt.Errorf("Expected first=3 last=12, got %v and %v", first, last)
}
return nil
})
}
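To spell out the numbers in this test: MaxMsgs is 10, so the snapshot taken after the first 10 publishes covers sequences 1 to 10, and the 2 extra messages (sequences 11 and 12) push 1 and 2 out on the leader. The restarted node therefore gets empty responses for 1 and 2, stores 3 to 10 from the leader, and picks up 11 and 12 from the raft logs, which is exactly the first=3, last=12 assertion above.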
3 changes: 0 additions & 3 deletions server/server.go
@@ -545,9 +545,6 @@ func (s *StanServer) subToSnapshotRestoreRequests() error {
if err := s.ncsr.Publish(m.Reply, buf); err != nil {
s.log.Errorf("Snapshot restore request error for channel %q, unable to send response for seq %v: %v", c.name, seq, err)
}
if buf == nil {
return
}
select {
case <-s.shutdownCh:
return
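The three lines removed above made the leader stop serving a snapshot restore request as soon as it hit a message it no longer had (buf == nil). The empty reply is still published, it just no longer ends the loop: since the follower now keeps requesting past an empty response (see the snapshot.go change below), the leader has to keep answering the rest of the requested range as well, otherwise the follower would be left waiting for replies that never arrive.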
21 changes: 10 additions & 11 deletions server/snapshot.go
@@ -445,17 +445,16 @@ func (r *raftFSM) restoreMsgsFromSnapshot(c *channel, first, last uint64) error
return err
}
// It is possible that the leader does not have this message because of
// channel limits. If resp.Data is empty, we are in this situation and
// we are done recovering snapshot.
if len(resp.Data) == 0 {
break
}
msg := &pb.MsgProto{}
if err := msg.Unmarshal(resp.Data); err != nil {
panic(err)
}
if _, err := c.store.Msgs.Store(msg); err != nil {
return err
// channel limits. If resp.Data is empty, we are in this situation.
// We need to continue to see if more recent messages are available though.
if len(resp.Data) != 0 {
msg := &pb.MsgProto{}
if err := msg.Unmarshal(resp.Data); err != nil {
panic(err)
}
if _, err := c.store.Msgs.Store(msg); err != nil {
return err
}
}
select {
case <-r.server.shutdownCh: