diff --git a/README.md b/README.md
index 9133104..f8ba462 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ You can find a number of ready-to-run examples at [this directory](examples).
After running `docker-compose up` command, you can run any application you want.
- Without Retry/Exception Manager
+ Simple Consumer
func main() {
consumerCfg := &kafka.ConsumerConfig{
@@ -49,7 +49,7 @@ After running `docker-compose up` command, you can run any application you want.
- With Retry/Exception Option Enabled
+ Simple Consumer With Retry/Exception Option
func main() {
consumerCfg := &kafka.ConsumerConfig{
diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go
index 56afab5..17a299b 100644
--- a/examples/with-grafana/main.go
+++ b/examples/with-grafana/main.go
@@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
- "sync/atomic"
"time"
"github.com/Trendyol/kafka-konsumer"
@@ -27,47 +26,31 @@ var messages = []user{
}
func main() {
- // retryMap stores the number of retries for each message
- var retryMap = make(map[int]int, len(messages))
- for _, message := range messages {
- retryMap[message.ID] = 0
- }
-
// create new kafka producer
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})
+ defer producer.Close()
- // produce messages at 1 seconds interval
- ticker := time.NewTicker(1 * time.Second)
- quit := make(chan struct{})
go func() {
- // to find the message we will produce at the next interval
- var i uint64
- for {
- select {
- case <-ticker.C:
- message := messages[atomic.LoadUint64(&i)]
- bytes, _ := json.Marshal(message)
-
- _ = producer.Produce(context.Background(), kafka.Message{
- Topic: "konsumer",
- Key: []byte(strconv.Itoa(message.ID)),
- Value: bytes,
- })
-
- if message.ID == messages[len(messages)-1].ID {
- quit <- struct{}{}
- return
- }
-
- atomic.AddUint64(&i, 1)
- case <-quit:
- ticker.Stop()
- return
+ // produce messages at 1-second intervals
+ i := 0
+ ticker := time.NewTicker(1 * time.Second)
+ for range ticker.C {
+ if i == len(messages) {
+ break
}
+ message := messages[i]
+ bytes, _ := json.Marshal(message)
+
+ _ = producer.Produce(context.Background(), kafka.Message{
+ Topic: "konsumer",
+ Key: []byte(strconv.Itoa(message.ID)),
+ Value: bytes,
+ })
+ i++
}
}()
@@ -88,16 +71,8 @@ func main() {
MaxRetry: 3,
},
ConsumeFn: func(message kafka.Message) error {
- u := &user{}
- if err := json.Unmarshal(message.Value, u); err != nil {
- return err
- }
-
- n := retryMap[u.ID]
- if n < 3 {
- retryMap[u.ID] += 1
- return fmt.Errorf("message %s retrying, current retry count: %d", message.Key, n)
- }
+ // mocking some background task
+ time.Sleep(1 * time.Second)
fmt.Printf("Message from %s with value %s is consumed successfully\n", message.Topic, string(message.Value))
return nil
@@ -113,7 +88,7 @@ func main() {
fmt.Println("Consumer started!")
- // wait for interrupt signal to gracefully shutdown the consumer
+ // wait for interrupt signal to gracefully shut down the consumer
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
diff --git a/examples/with-grafana/prometheus/alerts.yml b/examples/with-grafana/prometheus/alerts.yml
index cad1ba6..853050c 100644
--- a/examples/with-grafana/prometheus/alerts.yml
+++ b/examples/with-grafana/prometheus/alerts.yml
@@ -1,11 +1,11 @@
groups:
- name: konsumer-alerts
rules:
- - alert: ProcessedMessageDecreasing
- expr: sum(rate(kafka_konsumer_processed_messages_total{job="konsumer"}[1m])) < 0.1
+ - alert: UnprocessedMessageIncreasing
+ expr: increase(kafka_konsumer_unprocessed_messages_total{job="konsumer"}[5m]) > 0
for: 5m
labels:
severity: "critical"
annotations:
- summary: "Kafka Konsumer processed message decreasing"
- description: "Kafka Konsumer processed message decreasing, current rate: {{ $value }} "
+ summary: "Kafka Konsumer unprocessed messages increasing"
+ description: "Kafka Konsumer unprocessed messages increasing, current value: {{ $value }}"