From 27a178069f41081a740bedd8e35fcd0311ee5c29 Mon Sep 17 00:00:00 2001 From: artarts36 Date: Sun, 20 Aug 2023 13:48:13 +0300 Subject: [PATCH 1/6] add ping for kafka --- kafkajobs/driver.go | 12 ++++++++++++ kafkajobs/opts.go | 5 +++++ 2 files changed, 17 insertions(+) diff --git a/kafkajobs/driver.go b/kafkajobs/driver.go index d90043e..a58aefb 100644 --- a/kafkajobs/driver.go +++ b/kafkajobs/driver.go @@ -236,6 +236,18 @@ func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } + if d.cfg.Ping != nil { + pingCtx, pingCancel := context.WithTimeout(ctx, d.cfg.Ping.Timeout) + defer pingCancel() + + err := d.kafkaClient.Ping(pingCtx) + if err != nil { + return errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) + } + + d.log.Debug("ping kafka: ok", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name())) + } + d.mu.Lock() defer d.mu.Unlock() diff --git a/kafkajobs/opts.go b/kafkajobs/opts.go index 4ff7bc0..779a037 100644 --- a/kafkajobs/opts.go +++ b/kafkajobs/opts.go @@ -65,6 +65,7 @@ type config struct { // global Brokers []string `mapstructure:"brokers"` SASL *SASL `mapstructure:"sasl"` + Ping *Ping `mapstructure:"ping"` // pipeline Priority int `mapstructure:"priority"` @@ -91,6 +92,10 @@ type SASL struct { UserAgent string `mapstructure:"user_agent" json:"user_agent"` } +type Ping struct { + Timeout time.Duration `mapstructure:"timeout" json:"timeout"` +} + type GroupOptions struct { GroupID string `mapstructure:"group_id" json:"group_id"` BlockRebalanceOnPoll bool `mapstructure:"block_rebalance_on_poll" json:"block_rebalance_on_poll"` From 31b3174c54c2b11451f566083c4edc87dbdef689 Mon Sep 17 00:00:00 2001 From: artarts36 Date: Sun, 20 Aug 2023 14:25:45 +0300 Subject: [PATCH 2/6] move ping into FromPipeline/FromConfig --- kafkajobs/driver.go | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/kafkajobs/driver.go b/kafkajobs/driver.go index a58aefb..7493cb3 100644 --- a/kafkajobs/driver.go +++ b/kafkajobs/driver.go @@ -115,6 +115,18 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg return nil, errors.E(op, err) } + if conf.Ping != nil { + pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) + defer pingCancel() + + err := jb.kafkaClient.Ping(pingCtx) + if err != nil { + return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) + } + + log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) + } + jb.pipeline.Store(&pipeline) go jb.recordsHandler() @@ -216,6 +228,18 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log * return nil, errors.E(op, err) } + if conf.Ping != nil { + pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) + defer pingCancel() + + err := jb.kafkaClient.Ping(pingCtx) + if err != nil { + return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) + } + + log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) + } + jb.pipeline.Store(&pipeline) go jb.recordsHandler() @@ -236,18 +260,6 @@ func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } - if d.cfg.Ping != nil { - pingCtx, pingCancel := context.WithTimeout(ctx, d.cfg.Ping.Timeout) - defer pingCancel() - - err := d.kafkaClient.Ping(pingCtx) - if err != nil { - return errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) - } - - d.log.Debug("ping kafka: ok", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name())) - } - d.mu.Lock() defer d.mu.Unlock() From ac533e835b879e79dd8817e85a7c3195f351fe90 Mon Sep 17 00:00:00 2001 From: artarts36 Date: Sun, 20 Aug 2023 18:54:46 +0300 Subject: [PATCH 3/6] set default ping --- kafkajobs/config.go | 8 ++++++++ kafkajobs/driver.go | 32 ++++++++++++++------------------ 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/kafkajobs/config.go b/kafkajobs/config.go index 0d449ae..0b166f0 100644 --- a/kafkajobs/config.go +++ b/kafkajobs/config.go @@ -13,6 +13,8 @@ import ( "github.com/twmb/franz-go/pkg/sasl/scram" ) +const defaultPingTimeout = time.Duration(time.Second * 10) + func (c *config) InitDefault() ([]kgo.Opt, error) { opts := make([]kgo.Opt, 0, 1) @@ -228,5 +230,11 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { } } + if c.Ping == nil { + c.Ping = &Ping{ + Timeout: defaultPingTimeout, + } + } + return opts, nil } diff --git a/kafkajobs/driver.go b/kafkajobs/driver.go index 7493cb3..08eb32a 100644 --- a/kafkajobs/driver.go +++ b/kafkajobs/driver.go @@ -115,18 +115,16 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg return nil, errors.E(op, err) } - if conf.Ping != nil { - pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) - defer pingCancel() + pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) + defer pingCancel() - err := jb.kafkaClient.Ping(pingCtx) - if err != nil { - return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) - } - - log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) + err = jb.kafkaClient.Ping(pingCtx) + if err != nil { + return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) } + log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) + jb.pipeline.Store(&pipeline) go jb.recordsHandler() @@ -228,18 +226,16 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log * return nil, errors.E(op, err) } - if conf.Ping != nil { - pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) - defer pingCancel() + pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) + defer pingCancel() - err := jb.kafkaClient.Ping(pingCtx) - if err != nil { - return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) - } - - log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) + err = jb.kafkaClient.Ping(pingCtx) + if err != nil { + return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) } + log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) + jb.pipeline.Store(&pipeline) go jb.recordsHandler() From 8cc9c87bcbe1e74cb8166321a49c90247f53a0a2 Mon Sep 17 00:00:00 2001 From: artarts36 Date: Sun, 20 Aug 2023 18:55:09 +0300 Subject: [PATCH 4/6] simple --- kafkajobs/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafkajobs/config.go b/kafkajobs/config.go index 0b166f0..82ee270 100644 --- a/kafkajobs/config.go +++ b/kafkajobs/config.go @@ -13,7 +13,7 @@ import ( "github.com/twmb/franz-go/pkg/sasl/scram" ) -const defaultPingTimeout = time.Duration(time.Second * 10) +const defaultPingTimeout = time.Second * 10 func (c *config) InitDefault() ([]kgo.Opt, error) { opts := make([]kgo.Opt, 0, 1) From 7ffb9f7b03d5edecdcadf1af28be5ec7f404d61b Mon Sep 17 00:00:00 2001 From: artarts36 Date: Sun, 20 Aug 2023 19:49:04 +0300 Subject: [PATCH 5/6] add method "pingKafka" --- kafkajobs/driver.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/kafkajobs/driver.go b/kafkajobs/driver.go index 08eb32a..fbd86a9 100644 --- a/kafkajobs/driver.go +++ b/kafkajobs/driver.go @@ -115,16 +115,11 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg return nil, errors.E(op, err) } - pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) - defer pingCancel() - - err = jb.kafkaClient.Ping(pingCtx) + err = pingKafka(jb.kafkaClient, log, conf.Ping.Timeout, pipeline) if err != nil { - return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) + return nil, err } - log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) - jb.pipeline.Store(&pipeline) go jb.recordsHandler() @@ -226,16 +221,11 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log * return nil, errors.E(op, err) } - pingCtx, pingCancel := context.WithTimeout(context.Background(), conf.Ping.Timeout) - defer pingCancel() - - err = jb.kafkaClient.Ping(pingCtx) + err = pingKafka(jb.kafkaClient, log, conf.Ping.Timeout, pipeline) if err != nil { - return nil, errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) + return nil, err } - log.Debug("ping kafka: ok", zap.String("driver", pipeline.Driver()), zap.String("pipeline", pipeline.Name())) - jb.pipeline.Store(&pipeline) go jb.recordsHandler() @@ -501,3 +491,19 @@ func (d *Driver) requeueHandler() { } } } + +func pingKafka(client *kgo.Client, log *zap.Logger, timeout time.Duration, pipe jobs.Pipeline) error { + const op = errors.Op("new_kafka_consumer") + + pingCtx, pingCancel := context.WithTimeout(context.Background(), timeout) + defer pingCancel() + + err := client.Ping(pingCtx) + if err != nil { + return errors.E(op, errors.Errorf("ping kafka was failed: %s", err)) + } + + log.Debug("ping kafka: ok", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name())) + + return nil +} From 27a1c7dea230e5931d775b46705c5657f8e3ac12 Mon Sep 17 00:00:00 2001 From: artarts36 Date: Sun, 20 Aug 2023 19:56:29 +0300 Subject: [PATCH 6/6] rename ping method & change log operation name --- kafkajobs/driver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafkajobs/driver.go b/kafkajobs/driver.go index fbd86a9..ddb72bd 100644 --- a/kafkajobs/driver.go +++ b/kafkajobs/driver.go @@ -115,7 +115,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg return nil, errors.E(op, err) } - err = pingKafka(jb.kafkaClient, log, conf.Ping.Timeout, pipeline) + err = ping(jb.kafkaClient, log, conf.Ping.Timeout, pipeline) if err != nil { return nil, err } @@ -221,7 +221,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log * return nil, errors.E(op, err) } - err = pingKafka(jb.kafkaClient, log, conf.Ping.Timeout, pipeline) + err = ping(jb.kafkaClient, log, conf.Ping.Timeout, pipeline) if err != nil { return nil, err } @@ -492,8 +492,8 @@ func (d *Driver) requeueHandler() { } } -func pingKafka(client *kgo.Client, log *zap.Logger, timeout time.Duration, pipe jobs.Pipeline) error { - const op = errors.Op("new_kafka_consumer") +func ping(client *kgo.Client, log *zap.Logger, timeout time.Duration, pipe jobs.Pipeline) error { + const op = errors.Op("kafka_ping") pingCtx, pingCancel := context.WithTimeout(context.Background(), timeout) defer pingCancel()