From 7ac093a936e0df3e5daa8749b333822007b848b6 Mon Sep 17 00:00:00 2001 From: Christian Ravn Date: Tue, 30 Aug 2022 14:58:15 +0200 Subject: [PATCH] feat: allow retry on init failure --- charts/kminion/values.yaml | 3 +++ kafka/config.go | 2 ++ kafka/service.go | 24 +++++++++++++++++------- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/charts/kminion/values.yaml b/charts/kminion/values.yaml index 9385454..fe1fa30 100644 --- a/charts/kminion/values.yaml +++ b/charts/kminion/values.yaml @@ -171,6 +171,9 @@ kminion: # username: "" # password: "" # realm: "" +# # Whether to retry the initial test connection to Kafka. False will exit with code 1 on error, +# # while true will retry until success. +# retryInitConnection: false # # minion: # consumerGroups: diff --git a/kafka/config.go b/kafka/config.go index 87b249f..e6edce2 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -10,6 +10,8 @@ type Config struct { TLS TLSConfig `koanf:"tls"` SASL SASLConfig `koanf:"sasl"` + + RetryInitConnection bool `koanf:"retryInitConnection"` } func (c *Config) SetDefaults() { diff --git a/kafka/service.go b/kafka/service.go index fed8361..372eac6 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -43,11 +43,18 @@ func (s *Service) CreateAndTestClient(ctx context.Context, l *zap.Logger, opts [ } // Test connection - connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - err = s.testConnection(client, connectCtx) - if err != nil { - return nil, fmt.Errorf("failed to test connectivity to Kafka cluster %w", err) + for { + err = s.testConnection(client, ctx) + if err == nil { + break + } + + if !s.cfg.RetryInitConnection { + return nil, fmt.Errorf("failed to test connectivity to Kafka cluster %w", err) + } + + logger.Warn("failed to test connectivity to Kafka cluster, retrying in 5 seconds", zap.Error(err)) + time.Sleep(time.Second * 5) } return client, nil @@ -61,17 +68,20 @@ func (s *Service) Brokers() []string { // testConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be // returned if connecting fails. func (s *Service) testConnection(client *kgo.Client, ctx context.Context) error { + connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + req := kmsg.MetadataRequest{ Topics: nil, } - res, err := req.RequestWith(ctx, client) + res, err := req.RequestWith(connectCtx, client) if err != nil { return fmt.Errorf("failed to request metadata: %w", err) } // Request versions in order to guess Kafka Cluster version versionsReq := kmsg.NewApiVersionsRequest() - versionsRes, err := versionsReq.RequestWith(ctx, client) + versionsRes, err := versionsReq.RequestWith(connectCtx, client) if err != nil { return fmt.Errorf("failed to request api versions: %w", err) }