Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue 235: kafka consumer and publisher for multiple brokers #236

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/sbezverk/gobmp
go 1.19

require (
github.com/Shopify/sarama v1.27.0
github.com/IBM/sarama v1.42.1
kotronis-te marked this conversation as resolved.
Show resolved Hide resolved
github.com/go-test/deep v1.0.8
github.com/golang/glog v1.1.1
github.com/nats-io/nats.go v1.28.0
Expand All @@ -12,29 +12,28 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/frankban/quicktest v1.14.4 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/nats-io/nats-server/v2 v2.9.23 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.5.0 // indirect
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
)
129 changes: 67 additions & 62 deletions go.sum

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/kafka/kafka-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package kafka
import (
"math/rand"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/IBM/sarama"
"github.com/golang/glog"
"github.com/sbezverk/tools"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ func NewKafkaMConsumer(kafkaSrv string, topics []*TopicDescriptor) (Srv, error)
config.Consumer.Return.Errors = true
config.Version = sarama.V1_1_0_0

brokers := []string{kafkaSrv}
brokers := strings.Split(kafkaSrv, ",")

// Create new consumer
master, err := sarama.NewConsumer(brokers, config)
Expand Down
140 changes: 78 additions & 62 deletions pkg/kafka/kafka-publisher.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package kafka

import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"net"
"os"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/IBM/sarama"
"github.com/golang/glog"
"github.com/sbezverk/gobmp/pkg/bmp"
"github.com/sbezverk/gobmp/pkg/pub"
Expand Down Expand Up @@ -73,10 +75,10 @@ var (
)

type publisher struct {
broker *sarama.Broker
config *sarama.Config
producer sarama.AsyncProducer
stopCh chan struct{}
clusterAdmin sarama.ClusterAdmin
config *sarama.Config
producer sarama.AsyncProducer
stopCh chan struct{}
}

func (p *publisher) PublishMessage(t int, key []byte, msg []byte) error {
Expand Down Expand Up @@ -140,7 +142,7 @@ func (p *publisher) produceMessage(topic string, key []byte, msg []byte) error {

func (p *publisher) Stop() {
close(p.stopCh)
p.broker.Close()
p.clusterAdmin.Close()
}

// NewKafkaPublisher instantiates a new instance of a Kafka publisher
Expand All @@ -157,24 +159,33 @@ func NewKafkaPublisher(kafkaSrv string) (pub.Publisher, error) {
config.ClientID = "gobmp-producer" + "_" + strconv.Itoa(rand.Intn(1000))
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Admin.Retry.Max = 100
config.Version = sarama.V1_1_0_0
config.Admin.Retry.Max = 120
config.Admin.Retry.Backoff = time.Second
config.Metadata.Retry.Max = 300
config.Metadata.Retry.Backoff = time.Second * 10
config.Version = sarama.V2_1_0_0

br := sarama.NewBroker(kafkaSrv)
kafkaSrvs := strings.Split(kafkaSrv, ",")
ca, err := sarama.NewClusterAdmin(kafkaSrvs, config)
if err != nil {
glog.Errorf("failed to create cluster admin: %+v", err)
kotronis-te marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

if err := waitForBrokerConnection(br, config, brockerConnectTimeout); err != nil {
glog.Errorf("failed to open connection to the broker with error: %+v\n", err)
cb, err := waitForControllerBrokerConnection(ca, config, brockerConnectTimeout)
if err != nil {
glog.Errorf("failed to open connection to the controller broker with error: %+v\n", err)
return nil, err
}
glog.V(5).Infof("Connected to broker: %s id: %d\n", br.Addr(), br.ID())
glog.V(5).Infof("Connected to controller broker: %s id: %d\n", cb.Addr(), cb.ID())

for _, t := range topicNames {
if err := ensureTopic(br, topicCreateTimeout, t); err != nil {
if err := ensureTopic(ca, topicCreateTimeout, t); err != nil {
glog.Errorf("New Kafka publisher failed to ensure requested topics with error: %+v", err)
return nil, err
}
}
producer, err := sarama.NewAsyncProducer([]string{kafkaSrv}, config)
producer, err := sarama.NewAsyncProducer(kafkaSrvs, config)
if err != nil {
glog.Errorf("New Kafka publisher failed to start new async producer with error: %+v", err)
return nil, err
Expand All @@ -195,61 +206,59 @@ func NewKafkaPublisher(kafkaSrv string) (pub.Publisher, error) {
}(producer, stopCh)

return &publisher{
stopCh: stopCh,
broker: br,
config: config,
producer: producer,
stopCh: stopCh,
clusterAdmin: ca,
config: config,
producer: producer,
}, nil
}

func validator(addr string) error {
host, port, _ := net.SplitHostPort(addr)
if host == "" || port == "" {
return fmt.Errorf("host or port cannot be ''")
}
// Try to resolve if the hostname was used in the address
if ip, err := net.LookupIP(host); err != nil || ip == nil {
// Check if IP address was used in address instead of a host name
if net.ParseIP(host) == nil {
return fmt.Errorf("fail to parse host part of address")
func validator(brokerEndpoints string) error {
addrs := strings.Split(brokerEndpoints, ",")
for _, addr := range addrs {
host, port, _ := net.SplitHostPort(addr)
if host == "" || port == "" {
return fmt.Errorf("%s: host or port cannot be ''", addr)
}
// Try to resolve if the hostname was used in the address
if ip, err := net.LookupIP(host); err != nil || ip == nil {
// Check if IP address was used in address instead of a host name
if net.ParseIP(host) == nil {
return fmt.Errorf("%s: fail to parse host part of address", addr)
}
}
np, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("%s: fail to parse port with error: %w", addr, err)
}
if np == 0 || np > math.MaxUint16 {
return fmt.Errorf("%s: the value of port is invalid", addr)
}
}
np, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("fail to parse port with error: %w", err)
}
if np == 0 || np > math.MaxUint16 {
return fmt.Errorf("the value of port is invalid")
}
return nil
}

func ensureTopic(br *sarama.Broker, timeout time.Duration, topicName string) error {
topic := &sarama.CreateTopicsRequest{
TopicDetails: map[string]*sarama.TopicDetail{
topicName: {
NumPartitions: 1,
ReplicationFactor: 1,
ConfigEntries: map[string]*string{
"retention.ms": &topicRetention,
},
},
func ensureTopic(ca sarama.ClusterAdmin, timeout time.Duration, topicName string) error {
topicDetail := &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
ConfigEntries: map[string]*string{
"retention.ms": &topicRetention,
},
}

ticker := time.NewTicker(100 * time.Millisecond)
tout := time.NewTimer(timeout)
for {
t, err := br.CreateTopics(topic)
if err != nil {
err := ca.CreateTopic(topicName, topicDetail, false)
if errors.Is(err, sarama.ErrIncompleteResponse) {
return err
}
if e, ok := t.TopicErrors[topicName]; ok {
if e.Err == sarama.ErrTopicAlreadyExists || e.Err == sarama.ErrNoError {
return nil
}
if e.Err != sarama.ErrRequestTimedOut {
return e
}
if errors.Is(err, sarama.ErrTopicAlreadyExists) || errors.Is(err, sarama.ErrNoError) {
return nil
}
if !errors.Is(err, sarama.ErrRequestTimedOut) {
return err
}
select {
case <-ticker.C:
Expand All @@ -260,34 +269,41 @@ func ensureTopic(br *sarama.Broker, timeout time.Duration, topicName string) err
}
}

func waitForBrokerConnection(br *sarama.Broker, config *sarama.Config, timeout time.Duration) error {
func waitForControllerBrokerConnection(ca sarama.ClusterAdmin, config *sarama.Config, timeout time.Duration) (*sarama.Broker, error) {
if ca == nil {
return nil, errors.New("nil ClusterAdmin provided")
}
ticker := time.NewTicker(10 * time.Second)
tout := time.NewTimer(timeout)
defer func() {
ticker.Stop()
tout.Stop()
}()
cb, err := ca.Controller()
kotronis-te marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
for {
if err := br.Open(config); err == nil {
if ok, err := br.Connected(); err != nil {
glog.Errorf("failed to connect to the broker with error: %+v, will retry in 10 seconds", err)
if err := cb.Open(config); err == nil {
if ok, err := cb.Connected(); err != nil {
glog.Errorf("failed to connect to the controller broker with error: %+v, will retry in 10 seconds", err)
} else {
if ok {
return nil
return cb, nil
} else {
glog.Errorf("kafka broker %s is not ready yet, will retry in 10 seconds", br.Addr())
glog.Errorf("kafka controller broker %s is not ready yet, will retry in 10 seconds", cb.Addr())
}
}
} else {
if err == sarama.ErrAlreadyConnected {
return nil
return cb, nil
}
}
select {
case <-ticker.C:
continue
case <-tout.C:
return fmt.Errorf("timeout waiting for the connection to the broker %s", br.Addr())
return nil, fmt.Errorf("timeout waiting for the connection to the broker %s", cb.Addr())
}
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/kafka/testbed/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Setup

From within the `testbed` folder (note that there are separate files for gobgp and xrd tests):
```
export IMAGE_VERSION=test-235
cd ../../..
make container
cd pkg/kafka/testbed
cd gobgp
docker build -t test/gobgp:3.20.0 -f Dockerfile .
cd ..
# single-broker test
docker compose -f docker-compose-single-broker-gobgp.yaml up -d
docker compose -f docker-compose-single-broker-gobgp.yaml logs -f gobmp
docker compose -f docker-compose-single-broker-gobgp.yaml exec gobgp-1 sh
gobgp global rib add 192.168.1.0/24
exit
docker compose -f docker-compose-single-broker-gobgp.yaml exec kafka-1 sh
kafka-console-consumer --bootstrap-server kafka-1:9091 --topic gobmp.parsed.unicast_prefix_v4 --from-beginning
# make sure that kafka messages are present!
docker compose -f docker-compose-single-broker-gobgp.yaml down
# multi-broker test
docker compose -f docker-compose-multi-broker-gobgp.yaml up -d
docker compose -f docker-compose-multi-broker-gobgp.yaml logs -f gobmp
docker compose -f docker-compose-multi-broker-gobgp.yaml exec gobgp-1 sh
gobgp global rib add 192.168.1.0/24
exit
docker compose -f docker-compose-multi-broker-gobgp.yaml exec kafka-4 sh
kafka-console-consumer --bootstrap-server kafka-1:9091,kafka-2:9092,kafka-3:9093,kafka-4:9094 --topic gobmp.parsed.unicast_prefix_v4 --from-beginning
# make sure that kafka messages are present!
docker compose -f docker-compose-multi-broker-gobgp.yaml down
```
Loading