[FIXED] Issue #2397 #2480

Merged
5 commits, merged Sep 1, 2021
Changes from 1 commit
54 changes: 41 additions & 13 deletions server/jetstream_cluster.go
@@ -586,6 +586,7 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
return true
}
}

return false
}

@@ -1295,17 +1296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {

// Make sure we do not let the apply channel fill up and block the raft layer.
defer func() {
if n.State() != Closed {
if n.Leader() {
n.StepDown()
}
// Drain the commit channel..
for len(ach) > 0 {
select {
case <-ach:
default:
return
}
if n.State() == Closed {
return
}
if n.Leader() {
n.StepDown()
}
// Drain the commit channel..
for len(ach) > 0 {
select {
case <-ach:
default:
return
}
}
}()
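// Aside: a minimal, self-contained sketch of the non-blocking drain pattern used in the
// defer above. The names here (drain, ch) are illustrative only and are not part of this diff.
package main

import "fmt"

// drain empties any buffered entries from ch without ever blocking:
// the default case returns as soon as nothing is pending.
func drain(ch chan int) {
	for len(ch) > 0 {
		select {
		case v := <-ch:
			fmt.Println("discarded buffered entry:", v)
		default:
			return
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	drain(ch) // leaves ch empty without ever blocking
}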
@@ -1529,11 +1531,35 @@ func (mset *stream) resetClusteredState() bool {
js.mu.Lock()
sa.Group.node = nil
js.mu.Unlock()
go js.processClusterCreateStream(acc, sa)
go js.restartClustered(acc, sa)
}
return true
}

// This will reset the stream and its consumers.
// Should be run in a separate goroutine.
func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
js.processClusterCreateStream(acc, sa)

// Check consumers.
js.mu.Lock()
var consumers []*consumerAssignment
if cc := js.cluster; cc != nil && cc.meta != nil {
ourID := cc.meta.ID()
for _, ca := range sa.consumers {
if rg := ca.Group; rg != nil && rg.isMember(ourID) {
rg.node = nil // Erase group raft/node state.
consumers = append(consumers, ca)
}
}
}
js.mu.Unlock()

for _, ca := range consumers {
js.processClusterCreateConsumer(ca, nil)
}
}
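// Aside: a minimal sketch (illustrative names only, not from this diff) of the pattern
// restartClustered uses above: collect work items while holding the lock, then release
// the lock before doing the per-item work.
package main

import (
	"fmt"
	"sync"
)

type assignments struct {
	mu    sync.Mutex
	items []string
}

func (a *assignments) processAll(process func(string)) {
	a.mu.Lock()
	pending := append([]string(nil), a.items...) // copy while locked
	a.mu.Unlock()

	// The per-item work runs without the lock held, so a slow call
	// cannot block other users of the mutex.
	for _, it := range pending {
		process(it)
	}
}

func main() {
	a := &assignments{items: []string{"d1", "d2"}}
	a.processAll(func(name string) { fmt.Println("recreating consumer", name) })
}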

func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
for _, e := range ce.Entries {
if e.Type == EntryNormal {
@@ -1561,7 +1587,9 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {

// We can skip if we know this is less than what we already have.
if lseq < last {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
if !isRecovering {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
}
continue
}

173 changes: 172 additions & 1 deletion server/jetstream_cluster_test.go
@@ -20,8 +20,10 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -7763,7 +7765,7 @@ func TestJetStreamPullConsumerLeakedSubs(t *testing.T) {
defer sub.Unsubscribe()

// Load up a bunch of requests.
numRequests := 20 //100_000
numRequests := 20
for i := 0; i < numRequests; i++ {
js.PublishAsync("Domains.Domain", []byte("QUESTION"))
}
@@ -8078,6 +8080,175 @@ func TestJetStreamDeadlockOnVarz(t *testing.T) {
wg.Wait()
}

// Make sure that when we hard reset a stream's state in a cluster, we also re-create the consumers.
func TestJetStreamClusterStreamReset(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 2,
Retention: nats.WorkQueuePolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

numRequests := 20
for i := 0; i < numRequests; i++ {
js.Publish("foo.created", []byte("REQ"))
}

// Durable.
sub, err := js.SubscribeSync("foo.created", nats.Durable("d1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != uint64(numRequests) {
t.Fatalf("Expected %d msgs, got bad state: %+v", numRequests, si.State)
}
// Let things settle a bit.
time.Sleep(250 * time.Millisecond)

// Grab the number of goroutines as a baseline.
base := runtime.NumGoroutine()

// Grab a server that is the consumer leader for the durable.
cl := c.consumerLeader("$G", "TEST", "d1")
mset, err := cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do a hard reset here by hand.
mset.resetClusteredState()
// Wait until the leader is elected.
c.waitOnConsumerLeader("$G", "TEST", "d1")

// Use a short MaxWait so we do not wait 10s per call inside checkFor below.
js2, _ := nc.JetStream(nats.MaxWait(100 * time.Millisecond))
// Make sure we can get the consumer info eventually.
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
_, err := js2.ConsumerInfo("TEST", "d1")
return err
})

// Check the number of goroutines again; consumers should have been re-created, so it should not drop below the baseline.
if after := runtime.NumGoroutine(); base > after {
t.Fatalf("Expected %d go routines, got %d", base, after)
}
}

// Issue #2397
func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R2S", 2)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Hold onto servers.
sl := c.streamLeader("$G", "TEST")
if sl == nil {
t.Fatalf("Did not get a server")
}
nsl := c.randomNonStreamLeader("$G", "TEST")
if nsl == nil {
t.Fatalf("Did not get a server")
}
// Grab low level stream and raft node.
mset, err := nsl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
node := mset.raftNode()
if node == nil {
t.Fatalf("Could not get stream group name")
}
gname := node.Group()

numRequests := 100
for i := 0; i < numRequests; i++ {
// This will force a snapshot which will prune the normal log.
// We will remove the snapshot to simulate the error condition.
if i == 10 {
if err := node.InstallSnapshot(mset.stateSnapshot()); err != nil {
t.Fatalf("Error installing snapshot: %v", err)
}
}
js.Publish("foo.created", []byte("REQ"))
}

config := nsl.JetStreamConfig()
if config == nil {
t.Fatalf("No config")
}
lconfig := sl.JetStreamConfig()
if lconfig == nil {
t.Fatalf("No config")
}

nc.Close()
c.stopAll()
// Remove all message store state for the non-leader by truncating its store files.
for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
fname := path.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
if err != nil {
continue
}
fd.Truncate(0)
fd.Close()
}
// For both make sure we have no raft snapshots.
snapDir := path.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
os.RemoveAll(snapDir)
snapDir = path.Join(config.StoreDir, "$SYS", "_js_", gname, "snapshots")
os.RemoveAll(snapDir)

// Now restart.
c.restartAll()
for _, cs := range c.servers {
c.waitOnStreamCurrent(cs, "$G", "TEST")
}

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

if _, err := js.Publish("foo.created", []byte("REQ")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.LastSeq != 101 {
t.Fatalf("bad state after restart: %+v", si.State)
}
}
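// The two tests above can be run on their own with the standard Go tooling from a
// nats-server checkout, e.g. (command shown for illustration; adjust the package path
// to the local layout):
//
//   go test -run "TestJetStreamClusterStreamReset|TestJetStreamClusterStreamCatchupNoState" ./server/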

// Support functions

// Used to setup superclusters for tests.
5 changes: 3 additions & 2 deletions server/stream.go
@@ -2600,8 +2600,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// For clustering, the lower layers will pass our expected lseq. If it is present, check for it here.
if lseq > 0 && lseq != (mset.lseq+mset.clfs) {
isMisMatch := true
// If our first message for this mirror, see if we have to adjust our starting sequence.
if mset.cfg.Mirror != nil {
// We may be able to recover here if we have no state whatsoever, or we are a mirror.
// See if we have to adjust our starting sequence.
if mset.lseq == 0 || mset.cfg.Mirror != nil {
var state StreamState
mset.store.FastState(&state)
if state.FirstSeq == 0 {