Skip to content

Commit

Permalink
Add createTopic and listTopics functions
Browse files Browse the repository at this point in the history
Add example script for creating a topic and listing topics on all partitions
Update README
  • Loading branch information
mostafa committed Jul 4, 2021
1 parent 4f46ff5 commit c4fcac2
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ To avoid getting the following error while running the test:
Failed to write message: [5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes
```

Just make sure you create the topics in Apache Kafka in advance:
You can now use `createTopic` function to create topics in Kafka. The `scripts/test_topics.js` script shows how to list topics on all Kakfa partitions and also how to create a topic.

You always have the option to create it using `kafka-topics` command:

```bash
$ docker exec -it lensesio bash
Expand Down
29 changes: 29 additions & 0 deletions scripts/test_topics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
This is a k6 test script that imports the xk6-kafka and
list topics on all Kafka partitions and creates a topic.
*/

import {
createTopic,
listTopics
} from 'k6/x/kafka'; // import kafka extension

const address = "localhost:9092";
const kafkaTopic = "xk6_kafka_test_topic";

const results = listTopics(address)
const error = createTopic(address, kafkaTopic);

export default function () {
results.forEach(topic => console.log(topic));

if (error === undefined) {
// If no error returns, it means that the topic
// is successfully created or already exists
console.log("Topic created successfully");
} else {
console.log("Error while creating topic: ", error);
}
}
79 changes: 79 additions & 0 deletions topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package kafka

import (
"net"
"strconv"

"github.com/segmentio/kafka-go"
kafkago "github.com/segmentio/kafka-go"
)

func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor int) error {
conn, err := kafkago.Dial("tcp", address)
if err != nil {
return err
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
return err
}
var controllerConn *kafkago.Conn
controllerConn, err = kafkago.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
return err
}
defer controllerConn.Close()

if partitions <= 0 {
partitions = 1
}

if replicationFactor <= 0 {
replicationFactor = 1
}

topicConfigs := []kafkago.TopicConfig{
kafka.TopicConfig{
Topic: topic,
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
},
}

err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
return err
}

return nil
}

func (*Kafka) ListTopics(address string) ([]string, error) {
conn, err := kafkago.Dial("tcp", address)
if err != nil {
return nil, err
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
return nil, err
}

// There should be a better way to return unique set of
// topics instead of looping over them twice
topicSet := map[string]struct{}{}

for _, partition := range partitions {
topicSet[partition.Topic] = struct{}{}
}

topics := make([]string, 0, len(topicSet))
for topic := range topicSet {
topics = append(topics, topic)
}

return topics, nil
}

0 comments on commit c4fcac2

Please sign in to comment.