diff --git a/README.md b/README.md index ac9235f..7bae077 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,9 @@ To avoid getting the following error while running the test: Failed to write message: [5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes ``` -Just make sure you create the topics in Apache Kafka in advance: +You can now use `createTopic` function to create topics in Kafka. The `scripts/test_topics.js` script shows how to list topics on all Kakfa partitions and also how to create a topic. + +You always have the option to create it using `kafka-topics` command: ```bash $ docker exec -it lensesio bash diff --git a/scripts/test_topics.js b/scripts/test_topics.js new file mode 100644 index 0000000..4ea3349 --- /dev/null +++ b/scripts/test_topics.js @@ -0,0 +1,29 @@ +/* + +This is a k6 test script that imports the xk6-kafka and +list topics on all Kafka partitions and creates a topic. + +*/ + +import { + createTopic, + listTopics +} from 'k6/x/kafka'; // import kafka extension + +const address = "localhost:9092"; +const kafkaTopic = "xk6_kafka_test_topic"; + +const results = listTopics(address) +const error = createTopic(address, kafkaTopic); + +export default function () { + results.forEach(topic => console.log(topic)); + + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully created or already exists + console.log("Topic created successfully"); + } else { + console.log("Error while creating topic: ", error); + } +} diff --git a/topic.go b/topic.go new file mode 100644 index 0000000..03b4a31 --- /dev/null +++ b/topic.go @@ -0,0 +1,79 @@ +package kafka + +import ( + "net" + "strconv" + + "github.com/segmentio/kafka-go" + kafkago "github.com/segmentio/kafka-go" +) + +func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor int) error { + conn, err := kafkago.Dial("tcp", address) + if err != nil { + return err + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + return err + } + var controllerConn *kafkago.Conn + controllerConn, err = kafkago.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + return err + } + defer controllerConn.Close() + + if partitions <= 0 { + partitions = 1 + } + + if replicationFactor <= 0 { + replicationFactor = 1 + } + + topicConfigs := []kafkago.TopicConfig{ + kafka.TopicConfig{ + Topic: topic, + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + return err + } + + return nil +} + +func (*Kafka) ListTopics(address string) ([]string, error) { + conn, err := kafkago.Dial("tcp", address) + if err != nil { + return nil, err + } + defer conn.Close() + + partitions, err := conn.ReadPartitions() + if err != nil { + return nil, err + } + + // There should be a better way to return unique set of + // topics instead of looping over them twice + topicSet := map[string]struct{}{} + + for _, partition := range partitions { + topicSet[partition.Topic] = struct{}{} + } + + topics := make([]string, 0, len(topicSet)) + for topic := range topicSet { + topics = append(topics, topic) + } + + return topics, nil +}