Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add functionality pause and resume for consumer #89

Merged
merged 9 commits into from
Jan 15, 2024
13 changes: 13 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"time"

"github.com/segmentio/kafka-go"
Expand All @@ -17,6 +18,18 @@
messageGroupLimit int
}

func (b *batchConsumer) Pause() {
b.logger.Info("Batch consumer is paused!")
b.pauseConsuming = true
b.cancelFn()

Check warning on line 24 in batch_consumer.go

View check run for this annotation

Codecov / codecov/patch

batch_consumer.go#L21-L24

Added lines #L21 - L24 were not covered by tests
}

func (b *batchConsumer) Resume() {
b.logger.Info("Consumer is resumed!")
b.pauseConsuming = false
b.context, b.cancelFn = context.WithCancel(context.Background())

Check warning on line 30 in batch_consumer.go

View check run for this annotation

Codecov / codecov/patch

batch_consumer.go#L27-L30

Added lines #L27 - L30 were not covered by tests
}

func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"time"

"github.com/segmentio/kafka-go"
Expand All @@ -14,6 +15,18 @@
consumeFn func(*Message) error
}

func (c *consumer) Pause() {
c.logger.Info("Consumer is paused!")
c.pauseConsuming = true
c.cancelFn()

Check warning on line 21 in consumer.go

View check run for this annotation

Codecov / codecov/patch

consumer.go#L18-L21

Added lines #L18 - L21 were not covered by tests
}

func (c *consumer) Resume() {
c.logger.Info("Consumer is resumed!")
c.pauseConsuming = false
c.context, c.cancelFn = context.WithCancel(context.Background())

Check warning on line 27 in consumer.go

View check run for this annotation

Codecov / codecov/patch

consumer.go#L24-L27

Added lines #L24 - L27 were not covered by tests
}

func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.Concurrency)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
// Consume starts consuming
Consume()

// Pause pauses consumer, it is stop consuming new messages
Pause()

// Resume resumes consumer, it is start to working
Resume()

// WithLogger for injecting custom log implementation
WithLogger(logger LoggerInterface)

Expand Down Expand Up @@ -56,6 +62,7 @@
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
pauseConsuming bool
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -129,6 +136,10 @@
close(c.incomingMessageStream)
return
default:
if c.pauseConsuming {
Abdulsametileri marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 140 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L140

Added line #L140 was not covered by tests
}

m := &kafka.Message{}
err := c.r.FetchMessage(c.context, m)
if err != nil {
Expand Down Expand Up @@ -158,7 +169,8 @@
}

func (c *base) Stop() error {
c.logger.Debug("Stop called!")
c.logger.Info("Stop called!")

Check warning on line 172 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L172

Added line #L172 was not covered by tests

var err error
c.once.Do(func() {
c.subprocesses.Stop()
Expand Down
55 changes: 55 additions & 0 deletions examples/with-pause-resume-consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
"os"
"os/signal"
"time"
)

func main() {
consumerCfg := &kafka.ConsumerConfig{
Concurrency: 1,
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
RetryEnabled: false,
ConsumeFn: consumeFn,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()
fmt.Println("Consumer started...!")

// You can produce a message via kowl.
go func() {
time.Sleep(10 * time.Second)
consumer.Pause()

time.Sleep(10 * time.Second)
consumer.Resume()

time.Sleep(10 * time.Second)
consumer.Pause()

time.Sleep(10 * time.Second)
consumer.Resume()

time.Sleep(10 * time.Second)
consumer.Pause()
}()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s \n", message.Topic, string(message.Value))
return nil
}