diff --git a/.github/images/grafana.png b/.github/images/grafana.png index d541ab4..0a77a54 100644 Binary files a/.github/images/grafana.png and b/.github/images/grafana.png differ diff --git a/README.md b/README.md index 9d67c10..1d60aac 100644 --- a/README.md +++ b/README.md @@ -219,53 +219,55 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap ## Configurations -| config | description | default | -|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------| -| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | | -| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | | -| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | -| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | -| `concurrency` | Number of goroutines used at listeners | 1 | -| `retryEnabled` | Retry/Exception consumer is working or not | false | -| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true | -| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | -| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | -| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | | -| `messageGroupDuration` | Maximum time to wait for a batch | 1s | -| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout | -| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled | -| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 5s | -| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 30s | -| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 6s | -| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | all topics in cluster | -| `distributedTracingEnabled` | indicates open telemetry support on/off for consume and produce operations. | false | -| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() | -| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() | -| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | | -| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | | -| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | | -| `retryConfiguration.topic` | Retry/Exception topic names | | -| `retryConfiguration.brokers` | Retry topic brokers urls | | -| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 | -| `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | -| `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | -| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | | -| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | -| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | -| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | -| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | -| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | -| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | -| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | -| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | -| `sasl.authType` | `SCRAM` or `PLAIN` | | -| `sasl.username` | SCRAM OR PLAIN username | | -| `sasl.password` | SCRAM OR PLAIN password | | -| `logger` | If you want to custom logger | info | -| `apiEnabled` | Enabled metrics | false | -| `apiConfiguration.port` | Set API port | 8090 | -| `apiConfiguration.healtCheckPath` | Set Health check path | healthcheck | -| `metricConfiguration.path` | Set metric endpoint path | /metrics | +| config | description | default | +|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------| +| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | | +| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | | +| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | +| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | +| `concurrency` | Number of goroutines used at listeners | 1 | +| `retryEnabled` | Retry/Exception consumer is working or not | false | +| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true | +| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | +| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | +| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | | +| `messageGroupDuration` | Maximum time to wait for a batch | 1s | +| `metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is `kafka_konsumer`. Currently, there are two exposed prometheus metrics. `processed_messages_total` and `unprocessed_messages_total` So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current`. | kafka_konsumer | +| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout | +| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled | +| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 5s | +| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 30s | +| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 6s | +| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | all topics in cluster | +| `distributedTracingEnabled` | indicates open telemetry support on/off for consume and produce operations. | false | +| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() | +| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() | +| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | | +| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | | +| `retryConfiguration.metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_cronsumer. Currently, there are two exposed prometheus metrics. retried_messages_total_current and discarded_messages_total_current. So, if default metric prefix used, metrics names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer | +| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | | +| `retryConfiguration.topic` | Retry/Exception topic names | | +| `retryConfiguration.brokers` | Retry topic brokers urls | | +| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 | +| `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | +| `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | +| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | | +| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | +| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | +| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | +| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | +| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | +| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | +| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | +| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | +| `sasl.authType` | `SCRAM` or `PLAIN` | | +| `sasl.username` | SCRAM OR PLAIN username | | +| `sasl.password` | SCRAM OR PLAIN password | | +| `logger` | If you want to custom logger | info | +| `apiEnabled` | Enabled metrics | false | +| `apiConfiguration.port` | Set API port | 8090 | +| `apiConfiguration.healtCheckPath` | Set Health check path | healthcheck | +| `metricConfiguration.path` | Set metric endpoint path | /metrics | ## Monitoring diff --git a/batch_consumer.go b/batch_consumer.go index 598f129..60de663 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -3,6 +3,8 @@ package kafka import ( "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/segmentio/kafka-go" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -51,8 +53,8 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } -func (b *batchConsumer) GetMetric() *ConsumerMetric { - return b.metric +func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector { + return b.base.GetMetricCollectors() } func (b *batchConsumer) Consume() { diff --git a/collector.go b/collector.go index 3f702d2..29b7f38 100644 --- a/collector.go +++ b/collector.go @@ -8,20 +8,43 @@ import ( const Name = "kafka_konsumer" -type metricCollector struct { +type MetricCollector struct { consumerMetric *ConsumerMetric totalUnprocessedMessagesCounter *prometheus.Desc totalProcessedMessagesCounter *prometheus.Desc } -func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) { +func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector { + if metricPrefix == "" { + metricPrefix = Name + } + + return &MetricCollector{ + consumerMetric: consumerMetric, + + totalProcessedMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(metricPrefix, "processed_messages_total", "current"), + "Total number of processed messages.", + emptyStringList, + nil, + ), + totalUnprocessedMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(metricPrefix, "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + emptyStringList, + nil, + ), + } +} + +func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc) { prometheus.DescribeByCollect(s, ch) } var emptyStringList []string -func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { +func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( s.totalProcessedMessagesCounter, prometheus.CounterValue, @@ -37,31 +60,12 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { ) } -func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector { - return &metricCollector{ - consumerMetric: consumerMetric, - - totalProcessedMessagesCounter: prometheus.NewDesc( - prometheus.BuildFQName(Name, "processed_messages_total", "current"), - "Total number of processed messages.", - emptyStringList, - nil, - ), - totalUnprocessedMessagesCounter: prometheus.NewDesc( - prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"), - "Total number of unprocessed messages.", - emptyStringList, - nil, - ), - } -} - func NewMetricMiddleware(cfg *ConsumerConfig, app *fiber.App, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector, ) (func(ctx *fiber.Ctx) error, error) { - prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumerMetric)) + prometheus.DefaultRegisterer.MustRegister(NewMetricCollector(cfg.MetricPrefix, consumerMetric)) prometheus.DefaultRegisterer.MustRegister(metricCollectors...) fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID) diff --git a/collector_test.go b/collector_test.go new file mode 100644 index 0000000..4c53529 --- /dev/null +++ b/collector_test.go @@ -0,0 +1,59 @@ +package kafka + +import ( + "reflect" + "testing" + + "github.com/prometheus/client_golang/prometheus" +) + +func Test_NewCollector(t *testing.T) { + t.Run("When_Default_Prefix_Value_Used", func(t *testing.T) { + cronsumerMetric := &ConsumerMetric{} + expectedTotalProcessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName(Name, "processed_messages_total", "current"), + "Total number of processed messages.", + emptyStringList, + nil, + ) + expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + emptyStringList, + nil, + ) + + collector := NewMetricCollector("", cronsumerMetric) + + if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) + } + if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) + } + }) + t.Run("When_Custom_Prefix_Value_Used", func(t *testing.T) { + cronsumerMetric := &ConsumerMetric{} + expectedTotalProcessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName("custom_prefix", "processed_messages_total", "current"), + "Total number of processed messages.", + emptyStringList, + nil, + ) + expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName("custom_prefix", "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + emptyStringList, + nil, + ) + + collector := NewMetricCollector("custom_prefix", cronsumerMetric) + + if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) + } + if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) + } + }) +} diff --git a/consumer.go b/consumer.go index 4d66f2f..66f5e6b 100644 --- a/consumer.go +++ b/consumer.go @@ -3,6 +3,8 @@ package kafka import ( "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/segmentio/kafka-go" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -46,6 +48,10 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } +func (c *consumer) GetMetricCollectors() []prometheus.Collector { + return c.base.GetMetricCollectors() +} + func (c *consumer) Consume() { go c.subprocesses.Start() diff --git a/consumer_base.go b/consumer_base.go index 69b50d1..d429ec5 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -26,6 +26,11 @@ type Consumer interface { // Resume function resumes consumer, it is start to working Resume() + // GetMetricCollectors for the purpose of making metric collectors available. + // You can register these collectors on your own http server. + // Please look at the examples/with-metric-collector directory. + GetMetricCollectors() []prometheus.Collector + // WithLogger for injecting custom log implementation WithLogger(logger LoggerInterface) @@ -72,6 +77,7 @@ type base struct { transactionalRetry bool distributedTracingEnabled bool consumerState state + metricPrefix string } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -109,6 +115,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { batchConsumingStream: make(chan []*Message, cfg.Concurrency), consumerState: stateRunning, skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, + metricPrefix: cfg.MetricPrefix, } if cfg.DistributedTracingEnabled { @@ -127,6 +134,18 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa c.subprocesses.Add(c.cronsumer) } +func (c *base) GetMetricCollectors() []prometheus.Collector { + var metricCollectors []prometheus.Collector + + if c.retryEnabled { + metricCollectors = c.cronsumer.GetMetricCollectors() + } + + metricCollectors = append(metricCollectors, NewMetricCollector(c.metricPrefix, c.metric)) + + return metricCollectors +} + func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric) { c.logger.Debug("Initializing API") diff --git a/consumer_config.go b/consumer_config.go index a09436f..f0cdc16 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -51,12 +51,20 @@ type ConsumerConfig struct { DistributedTracingEnabled bool RetryEnabled bool APIEnabled bool + + // MetricPrefix is used for prometheus fq name prefix. + // If not provided, default metric prefix value is `kafka_konsumer`. + // Currently, there are two exposed prometheus metrics. `processed_messages_total_current` and `unprocessed_messages_total_current`. + // So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and + // `kafka_konsumer_unprocessed_messages_total_current`. + MetricPrefix string } func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { cronsumerCfg := kcronsumer.Config{ - ClientID: cfg.RetryConfiguration.ClientID, - Brokers: cfg.RetryConfiguration.Brokers, + MetricPrefix: cfg.RetryConfiguration.MetricPrefix, + ClientID: cfg.RetryConfiguration.ClientID, + Brokers: cfg.RetryConfiguration.Brokers, Consumer: kcronsumer.ConsumerConfig{ ClientID: cfg.ClientID, GroupID: cfg.Reader.GroupID, @@ -131,6 +139,13 @@ func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header { } type RetryConfiguration struct { + // MetricPrefix is used for prometheus fq name prefix. + // If not provided, default metric prefix value is `kafka_cronsumer`. + // Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`. + // So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and + // `kafka_cronsumer_discarded_messages_total_current`. + MetricPrefix string + SASL *SASLConfig TLS *TLSConfig ClientID string diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index de84b17..8fdf209 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -49,6 +49,7 @@ services: command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b kafka:9092 1 20 && \ kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ + kafka-topics --create --topic another-standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ sleep infinity'" diff --git a/examples/with-grafana/grafana/provisioning/dashboards/dashboard.json b/examples/with-grafana/grafana/provisioning/dashboards/dashboard.json index 0415b45..6d93d84 100644 --- a/examples/with-grafana/grafana/provisioning/dashboards/dashboard.json +++ b/examples/with-grafana/grafana/provisioning/dashboards/dashboard.json @@ -24,7 +24,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 1, + "id": 2, "links": [], "liveNow": false, "panels": [ @@ -117,7 +117,7 @@ }, "editorMode": "code", "expr": "kafka_konsumer_processed_messages_total_current{job=\"konsumer\"}", - "legendFormat": "__auto", + "legendFormat": "{{__name__}}", "range": true, "refId": "A" } @@ -188,6 +188,103 @@ "x": 12, "y": 0 }, + "id": 5, + "options": { + "legend": { + "calcs": [ + "min", + "max", + "last", + "lastNotNull" + ], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PE4E52330B67298A4" + }, + "editorMode": "code", + "expr": "kafka_konsumer_unprocessed_messages_total_current{job=\"konsumer\"}", + "legendFormat": "{{__name__}}", + "range": true, + "refId": "A" + } + ], + "title": "Kafka Konsumer Unprocessed Messages Total", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PE4E52330B67298A4" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 9 + }, "id": 2, "options": { "legend": { @@ -213,13 +310,13 @@ "uid": "PE4E52330B67298A4" }, "editorMode": "code", - "expr": "sum(rate(kafka_konsumer_processed_messages_total_current{job=\"konsumer\"}[1m]))", - "legendFormat": "__auto", + "expr": "kafka_cronsumer_retried_messages_total_current{job=\"konsumer\"}", + "legendFormat": "{{__name__}}", "range": true, "refId": "A" } ], - "title": "Kafka Konsumer Processed Messages Rate", + "title": "Kafka Cronsumer Retried Message Total", "type": "timeseries" } ], @@ -232,13 +329,13 @@ "list": [] }, "time": { - "from": "now-5m", + "from": "now-30m", "to": "now" }, "timepicker": {}, "timezone": "", "title": "Kafka Konsumer Dashboard", "uid": "DlIdtG_4z", - "version": 5, + "version": 10, "weekStart": "" } \ No newline at end of file diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go index 9b71521..11ad868 100644 --- a/examples/with-grafana/main.go +++ b/examples/with-grafana/main.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" "os" @@ -70,6 +71,10 @@ func main() { MaxRetry: 3, }, ConsumeFn: func(message *kafka.Message) error { + if string(message.Key) == "1" { + return errors.New("simulate error behaviour") + } + // mocking some background task time.Sleep(1 * time.Second) diff --git a/examples/with-metric-collector/README.md b/examples/with-metric-collector/README.md new file mode 100644 index 0000000..d16d674 --- /dev/null +++ b/examples/with-metric-collector/README.md @@ -0,0 +1,30 @@ +If you run this example and go to http://localhost:8000/metrics, + +you can see first and second consumers metrics as shown below + +``` +# HELP first_discarded_messages_total_current Total number of discarded messages. +# TYPE first_discarded_messages_total_current counter +first_discarded_messages_total_current 0 +# HELP first_processed_messages_total_current Total number of processed messages. +# TYPE first_processed_messages_total_current counter +first_processed_messages_total_current 0 +# HELP first_retried_messages_total_current Total number of retried messages. +# TYPE first_retried_messages_total_current counter +first_retried_messages_total_current 0 +# HELP first_unprocessed_messages_total_current Total number of unprocessed messages. +# TYPE first_unprocessed_messages_total_current counter +first_unprocessed_messages_total_current 0 +# HELP second_discarded_messages_total_current Total number of discarded messages. +# TYPE second_discarded_messages_total_current counter +second_discarded_messages_total_current 0 +# HELP second_processed_messages_total_current Total number of processed messages. +# TYPE second_processed_messages_total_current counter +second_processed_messages_total_current 0 +# HELP second_retried_messages_total_current Total number of retried messages. +# TYPE second_retried_messages_total_current counter +second_retried_messages_total_current 0 +# HELP second_unprocessed_messages_total_current Total number of unprocessed messages. +# TYPE second_unprocessed_messages_total_current counter +second_unprocessed_messages_total_current 0 +``` \ No newline at end of file diff --git a/examples/with-metric-collector/api.go b/examples/with-metric-collector/api.go new file mode 100644 index 0000000..14855c0 --- /dev/null +++ b/examples/with-metric-collector/api.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + "github.com/gofiber/fiber/v2" + "github.com/prometheus/client_golang/prometheus" +) + +const port = 8000 + +func StartAPI(metricCollectors ...prometheus.Collector) { + f := fiber.New( + fiber.Config{ + DisableStartupMessage: true, + DisableDefaultDate: true, + DisableHeaderNormalizing: true, + }, + ) + + metricMiddleware, err := NewMetricMiddleware(f, metricCollectors...) + + if err == nil { + f.Use(metricMiddleware) + } else { + fmt.Printf("metric middleware cannot be initialized: %v", err) + } + + fmt.Printf("server starting on port %d", port) + + go listen(f) +} + +func listen(f *fiber.App) { + if err := f.Listen(fmt.Sprintf(":%d", port)); err != nil { + fmt.Printf("server cannot start on port %d, err: %v", port, err) + } +} diff --git a/examples/with-metric-collector/main.go b/examples/with-metric-collector/main.go new file mode 100644 index 0000000..a98bdf5 --- /dev/null +++ b/examples/with-metric-collector/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" + "time" +) + +func main() { + firstConsumer, _ := kafka.NewConsumer(&kafka.ConsumerConfig{ + MetricPrefix: "first", + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + MetricPrefix: "first", + Brokers: []string{"localhost:29092"}, + Topic: "error-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, + ConsumeFn: consumeFn, + }) + defer firstConsumer.Stop() + firstConsumer.Consume() + + secondConsumer, _ := kafka.NewConsumer(&kafka.ConsumerConfig{ + MetricPrefix: "second", + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "another-standart-topic", + GroupID: "another-standart-cg", + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + MetricPrefix: "second", + Brokers: []string{"localhost:29092"}, + Topic: "retry-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, + ConsumeFn: consumeFn, + }) + defer secondConsumer.Stop() + + secondConsumer.Consume() + + allCollectors := append(firstConsumer.GetMetricCollectors(), secondConsumer.GetMetricCollectors()...) + StartAPI(allCollectors...) + + select {} +} + +func consumeFn(message *kafka.Message) error { + fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value)) + return nil +} diff --git a/examples/with-metric-collector/metric.go b/examples/with-metric-collector/metric.go new file mode 100644 index 0000000..e7f68a7 --- /dev/null +++ b/examples/with-metric-collector/metric.go @@ -0,0 +1,16 @@ +package main + +import ( + "github.com/ansrivas/fiberprometheus/v2" + "github.com/gofiber/fiber/v2" + "github.com/prometheus/client_golang/prometheus" +) + +func NewMetricMiddleware(app *fiber.App, metricCollectors ...prometheus.Collector) (func(ctx *fiber.Ctx) error, error) { + prometheus.DefaultRegisterer.MustRegister(metricCollectors...) + + fiberPrometheus := fiberprometheus.New("konsumer-metrics") + fiberPrometheus.RegisterAt(app, "/metrics") + + return fiberPrometheus.Middleware, nil +} diff --git a/examples/with-pause-resume-consumer/how-it-works.md b/examples/with-pause-resume-consumer/how-it-works.md index 37150bf..976e6f7 100644 --- a/examples/with-pause-resume-consumer/how-it-works.md +++ b/examples/with-pause-resume-consumer/how-it-works.md @@ -19,6 +19,8 @@ defer consumer.Stop() consumer.Consume() fmt.Println("Consumer started...!") + +// consumer.Pause(), consumer.Resume() ``` If you need to implement Pause & Resume functionality on your own applications, you need to call `Consume`. Because this diff --git a/go.mod b/go.mod index 0da3f85..9d5f81a 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/Trendyol/kafka-konsumer/v2 go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.4.7 + github.com/Trendyol/kafka-cronsumer v1.5.0 github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 - github.com/gofiber/fiber/v2 v2.50.0 + github.com/gofiber/fiber/v2 v2.52.1 github.com/google/go-cmp v0.6.0 github.com/prometheus/client_golang v1.16.0 github.com/segmentio/kafka-go v0.4.47 @@ -23,10 +23,10 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.1 // indirect + github.com/google/uuid v1.5.0 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect @@ -36,7 +36,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.50.0 // indirect + github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -45,7 +45,7 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.13.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/go.sum b/go.sum index c7cba5b..f655f8a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= -github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U= +github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= @@ -21,8 +21,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= -github.com/gofiber/fiber/v2 v2.50.0 h1:ia0JaB+uw3GpNSCR5nvC5dsaxXjRU5OEu36aytx+zGw= -github.com/gofiber/fiber/v2 v2.50.0/go.mod h1:21eytvay9Is7S6z+OgPi7c7n4++tnClWmhpimVHMimw= +github.com/gofiber/fiber/v2 v2.52.1 h1:1RoU2NS+b98o1L77sdl5mboGPiW+0Ypsi5oLmcYlgHI= +github.com/gofiber/fiber/v2 v2.52.1/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -30,16 +30,16 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -72,8 +72,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M= -github.com/valyala/fasthttp v1.50.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= +github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -122,8 +122,9 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/test/integration/go.mod b/test/integration/go.mod index 89d79c1..1e0444d 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.4.7 // indirect + github.com/Trendyol/kafka-cronsumer v1.5.0 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 5eb6b0d..5e87da8 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.8-0.20240218154451-2072724685ea/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=