Cherry-picks for 2.10.23-RC.3 #6129

Merged
merged 11 commits on Nov 19, 2024
6 changes: 3 additions & 3 deletions .travis.yml
@@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.22.8"
- "1.21.13"
- "1.23.3"
- "1.22.9"

go_import_path: github.com/nats-io/nats-server

@@ -58,4 +58,4 @@ deploy:
script: curl -o /tmp/goreleaser.tar.gz -sLf https://github.com/goreleaser/goreleaser/releases/download/v1.26.2/goreleaser_Linux_x86_64.tar.gz && tar -xvf /tmp/goreleaser.tar.gz -C /tmp/ && /tmp/goreleaser
on:
tags: true
condition: ($TRAVIS_GO_VERSION =~ 1.22) && ($TEST_SUITE = "compile")
condition: ($TRAVIS_GO_VERSION =~ 1.23) && ($TEST_SUITE = "compile")
2 changes: 1 addition & 1 deletion scripts/runTestsOnTravis.sh
@@ -7,7 +7,7 @@ if [ "$1" = "compile" ]; then
go build;

# Now run the linters.
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.56.1;
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.0;
golangci-lint run;
if [ "$TRAVIS_TAG" != "" ]; then
go test -race -v -run=TestVersionMatchesTag ./server -ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -count=1 -vet=off
47 changes: 34 additions & 13 deletions server/client.go
@@ -4570,6 +4570,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Declared here because of goto.
var queues [][]byte

var leafOrigin string
switch c.kind {
case ROUTER:
if len(c.pa.origin) > 0 {
// Picture a message sent from a leafnode to a server that then routes
// this message: ClusterA -leaf-> HUB1 -route-> HUB2
// Here we are in HUB2, so c.kind is a ROUTER, but the message will
// contain a c.pa.origin set to "ClusterA" to indicate that this message
// originated from that leafnode cluster.
leafOrigin = bytesToString(c.pa.origin)
}
case LEAF:
leafOrigin = c.remoteCluster()
}

// For all routes/leaf/gateway connections, we may still want to send messages to
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
@@ -4609,7 +4624,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
for i := 0; i < len(qsubs); i++ {
sub = qsubs[i]
if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
// If we have assigned an ROUTER rsub already, replace if
// If the destination is a LEAF, we first need to make sure
// that we would not pick one that was the origin of this
// message.
if dst == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// If we have assigned a ROUTER rsub already, replace if
// the destination is a LEAF since we want to favor that.
if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
rsub = sub
@@ -4635,6 +4656,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

// Find a subscription that is able to deliver this message starting at a random index.
// Note that if the message came from a ROUTER, we will only have CLIENT or LEAF
// queue subs here, otherwise we can have all types.
for i := 0; i < lqs; i++ {
if sindex+i < lqs {
sub = qsubs[sindex+i]
@@ -4655,6 +4678,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// If we come from a LEAF and are about to pick a LEAF connection,
// make sure this is not the same leaf cluster.
if src == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
@@ -4755,18 +4783,11 @@ sendToRoutesOrLeafs:
// If so make sure we do not send it back to the same cluster for a different
// leafnode. Cluster wide no echo.
if dc.kind == LEAF {
// Check two scenarios. One is inbound from a route (c.pa.origin)
if c.kind == ROUTER && len(c.pa.origin) > 0 {
if bytesToString(c.pa.origin) == dc.remoteCluster() {
continue
}
}
// The other is leaf to leaf.
if c.kind == LEAF {
src, dest := c.remoteCluster(), dc.remoteCluster()
if src != _EMPTY_ && src == dest {
continue
}
// Check two scenarios. One is inbound from a route (c.pa.origin),
// and the other is leaf to leaf. In both cases, leafOrigin is the one
// to use for the comparison.
if leafOrigin != _EMPTY_ && leafOrigin == dc.remoteCluster() {
continue
}

// We need to check if this is a request that has a stamped client information header.
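The client.go changes above consolidate the origin check: leafOrigin is computed once at the top of processMsgResults (from c.pa.origin for routed messages, or from the remote cluster of an inbound leaf connection) and then reused both when picking queue subscribers and in the sendToRoutesOrLeafs fan-out. A minimal sketch of that pattern, using simplified stand-in types rather than the server's actual ones:

```go
package sketch

// Connection kinds, mirroring (but not identical to) the server's constants.
type connKind int

const (
	kindClient connKind = iota
	kindRouter
	kindLeaf
)

// conn is a stand-in for *client with only the fields this check needs.
type conn struct {
	kind          connKind
	origin        string // cluster stamped on a routed message (c.pa.origin)
	remoteCluster string // cluster name of a leafnode remote
}

// leafOriginOf derives the origin cluster once: either the cluster stamped on
// a routed message, or the remote cluster of the inbound leaf connection.
func leafOriginOf(c *conn) string {
	switch c.kind {
	case kindRouter:
		return c.origin
	case kindLeaf:
		return c.remoteCluster
	}
	return ""
}

// skipLeafDestination is the cluster-wide no-echo rule: never hand a message
// to a leaf connection that leads back to the cluster it originated from.
func skipLeafDestination(origin string, dst *conn) bool {
	return dst.kind == kindLeaf && origin != "" && origin == dst.remoteCluster
}
```

With the origin derived up front, the queue-subscriber loops and the final route/leaf delivery all apply the same comparison instead of re-deriving it per destination.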
4 changes: 3 additions & 1 deletion server/consumer.go
@@ -5332,12 +5332,14 @@ func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool)
return
}

mset.mu.RUnlock()
mset.mu.Lock()
for seq := start; seq <= stop; seq++ {
if mset.noInterest(seq, co) {
rmseqs = append(rmseqs, seq)
}
}
mset.mu.RUnlock()
mset.mu.Unlock()

// These can be removed.
for _, seq := range rmseqs {
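The consumer.go change above moves the sequence scan from the stream's read lock to its write lock. Go's sync.RWMutex cannot be upgraded in place, so the read lock is explicitly released before the write lock is taken. A small sketch of that shape with illustrative names (the hunk does not show why write access is needed, so that part is assumed):

```go
package sketch

import "sync"

type stream struct {
	mu sync.RWMutex
	// ... stream state ...
}

// collectRemovals assumes the caller currently holds the read lock, drops it,
// and re-acquires the mutex for writing before scanning the sequence range.
func (m *stream) collectRemovals(start, stop uint64, noInterest func(uint64) bool) []uint64 {
	var rmseqs []uint64

	m.mu.RUnlock() // release the read lock held by the caller
	m.mu.Lock()    // take exclusive access for the scan
	for seq := start; seq <= stop; seq++ {
		if noInterest(seq) {
			rmseqs = append(rmseqs, seq)
		}
	}
	m.mu.Unlock()

	return rmseqs
}
```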
6 changes: 3 additions & 3 deletions server/errors_test.go
@@ -20,7 +20,7 @@ import (

func TestErrCtx(t *testing.T) {
ctx := "Extra context information"
e := NewErrorCtx(ErrWrongGateway, ctx)
e := NewErrorCtx(ErrWrongGateway, "%s", ctx)

if e.Error() != ErrWrongGateway.Error() {
t.Fatalf("%v and %v are supposed to be identical", e, ErrWrongGateway)
@@ -45,9 +45,9 @@ func TestErrCtx(t *testing.T) {

func TestErrCtxWrapped(t *testing.T) {
ctxO := "Original Ctx"
eO := NewErrorCtx(ErrWrongGateway, ctxO)
eO := NewErrorCtx(ErrWrongGateway, "%s", ctxO)
ctx := "Extra context information"
e := NewErrorCtx(eO, ctx)
e := NewErrorCtx(eO, "%s", ctx)

if e.Error() != ErrWrongGateway.Error() {
t.Fatalf("%v and %v are supposed to be identical", e, ErrWrongGateway)
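These test updates, like the RateLimitWarnf call sites changed in jetstream_cluster.go further down, all apply one rule: caller-supplied text is no longer passed where a printf-style format string is expected, because any '%' in the value would be interpreted as a verb. Newer vet/golangci-lint versions (the lint bump above is presumably what surfaced this) flag such non-constant format strings. A small sketch of the rule using a generic logging function, not the server's own API:

```go
package sketch

import "fmt"

// warn shows the fix applied throughout this PR: the format argument stays a
// constant, and the caller-supplied text becomes an ordinary value argument.
func warn(logf func(format string, args ...any), msg string) {
	// Flagged as a non-constant format string: logf(msg)
	logf("%s", msg)
}

// Example usage, with fmt.Printf wrapped to match the logf signature.
func example() {
	logf := func(format string, args ...any) { fmt.Printf(format+"\n", args...) }
	warn(logf, "restore is 100% complete") // the literal '%' is printed safely
}
```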
6 changes: 3 additions & 3 deletions server/filestore.go
@@ -8130,11 +8130,11 @@ func (fs *fileStore) setSyncTimer() {
if fs.syncTmr != nil {
fs.syncTmr.Reset(fs.fcfg.SyncInterval)
} else {
// First time this fires will be any time up to the fs.fcfg.SyncInterval,
// First time this fires will be between SyncInterval/2 and SyncInterval,
// so that different stores are spread out, rather than having many of
// them trying to all sync at once, causing blips and contending dios.
start := time.Duration(mrand.Int63n(int64(fs.fcfg.SyncInterval)))
fs.syncTmr = time.AfterFunc(min(start, time.Second), fs.syncBlocks)
start := (fs.fcfg.SyncInterval / 2) + (time.Duration(mrand.Int63n(int64(fs.fcfg.SyncInterval / 2))))
fs.syncTmr = time.AfterFunc(start, fs.syncBlocks)
}
}

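The filestore.go change replaces the old first-fire delay, which min(start, time.Second) effectively capped at one second, with a delay drawn from [SyncInterval/2, SyncInterval), so different stores still start out of phase but none of them syncs almost immediately after startup. A sketch of just that calculation (the function name is illustrative; it assumes a positive SyncInterval, as the config guarantees):

```go
package sketch

import (
	mrand "math/rand"
	"time"
)

// firstSyncDelay spreads the initial sync between half and the full interval.
func firstSyncDelay(syncInterval time.Duration) time.Duration {
	half := syncInterval / 2
	// Int63n panics for n <= 0, hence the positive-interval assumption above.
	return half + time.Duration(mrand.Int63n(int64(half)))
}
```

Subsequent firings then simply reset the timer to the full SyncInterval, as in the unchanged branch above.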
8 changes: 8 additions & 0 deletions server/jetstream_benchmark_test.go
@@ -1760,6 +1760,9 @@ func startJSClusterAndConnect(b *testing.B, clusterSize int) (c *cluster, s *Ser
shutdown = func() {
s.Shutdown()
}
s.optsMu.Lock()
s.opts.SyncInterval = 5 * time.Minute
s.optsMu.Unlock()
} else {
c = createJetStreamClusterExplicit(b, "BENCH_PUB", clusterSize)
c.waitOnClusterReadyWithNumPeers(clusterSize)
@@ -1768,6 +1771,11 @@
shutdown = func() {
c.shutdown()
}
for _, s := range c.servers {
s.optsMu.Lock()
s.opts.SyncInterval = 5 * time.Minute
s.optsMu.Unlock()
}
}

nc, err = nats.Connect(s.ClientURL())
28 changes: 18 additions & 10 deletions server/jetstream_cluster.go
@@ -138,10 +138,11 @@ type streamAssignment struct {
Reply string `json:"reply"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
consumers map[string]*consumerAssignment
responded bool
recovering bool
err error
consumers map[string]*consumerAssignment
responded bool
recovering bool
reassigning bool // i.e. due to placement issues, lack of resources, etc.
err error
}

// consumerAssignment is what the meta controller uses to assign consumers to streams.
@@ -4917,7 +4918,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
doSnapshot(true)
}
} else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
ne, nb := n.Applied(ce.Index)
var ne, nb uint64
// We can't guarantee writes are flushed while we're shutting down. Just rely on replay during recovery.
if !js.isShuttingDown() {
ne, nb = n.Applied(ce.Index)
}
ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
@@ -5449,8 +5454,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
// then we will do the proper thing. Otherwise will be a no-op.
cc.removeInflightProposal(result.Account, result.Stream)

// FIXME(dlc) - suppress duplicates?
if sa := js.streamAssignment(result.Account, result.Stream); sa != nil {
if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && !sa.reassigning {
canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second

// See if we should retry in case this cluster is full but there are others.
@@ -5476,6 +5480,10 @@
// Propose new.
sa.Group, sa.err = rg, nil
cc.meta.Propose(encodeAddStreamAssignment(sa))
// When the new stream assignment is processed, sa.reassigning will be
// automatically set back to false. Until then, don't process any more
// assignment results.
sa.reassigning = true
return
}
}
@@ -7948,7 +7956,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Check msgSize if we have a limit set there. Again this works if it goes through but better to be pre-emptive.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
err := fmt.Errorf("JetStream message size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
s.RateLimitWarnf(err.Error())
s.RateLimitWarnf("%s", err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSStreamMessageExceedsMaximumError()
@@ -7965,7 +7973,7 @@
// Again this works if it goes through but better to be pre-emptive.
if len(hdr) > math.MaxUint16 {
err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
s.RateLimitWarnf(err.Error())
s.RateLimitWarnf("%s", err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSStreamHeaderExceedsMaximumError()
@@ -8097,7 +8105,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold {
lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name)
s.RateLimitWarnf(lerr.Error())
s.RateLimitWarnf("%s", lerr.Error())
}
mset.clMu.Unlock()

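Among the jetstream_cluster.go changes above, the new reassigning flag is the behavioral one: once a failed stream assignment has been re-proposed onto a new group (for example because the originally chosen cluster was full), further assignment results for that stream are ignored until the new assignment is processed, which clears the flag again. A simplified sketch of that guard with stand-in types:

```go
package sketch

// streamAssignment is a stand-in holding only the field this guard needs.
type streamAssignment struct {
	reassigning bool // set once a replacement assignment has been proposed
}

// onAssignmentResult drops results while a re-proposal is already in flight,
// so a stream is not retried onto a new placement more than once at a time.
func onAssignmentResult(sa *streamAssignment, shouldRetry bool, propose func()) {
	if sa == nil || sa.reassigning {
		return
	}
	if shouldRetry {
		propose() // propose the stream assignment with a new group/placement
		// Cleared when the newly proposed assignment is applied.
		sa.reassigning = true
	}
}
```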