Skip to content

Commit

Permalink
feat: write json and string methods for producer and consumer configs… (
Browse files Browse the repository at this point in the history
#141)

* feat: write json and string methods for producer and consumer configs (#140)
  • Loading branch information
oktaykcr authored Sep 3, 2024
1 parent c27bd17 commit 88382d0
Show file tree
Hide file tree
Showing 10 changed files with 578 additions and 1 deletion.
19 changes: 19 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,22 @@ func GetBalancerReferenceHash() Balancer {
func GetBalancerRoundRobin() Balancer {
return &kafka.RoundRobin{}
}

func GetBalancerString(balancer Balancer) string {
switch balancer.(type) {
case *kafka.CRC32Balancer:
return "CRC32Balancer"
case *kafka.Hash:
return "Hash"
case *kafka.LeastBytes:
return "LeastBytes"
case *kafka.Murmur2Balancer:
return "Murmur2Balancer"
case *kafka.ReferenceHash:
return "ReferenceHash"
case *kafka.RoundRobin:
return "RoundRobin"
default:
return "Unknown"
}
}
51 changes: 51 additions & 0 deletions balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,54 @@ func TestGetBalancerRoundRobinh(t *testing.T) {
t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerString(t *testing.T) {
tests := []struct {
name string
balancer Balancer
want string
}{
{
name: "Should_Return_CRC32Balancer",
balancer: GetBalancerCRC32(),
want: "CRC32Balancer",
},
{
name: "Should_Return_Hash",
balancer: GetBalancerHash(),
want: "Hash",
},
{
name: "Should_Return_LeastBytes",
balancer: GetBalancerLeastBytes(),
want: "LeastBytes",
},
{
name: "Should_Return_Murmur2Balancer",
balancer: GetBalancerMurmur2Balancer(),
want: "Murmur2Balancer",
},
{
name: "Should_Return_ReferenceHash",
balancer: GetBalancerReferenceHash(),
want: "ReferenceHash",
},
{
name: "Should_Return_RoundRobin",
balancer: GetBalancerRoundRobin(),
want: "RoundRobin",
},
{
name: "Should_Return_Unknown",
balancer: nil,
want: "Unknown",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetBalancerString(tt.balancer); got != tt.want {
t.Errorf("GetBalancerString() = %v, want %v", got, tt.want)
}
})
}
}
59 changes: 59 additions & 0 deletions consumer_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package kafka

import (
"bytes"
"encoding/json"
"fmt"
"regexp"
"strings"
"time"

"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -61,6 +66,51 @@ type ConsumerConfig struct {
MetricPrefix string
}

func (cfg RetryConfiguration) JSON() string {
return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, `+
`"MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`,
strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron,
cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack)
}

func (cfg *BatchConfiguration) JSON() string {
if cfg == nil {
return "{}"
}
return fmt.Sprintf(`{"MessageGroupLimit": %d}`, cfg.MessageGroupLimit)
}

func (cfg ReaderConfig) JSON() string {
return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], `+
`"MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`,
strings.Join(cfg.Brokers, "\", \""), cfg.GroupID, strings.Join(cfg.GroupTopics, "\", \""),
cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset))
}

func (cfg *ConsumerConfig) JSON() string {
if cfg == nil {
return "{}"
}
return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, `+
`"TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, `+
`"VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`,
cfg.ClientID, cfg.Reader.JSON(), cfg.BatchConfiguration.JSON(),
cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency,
cfg.RetryEnabled, cfg.RetryConfiguration.JSON(), cfg.VerifyTopicOnStartup,
cfg.Rack, cfg.SASL.JSON(), cfg.TLS.JSON())
}

func (cfg *ConsumerConfig) JSONPretty() string {
return jsonPretty(cfg.JSON())
}

func (cfg *ConsumerConfig) String() string {
re := regexp.MustCompile(`"(\w+)"\s*:`)
modifiedString := re.ReplaceAllString(cfg.JSON(), `$1:`)
modifiedString = modifiedString[1 : len(modifiedString)-1]
return modifiedString
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
cronsumerCfg := kcronsumer.Config{
MetricPrefix: cfg.RetryConfiguration.MetricPrefix,
Expand Down Expand Up @@ -266,3 +316,12 @@ func (cfg *ConsumerConfig) setDefaults() {
func NewBoolPtr(value bool) *bool {
return &value
}

func jsonPretty(jsonString string) string {
var out bytes.Buffer
err := json.Indent(&out, []byte(jsonString), "", "\t")
if err != nil {
return jsonString
}
return out.String()
}
216 changes: 216 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,219 @@ func TestConsumerConfig_getTopics(t *testing.T) {
}
})
}

func Test_jsonPretty(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "Simple JSON",
input: `{"key1":"value1","key2":2}`,
expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": 2\n}",
},
{
name: "Nested JSON",
input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`,
expected: "{\n\t\"key1\": \"value1\",\n\t\"" +
"key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"" +
"key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}",
},
{
name: "Invalid JSON",
input: `{"key1": "value1", "key2": 2`,
expected: `{"key1": "value1", "key2": 2`,
},
{
name: "Empty JSON",
input: ``,
expected: ``,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := jsonPretty(tt.input)
if got != tt.expected {
t.Errorf("jsonPretty() = %v, want %v", got, tt.expected)
}
})
}
}

func TestConsumerConfig_JSON(t *testing.T) {
t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) {
// Given
var config *ConsumerConfig
expected := "{}"
// When
result := config.JSON()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Json", func(t *testing.T) {
// Given
expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], " +
"\"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", " +
"\"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, " +
"\"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, " +
"\"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", " +
"\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, " +
"\"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", " +
"\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " +
"\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}"
// When
result := getConsumerConfigExample().JSON()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) {
// Given
expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", " +
"\"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, " +
"\"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, " +
"\"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", " +
"\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, " +
"\"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}"
// When
result := getConsumerConfigWithoutInnerObjectExample().JSON()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}

func TestConsumerConfig_String(t *testing.T) {
t.Run("Should_Convert_To_String", func(t *testing.T) {
// Given
expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], " +
"GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", " +
"StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", " +
"TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, " +
"RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", " +
"StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, " +
"VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " +
"TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}"
// When
result := getConsumerConfigExample().String()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) {
// Given
expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", " +
"GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, " +
"BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, " +
"RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", " +
"MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}"
// When
result := getConsumerConfigWithoutInnerObjectExample().String()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}

func TestConsumerConfig_JSONPretty(t *testing.T) {
t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) {
// Given
expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"" +
"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" +
"GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"" +
"MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"" +
"BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"" +
"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"" +
"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" +
"Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"" +
"MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"" +
"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"" +
"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" +
"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}"
// When
result := getConsumerConfigExample().JSONPretty()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) {
// Given
expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" +
"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"" +
"GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"" +
"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"" +
"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"" +
"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"" +
"Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"" +
"VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"" +
"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}"
// When
result := getConsumerConfigWithoutInnerObjectExample().JSONPretty()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}

func getConsumerConfigExample() *ConsumerConfig {
return &ConsumerConfig{
Rack: "stage",
ClientID: "test-consumer-client-id",
Reader: ReaderConfig{
Brokers: []string{"broker-1.test.com", "broker-2.test.com"},
GroupID: "test-consumer.0",
GroupTopics: []string{"test-updated.0"},
MaxWait: 2 * time.Second,
CommitInterval: time.Second,
},
BatchConfiguration: &BatchConfiguration{
MessageGroupLimit: 100,
},
MessageGroupDuration: 20,
TransactionalRetry: NewBoolPtr(false),
Concurrency: 10,
RetryEnabled: true,
RetryConfiguration: RetryConfiguration{
Brokers: []string{"broker-1.test.com", "broker-2.test.com"},
Topic: "test-exception.0",
StartTimeCron: "*/2 * * * *",
WorkDuration: time.Minute * 1,
MaxRetry: 3,
VerifyTopicOnStartup: true,
},
VerifyTopicOnStartup: true,
TLS: &TLSConfig{
RootCAPath: "resources/ca",
IntermediateCAPath: "resources/intCa",
},
SASL: &SASLConfig{
Type: "scram",
Username: "user",
Password: "pass",
},
}
}

func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig {
return &ConsumerConfig{
Rack: "stage",
ClientID: "test-consumer-client-id",
Reader: ReaderConfig{},
MessageGroupDuration: 20,
TransactionalRetry: NewBoolPtr(false),
Concurrency: 10,
RetryEnabled: true,
RetryConfiguration: RetryConfiguration{},
VerifyTopicOnStartup: true,
}
}
9 changes: 9 additions & 0 deletions mechanism.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kafka

import (
"fmt"

"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
Expand Down Expand Up @@ -37,3 +39,10 @@ func (s *SASLConfig) plain() sasl.Mechanism {
func (s *SASLConfig) IsEmpty() bool {
return s == nil
}

func (s *SASLConfig) JSON() string {
if s == nil {
return "{}"
}
return fmt.Sprintf(`{"Mechanism": %q, "Username": %q, "Password": %q}`, s.Type, s.Username, s.Password)
}
Loading

0 comments on commit 88382d0

Please sign in to comment.