diff --git a/admin.go b/admin.go index f5d1427a0..b04102e2f 100644 --- a/admin.go +++ b/admin.go @@ -207,19 +207,17 @@ func isErrNoController(err error) bool { // provided retryable func) up to the maximum number of tries permitted by // the admin client configuration func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error { - var err error - for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ { - err = fn() - if err == nil || !retryable(err) { + for attemptsRemaining := ca.conf.Admin.Retry.Max; ; { + err := fn() + attemptsRemaining-- + if err == nil || attemptsRemaining == 0 || !retryable(err) { return err } Logger.Printf( "admin/request retrying after %dms... (%d attempts remaining)\n", - ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt) + ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining) time.Sleep(ca.conf.Admin.Retry.Backoff) - continue } - return err } func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error { diff --git a/admin_test.go b/admin_test.go index edafc44d9..ea7519935 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1790,3 +1790,73 @@ func TestDescribeLogDirsUnknownBroker(t *testing.T) { } } } + +func Test_retryOnError(t *testing.T) { + testBackoffTime := 100 * time.Millisecond + config := NewTestConfig() + config.Version = V1_0_0_0 + config.Admin.Retry.Max = 3 + config.Admin.Retry.Backoff = testBackoffTime + + admin := &clusterAdmin{conf: config} + + t.Run("immediate success", func(t *testing.T) { + startTime := time.Now() + attempts := 0 + err := admin.retryOnError( + func(error) bool { return true }, + func() error { + attempts++ + return nil + }) + if err != nil { + t.Fatalf("expected no error but was %v", err) + } + if attempts != 1 { + t.Fatalf("expected 1 attempt to have been made but was %d", attempts) + } + if time.Since(startTime) >= testBackoffTime { + t.Fatalf("single attempt should take less than backoff time") + } + }) + + t.Run("immediate failure", func(t *testing.T) { + startTime := time.Now() + attempts := 0 + err := admin.retryOnError( + func(error) bool { return false }, + func() error { + attempts++ + return errors.New("mock error") + }) + if err == nil { + t.Fatalf("expected error but was nil") + } + if attempts != 1 { + t.Fatalf("expected 1 attempt to have been made but was %d", attempts) + } + if time.Since(startTime) >= testBackoffTime { + t.Fatalf("single attempt should take less than backoff time") + } + }) + + t.Run("failing all attempts", func(t *testing.T) { + startTime := time.Now() + attempts := 0 + err := admin.retryOnError( + func(error) bool { return true }, + func() error { + attempts++ + return errors.New("mock error") + }) + if err == nil { + t.Errorf("expected error but was nil") + } + if attempts != 3 { + t.Errorf("expected 3 attempts to have been made but was %d", attempts) + } + if time.Since(startTime) >= 3*testBackoffTime { + t.Errorf("attempt+sleep+attempt+sleep+attempt should take less than 3 * backoff time") + } + }) +} diff --git a/client.go b/client.go index 03ecf4c34..4182c33f2 100644 --- a/client.go +++ b/client.go @@ -979,9 +979,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, if time.Since(time.Unix(t/1e3, 0)) < backoff { return err } + attemptsRemaining-- Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining) - return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline) + return client.tryRefreshMetadata(topics, attemptsRemaining, deadline) } return err } @@ -1160,9 +1161,10 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo retry := func(err error) (*FindCoordinatorResponse, error) { if attemptsRemaining > 0 { backoff := client.computeBackoff(attemptsRemaining) + attemptsRemaining-- Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining) time.Sleep(backoff) - return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining-1) + return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining) } return nil, err }