diff --git a/go.mod b/go.mod index c5c0591e877..4d3c5749b1d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/nats-io/nats-server/v2 go 1.21.0 require ( - github.com/goccy/go-json v0.10.3 github.com/klauspost/compress v1.17.11 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.5.8 diff --git a/go.sum b/go.sum index 92c085a8471..55132e6a25e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= -github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= diff --git a/server/accounts_test.go b/server/accounts_test.go index 8f593cd4a0a..92778c66629 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,6 +15,7 @@ package server import ( "encoding/base64" + "encoding/json" "fmt" "net/http" "strconv" @@ -24,7 +25,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/auth_callout_test.go b/server/auth_callout_test.go index b4f39045468..29b7d83b049 100644 --- a/server/auth_callout_test.go +++ b/server/auth_callout_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package server import ( "bytes" "crypto/x509" + "encoding/json" "encoding/pem" "errors" "fmt" @@ -28,7 +29,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/auth_test.go b/server/auth_test.go index 4ec9030dc81..05c3402f7d6 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -15,6 +15,7 @@ package server import ( "context" + "encoding/json" "fmt" "net" "net/url" @@ -24,7 +25,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) diff --git a/server/certidp/certidp.go b/server/certidp/certidp.go index e92aebabf8e..a26618577be 100644 --- a/server/certidp/certidp.go +++ b/server/certidp/certidp.go @@ -17,12 +17,12 @@ import ( "crypto/sha256" "crypto/x509" "encoding/base64" + "encoding/json" "fmt" "net/url" "strings" "time" - "github.com/goccy/go-json" "golang.org/x/crypto/ocsp" ) diff --git a/server/client.go b/server/client.go index c4e6ba27b47..d87d122110c 100644 --- a/server/client.go +++ b/server/client.go @@ -17,6 +17,7 @@ import ( "bytes" "crypto/tls" "crypto/x509" + "encoding/json" "errors" "fmt" "io" @@ -32,7 +33,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/internal/fastrand" diff --git a/server/client_test.go b/server/client_test.go index fa03bd48073..aa5daf2a71f 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,7 @@ package server import ( "bufio" "bytes" - "crypto/tls" + "encoding/json" "fmt" "io" "math" @@ -31,7 +31,8 @@ import ( "testing" "time" - "github.com/goccy/go-json" + "crypto/tls" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/consumer.go b/server/consumer.go index c569f636616..a91c1925441 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -16,6 +16,7 @@ package server import ( "bytes" "encoding/binary" + "encoding/json" "errors" "fmt" "math/rand" @@ -27,7 +28,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nuid" "golang.org/x/time/rate" diff --git a/server/events.go b/server/events.go index 16d4e819227..1928e64d741 100644 --- a/server/events.go +++ b/server/events.go @@ -18,6 +18,7 @@ import ( "compress/gzip" "crypto/sha256" "crypto/x509" + "encoding/json" "errors" "fmt" "math/rand" @@ -29,8 +30,8 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/certidp" "github.com/nats-io/nats-server/v2/server/pse" diff --git a/server/events_test.go b/server/events_test.go index ed87bfe4e02..6545db66753 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -16,6 +16,7 @@ package server import ( "bytes" "crypto/sha256" + "encoding/json" "errors" "fmt" "math/rand" @@ -29,7 +30,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/filestore.go b/server/filestore.go index 1e6748b5ba2..bcc4b5d1061 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -22,6 +22,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" + "encoding/json" "errors" "fmt" "hash" @@ -39,7 +40,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nats-server/v2/server/avl" @@ -8157,8 +8157,10 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { defer close(done) // Make sure we do not try to write these out too fast. + // Spread these out for large numbers on a server restart. const writeThreshold = 2 * time.Minute - t := time.NewTicker(writeThreshold) + writeJitter := time.Duration(mrand.Int63n(int64(30 * time.Second))) + t := time.NewTicker(writeThreshold + writeJitter) defer t.Stop() for { diff --git a/server/filestore_test.go b/server/filestore_test.go index cf0061410c2..1be968f8b11 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -24,6 +24,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" + "encoding/json" "errors" "fmt" "io" @@ -38,7 +39,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/nuid" ) @@ -1483,6 +1483,8 @@ func TestFileStoreMeta(t *testing.T) { if err := json.Unmarshal(buf, &oconfig2); err != nil { t.Fatalf("Error unmarshalling: %v", err) } + // Since we set name we will get that back now. + oconfig.Name = oname if !reflect.DeepEqual(oconfig2, oconfig) { t.Fatalf("Consumer configs not equal, got %+v vs %+v", oconfig2, oconfig) } diff --git a/server/gateway.go b/server/gateway.go index 0c265c6dc95..46dd7260ec7 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -18,6 +18,7 @@ import ( "cmp" "crypto/sha256" "crypto/tls" + "encoding/json" "fmt" "math/rand" "net" @@ -27,8 +28,6 @@ import ( "sync" "sync/atomic" "time" - - "github.com/goccy/go-json" ) const ( diff --git a/server/gateway_test.go b/server/gateway_test.go index 665c9684520..b397bfedc10 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "fmt" "net" "net/url" @@ -30,12 +31,10 @@ import ( "testing" "time" - "github.com/goccy/go-json" + . "github.com/nats-io/nats-server/v2/internal/ocsp" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats.go" "golang.org/x/crypto/ocsp" - - . "github.com/nats-io/nats-server/v2/internal/ocsp" ) func init() { diff --git a/server/jetstream.go b/server/jetstream.go index 6e43842ce4d..02920e76a4b 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -18,6 +18,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" + "encoding/json" "fmt" "math" "os" @@ -29,7 +30,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/minio/highwayhash" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nkeys" diff --git a/server/jetstream_api.go b/server/jetstream_api.go index d60c93283ec..88c06730b6a 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2024 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package server import ( "bytes" "cmp" + "encoding/json" "errors" "fmt" "math/rand" @@ -29,7 +30,6 @@ import ( "time" "unicode" - "github.com/goccy/go-json" "github.com/nats-io/nuid" ) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c5d67078a2b..ebcd29c8abb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -18,6 +18,7 @@ import ( "cmp" crand "crypto/rand" "encoding/binary" + "encoding/json" "errors" "fmt" "math" @@ -31,7 +32,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nuid" @@ -9099,20 +9099,26 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ { var sm *StoreMsg var err error - // Is we should use load next do so here. + // If we should use load next do so here. if useLoadNext { var nseq uint64 sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv) if err == nil && nseq > seq { + // If we jumped over the requested last sequence, clamp it down. + // Otherwise, we would send too much to the follower. + if nseq > last { + nseq = last + sm = nil + } dr.First, dr.Num = seq, nseq-seq // Jump ahead seq = nseq } else if err == ErrStoreEOF { - dr.First, dr.Num = seq, state.LastSeq-seq + dr.First, dr.Num = seq, last-seq // Clear EOF here for normal processing. err = nil // Jump ahead - seq = state.LastSeq + seq = last } } else { sm, err = mset.store.LoadMsg(seq, &smv) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 562bbfa5915..79dcc61be8d 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -20,6 +20,7 @@ import ( "bytes" "context" crand "crypto/rand" + "encoding/json" "errors" "fmt" "math/rand" @@ -32,7 +33,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) @@ -6667,7 +6667,7 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { // Make sure if we received acks that are out of bounds, meaning past our // last sequence or before our first that they are ignored and errored if applicable. -func TestJetStreamConsumerAckOutOfBounds(t *testing.T) { +func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -6722,6 +6722,96 @@ func TestJetStreamConsumerAckOutOfBounds(t *testing.T) { require_Equal(t, ci.AckFloor.Stream, 1) } +func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) { + tests := []struct { + title string + catchupRequest *streamSyncRequest + setup func(js nats.JetStreamContext) + assert func(sub *nats.Subscription) + }{ + { + title: "within-delete-gap", + setup: func(js nats.JetStreamContext) {}, + }, + { + title: "EOF", + setup: func(js nats.JetStreamContext) { + err := js.DeleteMsg("TEST", 100) + require_NoError(t, err) + }, + }, + } + + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Starts and ends with subject "foo", we'll purge so there's a large gap of deletes in the middle. + // This should force runCatchup to use LoadNextMsg instead of LoadMsg. + for i := 0; i < 100; i++ { + subject := "bar" + if i == 0 || i == 99 { + subject = "foo" + } + _, err = js.Publish(subject, nil) + require_NoError(t, err) + } + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "bar"}) + require_NoError(t, err) + + // Optionally run some extra setup. + test.setup(js) + + // Reconnect to stream leader. + l := c.streamLeader(globalAccountName, "TEST") + nc.Close() + nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + // Setup wiretap and grab stream. + sendSubject := "test-wiretap" + sub, err := nc.SubscribeSync(sendSubject) + require_NoError(t, err) + err = nc.Flush() // Must flush, otherwise our subscription could be too late. + require_NoError(t, err) + acc, err := l.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + // Run custom catchup request and the test's asserts. + sreq := &streamSyncRequest{Peer: "peer", FirstSeq: 5, LastSeq: 5, DeleteRangesOk: true} + require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq) })) + + // Our first message should be a skip msg. + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, entryOp(msg.Data[0]), streamMsgOp) + subj, _, _, _, seq, ts, err := decodeStreamMsg(msg.Data[1:]) + require_NoError(t, err) + require_Equal(t, seq, 5) + require_Equal(t, subj, _EMPTY_) + require_Equal(t, ts, 0) + + // And end with EOF. + msg, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_Len(t, len(msg.Data), 0) + }) + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 7bb5db1bea0..5807786e625 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -22,6 +22,7 @@ import ( crand "crypto/rand" "encoding/binary" "encoding/hex" + "encoding/json" "errors" "fmt" "math/rand" @@ -36,7 +37,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats.go" ) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index f2010b4180e..a5b32c0e7b8 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -19,6 +19,7 @@ package server import ( "bytes" "context" + "encoding/json" "errors" "fmt" "math/rand" @@ -32,7 +33,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 0e66f3b987c..f0145d21ac8 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -18,6 +18,7 @@ package server import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -35,7 +36,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" ) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 5d40c0a621b..8e62dbb12a0 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ package server import ( + "encoding/json" "errors" "fmt" "math/rand" @@ -28,7 +29,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" ) diff --git a/server/jetstream_events.go b/server/jetstream_events.go index adb2a396e3d..8302fcc4048 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -1,4 +1,4 @@ -// Copyright 2020-2024 The NATS Authors +// Copyright 2020-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,9 +14,8 @@ package server import ( + "encoding/json" "time" - - "github.com/goccy/go-json" ) func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index f5cbe05b5e7..eb21057a04c 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -18,6 +18,7 @@ package server import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -32,7 +33,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats.go" "golang.org/x/time/rate" ) diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index aca5e63a88a..f8f6466f69e 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2024 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ package server import ( + "encoding/json" "errors" "fmt" "net/http" @@ -26,11 +27,9 @@ import ( "testing" "time" - "github.com/goccy/go-json" + jwt "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" - - jwt "github.com/nats-io/jwt/v2" ) func TestJetStreamJWTLimits(t *testing.T) { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 84b74688808..80fd87a6202 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -17,6 +17,7 @@ package server import ( + "encoding/json" "errors" "fmt" "math/rand" @@ -29,7 +30,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 1a91c4d38e6..78cd23d6cbb 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21,6 +21,7 @@ import ( "context" crand "crypto/rand" "encoding/base64" + "encoding/json" "errors" "fmt" "io" @@ -41,7 +42,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nats.go" diff --git a/server/jwt_test.go b/server/jwt_test.go index c6c9f1d1446..f4235a2b973 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ import ( "bufio" "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" @@ -30,7 +31,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/leafnode.go b/server/leafnode.go index bf69b0a3f04..26a3f6ec3d1 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -18,6 +18,7 @@ import ( "bytes" "crypto/tls" "encoding/base64" + "encoding/json" "fmt" "math/rand" "net" @@ -35,7 +36,6 @@ import ( "time" "unicode" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" diff --git a/server/monitor.go b/server/monitor.go index 86aeab00b96..2bd25f9a7be 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -20,6 +20,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/hex" + "encoding/json" "expvar" "fmt" "net" @@ -36,7 +37,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/pse" ) diff --git a/server/monitor_test.go b/server/monitor_test.go index f7c214c3609..7003de12316 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2024 The NATS Authors +// Copyright 2013-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package server import ( "bytes" "crypto/tls" + "encoding/json" "errors" "fmt" "io" @@ -33,7 +34,6 @@ import ( "time" "unicode" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/mqtt.go b/server/mqtt.go index 0cad975bb18..35c18ba154d 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1,4 +1,4 @@ -// Copyright 2020-2024 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -18,6 +18,7 @@ import ( "cmp" "crypto/tls" "encoding/binary" + "encoding/json" "errors" "fmt" "io" @@ -30,7 +31,6 @@ import ( "time" "unicode/utf8" - "github.com/goccy/go-json" "github.com/nats-io/nuid" ) diff --git a/server/mqtt_ex_test_test.go b/server/mqtt_ex_test_test.go index 4280b714f99..9acad558779 100644 --- a/server/mqtt_ex_test_test.go +++ b/server/mqtt_ex_test_test.go @@ -18,6 +18,7 @@ package server import ( "bytes" + "encoding/json" "fmt" "io" "os" @@ -26,7 +27,6 @@ import ( "strings" "testing" - "github.com/goccy/go-json" "github.com/nats-io/nuid" ) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 84bedafbc1e..dc7d9651da3 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2024 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -20,6 +20,7 @@ import ( "bufio" "bytes" "crypto/tls" + "encoding/json" "errors" "fmt" "io" @@ -32,7 +33,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" @@ -7080,8 +7080,6 @@ func TestMQTTSubjectMappingWithImportExport(t *testing.T) { func TestMQTTSubRetainedRace(t *testing.T) { o := testMQTTDefaultOptions() - s := testMQTTRunServer(t, o) - defer testMQTTShutdownServer(s) useCases := []struct { name string @@ -7099,6 +7097,9 @@ func TestMQTTSubRetainedRace(t *testing.T) { t.Run(subTopic, func(t *testing.T) { for _, qos := range QOS { t.Run(fmt.Sprintf("QOS%d", qos), func(t *testing.T) { + s := testMQTTRunServer(t, o) + defer testMQTTShutdownServer(s) + tc.f(t, o, subTopic, pubTopic, qos) }) } diff --git a/server/nkey_test.go b/server/nkey_test.go index 174f7067100..f593a2f7548 100644 --- a/server/nkey_test.go +++ b/server/nkey_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,16 +15,15 @@ package server import ( "bufio" + crand "crypto/rand" "encoding/base64" + "encoding/json" "fmt" + mrand "math/rand" "strings" "testing" "time" - crand "crypto/rand" - mrand "math/rand" - - "github.com/goccy/go-json" "github.com/nats-io/nkeys" ) diff --git a/server/norace_test.go b/server/norace_test.go index 2d4e94e612f..3a67cd9b802 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -22,6 +22,7 @@ import ( "compress/gzip" "context" "encoding/binary" + "encoding/json" "errors" "fmt" "io" @@ -47,7 +48,6 @@ import ( crand "crypto/rand" "crypto/sha256" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/avl" diff --git a/server/ocsp_responsecache.go b/server/ocsp_responsecache.go index b62166f25d4..455fdd3a270 100644 --- a/server/ocsp_responsecache.go +++ b/server/ocsp_responsecache.go @@ -1,4 +1,4 @@ -// Copyright 2023-2024 The NATS Authors +// Copyright 2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,6 +15,7 @@ package server import ( "bytes" + "encoding/json" "errors" "fmt" "io" @@ -26,7 +27,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "golang.org/x/crypto/ocsp" diff --git a/server/opts_test.go b/server/opts_test.go index 443ce1f67a5..0755c7d02dd 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package server import ( "bytes" "crypto/tls" + "encoding/json" "flag" "fmt" "net/url" @@ -28,7 +29,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/reload_test.go b/server/reload_test.go index 37de82601b8..7a4c9d3281c 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1,4 +1,4 @@ -// Copyright 2017-2024 The NATS Authors +// Copyright 2017-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ import ( "bytes" "crypto/tls" "encoding/base64" + "encoding/json" "flag" "fmt" "io" @@ -34,7 +35,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/route.go b/server/route.go index b0f56681748..0c455547c98 100644 --- a/server/route.go +++ b/server/route.go @@ -16,6 +16,7 @@ package server import ( "bytes" "crypto/tls" + "encoding/json" "fmt" "math/rand" "net" @@ -27,7 +28,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" ) diff --git a/server/routes_test.go b/server/routes_test.go index 0d7b24d1951..6696fb7b7bd 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2024 The NATS Authors +// Copyright 2013-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -18,6 +18,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "fmt" "math/rand" "net" @@ -34,7 +35,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/server.go b/server/server.go index 7f39e67110e..81013d1e1b9 100644 --- a/server/server.go +++ b/server/server.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" @@ -27,28 +28,28 @@ import ( "net" "net/http" "net/url" + "regexp" + "runtime/pprof" + "unicode" + + // Allow dynamic profiling. + _ "net/http/pprof" "os" "path" "path/filepath" - "regexp" "runtime" - "runtime/pprof" "strconv" "strings" "sync" "sync/atomic" "time" - "unicode" - - // Allow dynamic profiling. - _ "net/http/pprof" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" - "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" + + "github.com/nats-io/nats-server/v2/logger" ) const ( diff --git a/server/server_test.go b/server/server_test.go index 23a752724e1..03cdc71baf3 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" @@ -34,7 +35,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats.go" ) diff --git a/server/stream.go b/server/stream.go index 0ddf9bb448a..a3a7c8fdc7e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -17,6 +17,7 @@ import ( "archive/tar" "bytes" "encoding/binary" + "encoding/json" "errors" "fmt" "io" @@ -31,7 +32,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/nuid" ) @@ -3574,6 +3574,15 @@ func (mset *stream) startingSequenceForSources() { } }() + update := func(iName string, seq uint64) { + // Only update active in case we have older ones in here that got configured out. + if si := mset.sources[iName]; si != nil { + if _, ok := seqs[iName]; !ok { + seqs[iName] = seq + } + } + } + var smv StoreMsg for seq := state.LastSeq; seq >= state.FirstSeq; seq-- { sm, err := mset.store.LoadMsg(seq, &smv) @@ -3585,15 +3594,6 @@ func (mset *stream) startingSequenceForSources() { continue } - var update = func(iName string, seq uint64) { - // Only update active in case we have older ones in here that got configured out. - if si := mset.sources[iName]; si != nil { - if _, ok := seqs[iName]; !ok { - seqs[iName] = seq - } - } - } - streamName, iName, sseq := streamAndSeq(string(ss)) if iName == _EMPTY_ { // Pre-2.10 message header means it's a match for any source using that stream name for _, ssi := range mset.cfg.Sources { @@ -3675,12 +3675,17 @@ func (mset *stream) subscribeToStream() error { } else if len(mset.cfg.Sources) > 0 && mset.sourcesConsumerSetup == nil { // Setup the initial source infos for the sources mset.resetSourceInfo() - // Delay the actual source consumer(s) creation(s) for after a delay - mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(500*time.Millisecond)))+100*time.Millisecond, func() { - mset.mu.Lock() + // Delay the actual source consumer(s) creation(s) for after a delay if a replicated stream. + // If it's an R1, this is done at startup and we will do inline. + if mset.cfg.Replicas == 1 { mset.setupSourceConsumers() - mset.mu.Unlock() - }) + } else { + mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(500*time.Millisecond)))+100*time.Millisecond, func() { + mset.mu.Lock() + mset.setupSourceConsumers() + mset.mu.Unlock() + }) + } } // Check for direct get access. // We spin up followers for clustered streams in monitorStream(). diff --git a/server/util.go b/server/util.go index 243dcb42b41..aea3dcf17e2 100644 --- a/server/util.go +++ b/server/util.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package server import ( "bytes" "context" + "encoding/json" "errors" "fmt" "math" @@ -26,8 +27,6 @@ import ( "strconv" "strings" "time" - - "github.com/goccy/go-json" ) // This map is used to store URLs string as the key with a reference count as diff --git a/server/websocket_test.go b/server/websocket_test.go index 31d3cd28026..368a9fe8d6f 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "encoding/base64" "encoding/binary" + "encoding/json" "errors" "fmt" "io" @@ -33,11 +34,11 @@ import ( "testing" "time" - "github.com/goccy/go-json" - "github.com/klauspost/compress/flate" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" + + "github.com/klauspost/compress/flate" ) type testReader struct { diff --git a/test/auth_test.go b/test/auth_test.go index 4d634a51c24..232ae717769 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,12 +14,12 @@ package test import ( + "encoding/json" "fmt" "net" "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/gateway_test.go b/test/gateway_test.go index bbaad336089..49e7274c6c7 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ import ( "bufio" "bytes" "crypto/tls" + "encoding/json" "fmt" "net" "net/url" @@ -24,7 +25,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) diff --git a/test/leafnode_test.go b/test/leafnode_test.go index a6103882c27..a9451732818 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -17,6 +17,7 @@ import ( "bytes" "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "math/rand" "net" @@ -29,7 +30,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" diff --git a/test/monitor_test.go b/test/monitor_test.go index 661d8e475d7..91d2f4f07db 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package test import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "io" "net" @@ -27,7 +28,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 01155cf1814..5810fe4eaf7 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,12 +14,12 @@ package test import ( + "encoding/json" "fmt" "net" "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" diff --git a/test/norace_test.go b/test/norace_test.go index d78766efe87..eb5252e2a63 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -19,6 +19,7 @@ package test import ( "context" crand "crypto/rand" + "encoding/json" "fmt" "net" "net/url" @@ -30,7 +31,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" diff --git a/test/ocsp_peer_test.go b/test/ocsp_peer_test.go index e97fa9a7e61..1efb3f4f35e 100644 --- a/test/ocsp_peer_test.go +++ b/test/ocsp_peer_test.go @@ -17,6 +17,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/json" "errors" "fmt" "io" @@ -26,7 +27,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" . "github.com/nats-io/nats-server/v2/internal/ocsp" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" @@ -3013,12 +3013,12 @@ func TestOCSPMonitoringPort(t *testing.T) { net: 127.0.0.1 port: -1 https: -1 - ocsp { + ocsp { mode = always url = http://127.0.0.1:18888 } store_dir = %s - + tls: { cert_file: "configs/certs/ocsp_peer/mini-ca/server1/TestServer1_bundle.pem" key_file: "configs/certs/ocsp_peer/mini-ca/server1/private/TestServer1_keypair.pem" diff --git a/test/ports_test.go b/test/ports_test.go index 2959c00af29..68b05bbf4c7 100644 --- a/test/ports_test.go +++ b/test/ports_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,6 +14,7 @@ package test import ( + "encoding/json" "errors" "fmt" "os" @@ -22,7 +23,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/proto_test.go b/test/proto_test.go index 1ab2df6e538..a03a4446ebc 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,10 +14,10 @@ package test import ( + "encoding/json" "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go index 9c77790e431..3c7a87da259 100644 --- a/test/route_discovery_test.go +++ b/test/route_discovery_test.go @@ -1,4 +1,4 @@ -// Copyright 2015-2024 The NATS Authors +// Copyright 2015-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,6 +15,7 @@ package test import ( "bufio" + "encoding/json" "fmt" "io" "net" @@ -25,7 +26,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/routes_test.go b/test/routes_test.go index 76180a63ffd..16254f06c83 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,6 +14,7 @@ package test import ( + "encoding/json" "fmt" "io" "net" @@ -24,7 +25,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/internal/testhelper" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 6c7e167a7f8..ca9c35ede67 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2024 The NATS Authors +// Copyright 2019-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,6 +14,7 @@ package test import ( + "encoding/json" "fmt" "math/rand" "net/http" @@ -25,7 +26,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" diff --git a/test/test.go b/test/test.go index 5404bee830b..072b8ca95cd 100644 --- a/test/test.go +++ b/test/test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package test import ( "crypto/rand" "encoding/hex" + "encoding/json" "fmt" "io" "net" @@ -27,7 +28,6 @@ import ( "testing" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" )