Skip to content

Commit

Permalink
[MID-164] Process kafka messages sequentially and commit manually (#135)
Browse files Browse the repository at this point in the history
This ensures that message processing is more reliable. By pushing the message handling off to a goroutine, we immediately "acknowledged" the kafka message and if a redeployment or an app crash happened during processing a message, it would have lost the messages.

Additionally, when there are thousands of messages in the kafka topic, they all would have been read into memory and executed (more or less) simultaneously, leading to a very high memory consumption.

## Extra stuff:
When a handler returns an error, this is now passed to the `errFn`
  • Loading branch information
ulich committed Jul 12, 2022
1 parent f852a45 commit 66a4aaa
Show file tree
Hide file tree
Showing 19 changed files with 906 additions and 270 deletions.
5 changes: 3 additions & 2 deletions camunda/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ replace (
github.com/blacklane/go-libs/logger => ../../logger
github.com/blacklane/go-libs/tracking => ../../tracking
github.com/blacklane/go-libs/x/events => ../../x/events

)

require (
github.com/blacklane/go-libs/logger v0.6.4
github.com/blacklane/go-libs/logger v0.6.5
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.7.1
)

require github.com/stretchr/objx v0.1.1 // indirect
123 changes: 113 additions & 10 deletions camunda/v2/go.sum

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions logger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace (

require (
github.com/blacklane/go-libs/tracking v0.3.1
github.com/blacklane/go-libs/x/events v0.2.1
github.com/blacklane/go-libs/x/events v0.3.0
github.com/google/go-cmp v0.5.7
github.com/rs/zerolog v1.26.0
github.com/stretchr/testify v1.7.1
Expand All @@ -18,16 +18,17 @@ require (
)

require (
github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.0.0-20211109214657-ef0fda0de508 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
126 changes: 114 additions & 12 deletions logger/go.sum

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions middleware/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ replace (
)

require (
github.com/blacklane/go-libs/camunda/v2 v2.0.3
github.com/blacklane/go-libs/logger v0.6.4
github.com/blacklane/go-libs/otel v0.1.3
github.com/blacklane/go-libs/camunda/v2 v2.0.4
github.com/blacklane/go-libs/logger v0.6.5
github.com/blacklane/go-libs/otel v0.1.4
github.com/blacklane/go-libs/tracking v0.3.1
github.com/blacklane/go-libs/x/events v0.2.1
github.com/blacklane/go-libs/x/events v0.3.0
github.com/google/go-cmp v0.5.7
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.7.1
)

require (
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -43,13 +43,13 @@ require (
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
golang.org/x/net v0.0.0-20211109214657-ef0fda0de508 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
102 changes: 87 additions & 15 deletions middleware/go.sum

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions otel/examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@ replace (
)

require (
github.com/blacklane/go-libs/logger v0.6.2
github.com/blacklane/go-libs/logger v0.6.5
github.com/blacklane/go-libs/middleware v0.1.0
github.com/blacklane/go-libs/otel v0.1.1
github.com/blacklane/go-libs/otel v0.1.4
github.com/blacklane/go-libs/tracking v0.3.1
github.com/blacklane/go-libs/x/events v0.2.1
github.com/blacklane/go-libs/x/events v0.3.0
github.com/caarlos0/env v3.5.0+incompatible
github.com/confluentinc/confluent-kafka-go v1.8.2
github.com/confluentinc/confluent-kafka-go v1.9.1
github.com/google/uuid v1.3.0
github.com/rs/zerolog v1.26.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
)

require (
github.com/blacklane/go-libs/camunda/v2 v2.0.1 // indirect
github.com/blacklane/go-libs/camunda/v2 v2.0.4 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
Expand All @@ -42,12 +42,12 @@ require (
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
golang.org/x/net v0.0.0-20211109214657-ef0fda0de508 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
Loading

0 comments on commit 66a4aaa

Please sign in to comment.