Skip to content

Commit

Permalink
Merge pull request #66 from Trendyol/65-consume-message-when-header-h…
Browse files Browse the repository at this point in the history
…as-expected-value

65 consume message when header has expected value
  • Loading branch information
A.Samet İleri authored Feb 11, 2024
2 parents dfb0985 + 1b89837 commit 0a7c944
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:

- uses: actions/checkout@v3
- name: Integration Test
run: go test -v test/integration/integration_test.go
run: go test -timeout=15m -v test/integration/integration_test.go
env:
INPUT_PUBLISH: false
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
61 changes: 31 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,36 +128,37 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {

## Configurations

| config | description | default | example |
|------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------|
| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | |
| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Dialer) | | |
| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * |
| `consumer.backOffStrategy` | Define consumer backoff strategy for retry topics | fixed | exponential, linear |
| `consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h |
| `consumer.topic` | Exception topic names | | exception-topic |
| `consumer.groupId` | Exception consumer group id | | exception-consumer-group |
| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
| `consumer.concurrency` | Number of goroutines used at listeners | 1 | |
| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | |
| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | |
| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | |
| `consumer.commitInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.CommitInterval) | 1s | |
| `consumer.heartbeatInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.HeartbeatInterval) | 3s | |
| `consumer.sessionTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.SessionTimeout) | 30s | |
| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | |
| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | |
| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | |
| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | |
| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | |
| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | |
| `sasl.enabled` | It enables sasl authentication mechanism | false | |
| `sasl.authType` | Currently we only support `SCRAM` | "" | |
| `sasl.username` | SCRAM username | "" | |
| `sasl.password` | SCRAM password | "" | |
| `sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | |
| `sasl.intermediateCAPath` | | "" | |
| `sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#RackAffinityGroupBalancer) | "" | |
| config | description | default | example |
|----------------------------------|----------------------------------------------------------------------------------------------------|----------|--------------------------|
| `logLevel` | Describes log level, valid options are `debug`, `info`, `warn`, and `error` | info | |
| `consumer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Dialer) | | |
| `consumer.cron` | Cron expression when exception consumer starts to work at | | */1 * * * * |
| `consumer.backOffStrategy` | Define consumer backoff strategy for retry topics | fixed | exponential, linear |
| `consumer.duration` | Work duration exception consumer actively consuming messages | | 20s, 15m, 1h |
| `consumer.topic` | Exception topic names | | exception-topic |
| `consumer.groupId` | Exception consumer group id | | exception-consumer-group |
| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
| `consumer.concurrency` | Number of goroutines used at listeners | 1 | |
| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MinBytes) | 1 | |
| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxBytes) | 1 MB | |
| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.MaxWait) | 10s | |
| `consumer.commitInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.CommitInterval) | 1s | |
| `consumer.heartbeatInterval` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.HeartbeatInterval) | 3s | |
| `consumer.sessionTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.SessionTimeout) | 30s | |
| `consumer.rebalanceTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RebalanceTimeout) | 30s | |
| `consumer.startOffset` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.StartOffset) | earliest | |
| `consumer.retentionTime` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#ReaderConfig.RetentionTime) | 24h | |
| `consumer.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | |
| `producer.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Transport) | | |
| `producer.batchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchSize) | 100 | |
| `producer.batchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#Writer.BatchTimeout) | 1s | |
| `sasl.enabled` | It enables sasl authentication mechanism | false | |
| `sasl.authType` | Currently we only support `SCRAM` | "" | |
| `sasl.username` | SCRAM username | "" | |
| `sasl.password` | SCRAM password | "" | |
| `sasl.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | |
| `sasl.intermediateCAPath` | | "" | |
| `sasl.rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.39#RackAffinityGroupBalancer) | "" | |

### Exposed Metrics

Expand Down
31 changes: 31 additions & 0 deletions examples/single-consumer-with-header-filter-function/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module single-consumer

go 1.19

replace github.com/Trendyol/kafka-cronsumer => ../..

require github.com/Trendyol/kafka-cronsumer v0.0.0-00010101000000-000000000000

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/segmentio/kafka-go v0.4.42 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
69 changes: 69 additions & 0 deletions examples/single-consumer-with-header-filter-function/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
40 changes: 40 additions & 0 deletions examples/single-consumer-with-header-filter-function/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"
cronsumer "github.com/Trendyol/kafka-cronsumer"
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"time"
)

func main() {
config := &kafka.Config{
Brokers: []string{"localhost:29092"},
Consumer: kafka.ConsumerConfig{
GroupID: "sample-consumer",
Topic: "exception",
Cron: "*/1 * * * *",
Duration: 20 * time.Second,
SkipMessageByHeaderFn: SkipMessageByHeaderFn,
},
LogLevel: "info",
}

var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
return nil
}

c := cronsumer.New(config, consumeFn)
c.Run()
}

func SkipMessageByHeaderFn(headers []kafka.Header) bool {
for _, header := range headers {
if header.Key == "skipMessage" {
// If a kafka message comes with `skipMessage` header key, it will be skipped!
return true
}
}
return false
}
Loading

0 comments on commit 0a7c944

Please sign in to comment.