Skip to content

Commit 1ca7779

Browse files
kafakstreamer: Allow configurable config (#155)
* kafakstreamer: Allow configurable config
1 parent 97e3cbb commit 1ca7779

File tree

3 files changed

+78
-4
lines changed

3 files changed

+78
-4
lines changed

adapters/kafkastreamer/kafka.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,29 @@ import (
1111
"github.com/luno/workflow"
1212
)
1313

14-
func New(brokers []string) *StreamConstructor {
15-
return &StreamConstructor{
14+
type Option func(*StreamConstructor)
15+
16+
func WithConfig(config *sarama.Config) Option {
17+
return func(sc *StreamConstructor) {
18+
if config == nil {
19+
panic("sarama config cannot be nil")
20+
}
21+
22+
sc.sharedConfig = config
23+
}
24+
}
25+
26+
func New(brokers []string, opts ...Option) *StreamConstructor {
27+
sc := &StreamConstructor{
1628
sharedConfig: newConfig(),
1729
brokers: brokers,
1830
}
31+
32+
for _, opt := range opts {
33+
opt(sc)
34+
}
35+
36+
return sc
1937
}
2038

2139
func newConfig() *sarama.Config {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package kafkastreamer
2+
3+
import (
4+
"testing"
5+
6+
"github.com/IBM/sarama"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestWithConfig(t *testing.T) {
11+
brokers := []string{"localhost:9092"}
12+
customConfig := sarama.NewConfig()
13+
customConfig.Producer.MaxMessageBytes = 2000000
14+
customConfig.Consumer.Fetch.Default = 2048576
15+
16+
sc := New(brokers, WithConfig(customConfig))
17+
18+
// Since this is an internal test, we can access private fields
19+
require.Equal(t, 2000000, sc.sharedConfig.Producer.MaxMessageBytes)
20+
require.Equal(t, int32(2048576), sc.sharedConfig.Consumer.Fetch.Default)
21+
}
22+
23+
func TestWithConfigOverridesDefaults(t *testing.T) {
24+
brokers := []string{"localhost:9092"}
25+
26+
// First verify our default config
27+
defaultSC := New(brokers)
28+
require.True(t, defaultSC.sharedConfig.Producer.Return.Successes)
29+
require.True(t, defaultSC.sharedConfig.Producer.Return.Errors)
30+
31+
// Now test that custom config overrides defaults
32+
customConfig := sarama.NewConfig()
33+
customConfig.Producer.Return.Successes = false
34+
customConfig.Producer.Return.Errors = false
35+
36+
customSC := New(brokers, WithConfig(customConfig))
37+
38+
// Verify that the custom config completely replaced the default
39+
require.False(t, customSC.sharedConfig.Producer.Return.Successes)
40+
require.False(t, customSC.sharedConfig.Producer.Return.Errors)
41+
}
42+
43+
func TestDefaultConfig(t *testing.T) {
44+
brokers := []string{"localhost:9092"}
45+
sc := New(brokers)
46+
47+
// Verify default config values
48+
require.True(t, sc.sharedConfig.Producer.Return.Successes)
49+
require.True(t, sc.sharedConfig.Producer.Return.Errors)
50+
}
51+
52+
func TestPanicIfConfigNil(t *testing.T) {
53+
require.PanicsWithValue(t,
54+
"sarama config cannot be nil",
55+
func() {
56+
_ = New([]string{""}, WithConfig(nil))
57+
}, "")
58+
}

adapters/kafkastreamer/kafka_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ import (
1414
"github.com/luno/workflow/adapters/kafkastreamer"
1515
)
1616

17-
const brokerAddress = "localhost:9092"
18-
1917
func TestStreamer(t *testing.T) {
2018
adaptertest.RunEventStreamerTest(t, func() workflow.EventStreamer {
2119
ctx := t.Context()

0 commit comments

Comments
 (0)