Skip to content

Commit

Permalink
Merge pull request #1406 from worms/master
Browse files Browse the repository at this point in the history
Add consume ability to sasl_scram_client example
  • Loading branch information
bai committed Jun 23, 2019
2 parents 4154a59 + 20c7a58 commit 93e48db
Showing 1 changed file with 65 additions and 13 deletions.
78 changes: 65 additions & 13 deletions examples/sasl_scram_client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"os"
"os/signal"
"strings"

"github.com/Shopify/sarama"
Expand All @@ -27,6 +28,8 @@ var (
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
useTLS = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
mode = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
logMsg = flag.Bool("logmsg", false, "True to log consumed messages to console")

logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
)
Expand Down Expand Up @@ -62,8 +65,9 @@ func main() {
flag.Parse()

if *brokers == "" {
log.Fatalln("at least one brocker is required")
log.Fatalln("at least one broker is required")
}
splitBrokers := strings.Split(*brokers, ",")

if *userName == "" {
log.Fatalln("SASL username is required")
Expand Down Expand Up @@ -101,18 +105,66 @@ func main() {
conf.Net.TLS.Config = createTLSConfiguration()
}

syncProcuder, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), conf)
if err != nil {
logger.Fatalln("failed to create producer: ", err)
}
partition, offset, err := syncProcuder.SendMessage(&sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder("test_message"),
})
if err != nil {
logger.Fatalln("failed to send message to ", *topic, err)
if *mode == "consume" {
consumer, err := sarama.NewConsumer(splitBrokers, conf)
if err != nil {
panic(err)
}
log.Println("consumer created")
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
log.Println("commence consuming")
partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}

defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
log.Println("in the for")
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message offset %d\n", msg.Offset)
if *logMsg {
log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
}
consumed++
case <-signals:
break ConsumerLoop
}
}

log.Printf("Consumed: %d\n", consumed)

} else {
syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
if err != nil {
logger.Fatalln("failed to create producer: ", err)
}
partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder("test_message"),
})
if err != nil {
logger.Fatalln("failed to send message to ", *topic, err)
}
logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
_ = syncProducer.Close()
}
logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
_ = syncProcuder.Close()
logger.Println("Bye now !")

}

0 comments on commit 93e48db

Please sign in to comment.