Skip to content

Commit

Permalink
Factor out consumer cleanup times to deflake orphaned consumer tests
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
Co-authored-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Oct 3, 2024
1 parent 82c1371 commit 9109439
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
21 changes: 14 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
return false
}

// Default timings for the background retry loop started by deleteNotActive,
// which re-checks whether a consumer assignment was actually cleaned up.
const (
// Default base ticker interval. The retry loop adds a random jitter in
// [0, start interval) on top of it before creating the ticker.
defaultConsumerNotActiveStartInterval = 30 * time.Second
// Default threshold for backoff: the retry interval is doubled after a retry
// only while it is still below this value, so it is not a hard cap.
defaultConsumerNotActiveMaxInterval = 5 * time.Minute
)

// Effective intervals read by the retry loop in deleteNotActive. They are
// package-level variables rather than constants so that tests can shorten them
// and restore the defaults afterwards.
// NOTE(review): these are read from a goroutine and written by tests without
// any visible synchronization — confirm tests only change them while no
// retry goroutine is running.
var (
// Base retry interval; initialized to defaultConsumerNotActiveStartInterval.
consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
// Backoff threshold; initialized to defaultConsumerNotActiveMaxInterval.
consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval
)

func (o *consumer) deleteNotActive() {
o.mu.Lock()
if o.mset == nil {
Expand Down Expand Up @@ -1702,12 +1712,9 @@ func (o *consumer) deleteNotActive() {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
const (
startInterval = 30 * time.Second
maxInterval = 5 * time.Minute
)
jitter := time.Duration(rand.Int63n(int64(startInterval)))
interval := startInterval + jitter
const ()
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
Expand All @@ -1722,7 +1729,7 @@ func (o *consumer) deleteNotActive() {
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < maxInterval {
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
Expand Down
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
}

func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
consumerNotActiveStartInterval = time.Second * 5
defer func() {
consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
}()

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
time.Sleep(2 * time.Second)

// Restart first and wait so that we know it will try cleanup without a metaleader.
// It will fail as there's no metaleader at that time, it should keep retrying on an interval.
c.restartServer(rs)
time.Sleep(time.Second)

Expand All @@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
defer nc.Close()

subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
m, err := nc.Request(subj, nil, time.Second)
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
// Request will take at most 4 seconds if some consumers can't be found.
m, err := nc.Request(subj, nil, 5*time.Second)
if err != nil {
return err
}
Expand Down
26 changes: 13 additions & 13 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
}

func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
consumerNotActiveStartInterval = time.Second * 5
defer func() {
consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
}()

c := createJetStreamClusterExplicit(t, "GHOST", 3)
defer c.shutdown()

Expand Down Expand Up @@ -6670,22 +6675,17 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
time.Sleep(5 * time.Second)
cancel()

getMissing := func() []string {
m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10)
require_NoError(t, err)

checkFor(t, 30*time.Second, time.Second, func() error {
m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second)
if err != nil {
return err
}
var resp JSApiConsumerListResponse
err = json.Unmarshal(m.Data, &resp)
require_NoError(t, err)
return resp.Missing
}

checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
missing := getMissing()
if len(missing) == 0 {
require_NoError(t, json.Unmarshal(m.Data, &resp))
if len(resp.Missing) == 0 {
return nil
}
return fmt.Errorf("Still have missing: %+v", missing)
return fmt.Errorf("Still have missing: %+v", resp.Missing)
})
}

Expand Down

0 comments on commit 9109439

Please sign in to comment.