From a1e2de7348b62cf852b1bdc0b01a6bb0cb4a6d66 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 27 Jun 2024 10:04:24 -0700 Subject: [PATCH] Moved to jetstream_cluster_4_test.go and simplified --- server/jetstream_cluster_4_test.go | 117 ++++++++++++++ server/mqtt_ex_leak_investigate_test.go | 205 ------------------------ server/mqtt_ex_test_test.go | 15 -- server/mqtt_test.go | 51 +++--- 4 files changed, 136 insertions(+), 252 deletions(-) delete mode 100644 server/mqtt_ex_leak_investigate_test.go diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8407187e99..6add85e48a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -24,6 +24,7 @@ import ( "math/rand" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -2329,3 +2330,119 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) { } } } + +// https://github.com/nats-io/nats-server/pull/5600 +func TestJetStreamClusterConsumerLeak(t *testing.T) { + N := 2000 // runs in under 10s, but significant enough to see the difference. + NConcurrent := 100 + + clusterConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + leafnodes { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + + cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) + defer cl.shutdown() + cl.waitOnLeader() + + s := cl.randomNonLeader() + + // Create the test stream. + streamName := "LEAK_TEST_STREAM" + nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p")) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{"$SOMETHING.>"}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + + concurrent := make(chan struct{}, NConcurrent) + for i := 0; i < NConcurrent; i++ { + concurrent <- struct{}{} + } + errors := make(chan error, N) + + wg := sync.WaitGroup{} + wg.Add(N) + + // Gather the stats for comparison. + before := &runtime.MemStats{} + runtime.GC() + runtime.ReadMemStats(before) + + for i := 0; i < N; { + // wait for a slot to open up + <-concurrent + i++ + go func() { + defer func() { + concurrent <- struct{}{} + wg.Done() + }() + + nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p")) + defer nc.Close() + + consumerName := "sessid_" + nuid.Next() + _, err := js.AddConsumer(streamName, &nats.ConsumerConfig{ + DeliverSubject: "inbox", + Durable: consumerName, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverNewPolicy, + FilterSubject: "$SOMETHING.ELSE.subject", + AckWait: 30 * time.Second, + MaxAckPending: 1024, + }) + if err != nil { + errors <- fmt.Errorf("Error on JetStream consumer creation: %v", err) + return + } + + err = js.DeleteConsumer(streamName, consumerName) + if err != nil { + errors <- fmt.Errorf("Error on JetStream consumer deletion: %v", err) + } + }() + } + + wg.Wait() + if len(errors) > 0 { + for err := range errors { + t.Fatalf("%v", err) + } + } + + after := &runtime.MemStats{} + runtime.GC() + runtime.ReadMemStats(after) + + // Before https://github.com/nats-io/nats-server/pull/5600 this test was + // adding 180Mb+ to HeapInuse. Now it's under 40Mb (ran locally on a Mac) + limit := before.HeapInuse + 100*1024*1024 // 100MB + if after.HeapInuse > before.HeapInuse+limit { + t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse) + } +} diff --git a/server/mqtt_ex_leak_investigate_test.go b/server/mqtt_ex_leak_investigate_test.go deleted file mode 100644 index c05091f538..0000000000 --- a/server/mqtt_ex_leak_investigate_test.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2024 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !skip_mqtt_tests -// +build !skip_mqtt_tests - -package server - -import ( - "os" - "runtime" - "runtime/pprof" - "sync" - "testing" - "time" - - "github.com/nats-io/nats.go" - "github.com/nats-io/nuid" -) - -func TestJetstreamConsumerLeak(t *testing.T) { - - QOS := byte(2) - NSubscribers := 1000 - NConcurrentSubscribers := 100 - - clusterConf := ` - listen: 127.0.0.1:-1 - - server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} - - leafnodes { - listen: 127.0.0.1:-1 - } - - cluster { - name: %s - listen: 127.0.0.1:%d - routes = [%s] - } - - accounts { - ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } - $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } - } -` - cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) - defer cl.shutdown() - - cl.waitOnLeader() - - s := cl.randomNonLeader() - testMQTTInitializeStreams(t, s) - - // Write the memory profile before starting the test - w, _ := os.Create("before.pprof") - pprof.WriteHeapProfile(w) - w.Close() - - before := &runtime.MemStats{} - runtime.GC() - runtime.ReadMemStats(before) - - testMQTTConnSubReceiveDiscConcurrent(t, s, QOS, NSubscribers, NConcurrentSubscribers, testMQTTConnSubDiscJS) - - // Sleep for a few seconds to see if some timers kick in and help cleanup? - time.Sleep(10 * time.Second) - - runtime.GC() - w, _ = os.Create("after.pprof") - pprof.WriteHeapProfile(w) - w.Close() - - after := &runtime.MemStats{} - runtime.GC() - runtime.ReadMemStats(after) - - limit := before.HeapInuse + 100*1024*1024 // 100MB - if after.HeapInuse > limit { - t.Fatalf("Memory usage too high: %v", after.HeapInuse) - } -} - -func testMQTTInitializeStreams(t *testing.T, server *Server) { - nc, js := jsClientConnect(t, server, nats.UserInfo("one", "p")) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: mqttStreamName, - Subjects: []string{mqttStreamSubjectPrefix + ">"}, - Storage: nats.FileStorage, - Retention: nats.InterestPolicy, - Replicas: 3, - }) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } - _, err = js.AddStream(&nats.StreamConfig{ - Name: mqttOutStreamName, - Subjects: []string{mqttOutSubjectPrefix + ">"}, - Storage: nats.FileStorage, - Retention: nats.InterestPolicy, - Replicas: 3, - }) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } -} - -func testMQTTConnSubDiscJS(t *testing.T, server *Server, QOS byte, iSub int) { - nc, js := jsClientConnect(t, server, nats.UserInfo("one", "p")) - defer nc.Close() - - // make sure the MQTT streams are accessible to us - _, err := js.StreamInfo(mqttStreamName) - if err != nil { - t.Fatalf("Error on JetStream stream info: %v", err) - } - _, err = js.StreamInfo(mqttOutStreamName) - if err != nil { - t.Fatalf("Error on JetStream stream info: %v", err) - } - - start := time.Now() - pubrelConsumerName := mqttPubRelConsumerDurablePrefix + nuid.Next() - _, err = js.AddConsumer(mqttOutStreamName, &nats.ConsumerConfig{ - DeliverSubject: "pubrel-delivery_" + nuid.Next(), - Durable: pubrelConsumerName, - AckPolicy: nats.AckExplicitPolicy, - DeliverPolicy: nats.DeliverNewPolicy, - FilterSubject: mqttPubRelSubjectPrefix + nuid.Next(), - AckWait: mqttDefaultAckWait, - MaxAckPending: mqttDefaultMaxAckPending, - MemoryStorage: false, - }) - if err != nil { - t.Fatalf("Error on JetStream consumer creation: %v", err) - } - - subConsumerName := "sessid_" + nuid.Next() - _, err = js.AddConsumer(mqttStreamName, &nats.ConsumerConfig{ - DeliverSubject: "inbox", - Durable: subConsumerName, - AckPolicy: nats.AckExplicitPolicy, - DeliverPolicy: nats.DeliverNewPolicy, - FilterSubject: mqttStreamSubjectPrefix + "subject", - AckWait: mqttDefaultAckWait, - MaxAckPending: mqttDefaultMaxAckPending, - MemoryStorage: false, - }) - if err != nil { - t.Fatalf("Error on JetStream consumer creation: %v", err) - } - t.Logf("<>/<> SUB %v: Now %v, created 2 consumers", iSub, time.Since(start)) - - err = js.DeleteConsumer(mqttOutStreamName, pubrelConsumerName) - if err != nil { - t.Fatalf("Error on JetStream consumer deletion: %v", err) - } - err = js.DeleteConsumer(mqttStreamName, subConsumerName) - if err != nil { - t.Fatalf("Error on JetStream consumer deletion: %v", err) - } - t.Logf("SUB %v: Now %v, deleted 2 consumers", iSub, time.Since(start)) -} - -func testMQTTConnSubReceiveDiscConcurrent( - t *testing.T, server *Server, QOS byte, NSubscribers int, NConcurrentSubscribers int, - subf func(t *testing.T, server *Server, QOS byte, n int), -) { - ConcurrentSubscribers := make(chan struct{}, NConcurrentSubscribers) - for i := 0; i < NConcurrentSubscribers; i++ { - ConcurrentSubscribers <- struct{}{} - } - - wg := sync.WaitGroup{} - wg.Add(NSubscribers) - // Start concurrent subscribers. Each will receive 1 to 3 messages, then quit. - go func() { - for iSub := 0; iSub < NSubscribers; { - // wait for a slot to open up - <-ConcurrentSubscribers - iSub++ - go func(c int) { - defer func() { - ConcurrentSubscribers <- struct{}{} - wg.Done() - }() - subf(t, server, QOS, c) - }(iSub) - } - }() - wg.Wait() -} diff --git a/server/mqtt_ex_test_test.go b/server/mqtt_ex_test_test.go index 44a4135581..9acad55877 100644 --- a/server/mqtt_ex_test_test.go +++ b/server/mqtt_ex_test_test.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io" - "net" "os" "os/exec" "strconv" @@ -38,7 +37,6 @@ type mqttTarget struct { clusters []*cluster configs []mqttTestConfig all []mqttDial - allNATS []string } type mqttTestConfig struct { @@ -196,16 +194,6 @@ func (d mqttDial) Get() (u, p, s, c string) { return u, p, s, c } -func (d mqttDial) GetHostPort() (host string, port int) { - _, _, s, _ := d.Get() - host, portS, err := net.SplitHostPort(s) - if err != nil { - return s, 0 - } - port, _ = strconv.Atoi(portS) - return host, port -} - func (d mqttDial) Name() string { _, _, _, c := d.Get() return c @@ -297,16 +285,13 @@ func mqttMakeTestCluster(size int, domain string) func(tb testing.TB) *mqttTarge cl.waitOnLeader() all := []mqttDial{} - allNATS := []string{} for _, s := range cl.servers { all = append(all, mqttNewDialForServer(s, "one", "p")) - allNATS = append(allNATS, string(mqttNewDial("one", "p", s.getOpts().Host, s.getOpts().Port, ""))) } return &mqttTarget{ clusters: []*cluster{cl}, all: all, - allNATS: allNATS, configs: []mqttTestConfig{ { name: "publish to one", diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 0ca5bb8f73..7a72136570 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -329,11 +329,8 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { if err != nil { t.Fatalf("Error creating server: %v", err) } - // l := &DummyLogger{} - // s.SetLogger(l, true, true) - o.Debug = false - o.Trace = false - s.ConfigureLogger() + l := &DummyLogger{} + s.SetLogger(l, true, true) s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { testMQTTShutdownServer(s) @@ -1910,7 +1907,23 @@ func TestMQTTParseSub(t *testing.T) { func testMQTTSub(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []*mqttFilter, expected []byte) { t.Helper() - testMQTTSubNoAck(t, pi, c, r, filters, expected) + w := newMQTTWriter(0) + pkLen := 2 // for pi + for i := 0; i < len(filters); i++ { + f := filters[i] + pkLen += 2 + len(f.filter) + 1 + } + w.WriteByte(mqttPacketSub | mqttSubscribeFlags) + w.WriteVarInt(pkLen) + w.WriteUint16(pi) + for i := 0; i < len(filters); i++ { + f := filters[i] + w.WriteBytes([]byte(f.filter)) + w.WriteByte(f.qos) + } + if _, err := testMQTTWrite(c, w.Bytes()); err != nil { + t.Fatalf("Error writing SUBSCRIBE protocol: %v", err) + } b, pl := testMQTTReadPacket(t, r) if pt := b & mqttPacketMask; pt != mqttPacketSubAck { t.Fatalf("Expected SUBACK packet %x, got %x", mqttPacketSubAck, pt) @@ -1932,27 +1945,6 @@ func testMQTTSub(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []* } } -func testMQTTSubNoAck(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []*mqttFilter, expected []byte) { - t.Helper() - w := newMQTTWriter(0) - pkLen := 2 // for pi - for i := 0; i < len(filters); i++ { - f := filters[i] - pkLen += 2 + len(f.filter) + 1 - } - w.WriteByte(mqttPacketSub | mqttSubscribeFlags) - w.WriteVarInt(pkLen) - w.WriteUint16(pi) - for i := 0; i < len(filters); i++ { - f := filters[i] - w.WriteBytes([]byte(f.filter)) - w.WriteByte(f.qos) - } - if _, err := testMQTTWrite(c, w.Bytes()); err != nil { - t.Fatalf("Error writing SUBSCRIBE protocol: %v", err) - } -} - func TestMQTTSubAck(t *testing.T) { o := testMQTTDefaultOptions() s := testMQTTRunServer(t, o) @@ -2106,11 +2098,6 @@ func testMQTTReadPubPacket(t testing.TB, r *mqttReader) (flags byte, pi uint16, if pt := b & mqttPacketMask; pt != mqttPacketPub { t.Fatalf("Expected PUBLISH packet %x, got %x", mqttPacketPub, pt) } - return testMQTTReadPubPacketEx(t, r, b, pl) -} - -func testMQTTReadPubPacketEx(t testing.TB, r *mqttReader, b byte, pl int) (flags byte, pi uint16, topic string, payload []byte) { - t.Helper() flags = b & mqttPacketFlagMask start := r.pos topic, err := r.readString("topic name")