diff --git a/server/consumer.go b/server/consumer.go index 61d6420bd37..e6b8395a7c5 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1637,6 +1637,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { return false } +const ( + defaultConsumerNotActiveStartInterval = 30 * time.Second + defaultConsumerNotActiveMaxInterval = 5 * time.Minute +) + +var ( + consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval + consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval +) + func (o *consumer) deleteNotActive() { o.mu.Lock() if o.mset == nil { @@ -1702,12 +1712,9 @@ func (o *consumer) deleteNotActive() { // Check to make sure we went away. // Don't think this needs to be a monitored go routine. go func() { - const ( - startInterval = 30 * time.Second - maxInterval = 5 * time.Minute - ) - jitter := time.Duration(rand.Int63n(int64(startInterval))) - interval := startInterval + jitter + const () + jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval))) + interval := consumerNotActiveStartInterval + jitter ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { @@ -1722,7 +1729,7 @@ func (o *consumer) deleteNotActive() { if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) - if interval < maxInterval { + if interval < consumerNotActiveMaxInterval { interval *= 2 ticker.Reset(interval) } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 045aa5c1c79..001b114c9ee 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) { } func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) { + consumerNotActiveStartInterval = time.Second * 5 + defer func() { + consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval + }() + c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) { time.Sleep(2 * time.Second) // Restart first and wait so that we know it will try cleanup without a metaleader. + // It will fail as there's no metaleader at that time, it should keep retrying on an interval. c.restartServer(rs) time.Sleep(time.Second) @@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) { defer nc.Close() subj := fmt.Sprintf(JSApiConsumerListT, "TEST") - checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { - m, err := nc.Request(subj, nil, time.Second) + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + // Request will take at most 4 seconds if some consumers can't be found. + m, err := nc.Request(subj, nil, 5*time.Second) if err != nil { return err } diff --git a/server/norace_test.go b/server/norace_test.go index 5e01b6b9ed0..98d29cb2bc0 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) { } func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) { + consumerNotActiveStartInterval = time.Second * 5 + defer func() { + consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval + }() + c := createJetStreamClusterExplicit(t, "GHOST", 3) defer c.shutdown() @@ -6670,22 +6675,17 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) { time.Sleep(5 * time.Second) cancel() - getMissing := func() []string { - m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10) - require_NoError(t, err) - + checkFor(t, 30*time.Second, time.Second, func() error { + m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second) + if err != nil { + return err + } var resp JSApiConsumerListResponse - err = json.Unmarshal(m.Data, &resp) - require_NoError(t, err) - return resp.Missing - } - - checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { - missing := getMissing() - if len(missing) == 0 { + require_NoError(t, json.Unmarshal(m.Data, &resp)) + if len(resp.Missing) == 0 { return nil } - return fmt.Errorf("Still have missing: %+v", missing) + return fmt.Errorf("Still have missing: %+v", resp.Missing) }) }