From 8d122efd5106a539f0cf2af6587b5fe02abe3d12 Mon Sep 17 00:00:00 2001 From: Mimi Wang Date: Mon, 21 Aug 2023 20:52:46 -0700 Subject: [PATCH 1/2] Add `CreateTopic` method to `MockCluster` This commit adds the `CreateTopic` method to `MockCluster`. This allows the creation of topics without having to use a `Producer`. --- kafka/mockcluster.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kafka/mockcluster.go b/kafka/mockcluster.go index 08bef23e9..d5afc87fb 100644 --- a/kafka/mockcluster.go +++ b/kafka/mockcluster.go @@ -92,6 +92,18 @@ func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration return nil } +// CreateTopic creates a topic without having to use a producer +func (mc *MockCluster) CreateTopic(topic string, partitons, replicationFactor int) error { + topicStr := C.CString(topic) + defer C.free(unsafe.Pointer(topicStr)) + + cError := C.rd_kafka_mock_topic_create(mc.mcluster, topicStr, C.int(partitons), C.int(replicationFactor)) + if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR { + return newError(cError) + } + return nil +} + // Close and destroy the MockCluster func (mc *MockCluster) Close() { C.rd_kafka_mock_cluster_destroy(mc.mcluster) From 946370e9f7c6fbd739537779b3df871d711924e1 Mon Sep 17 00:00:00 2001 From: Mimi Wang Date: Tue, 29 Aug 2023 14:52:21 -0700 Subject: [PATCH 2/2] fix typo --- kafka/mockcluster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/mockcluster.go b/kafka/mockcluster.go index d5afc87fb..cb395f930 100644 --- a/kafka/mockcluster.go +++ b/kafka/mockcluster.go @@ -93,11 +93,11 @@ func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration } // CreateTopic creates a topic without having to use a producer -func (mc *MockCluster) CreateTopic(topic string, partitons, replicationFactor int) error { +func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error { topicStr := C.CString(topic) defer C.free(unsafe.Pointer(topicStr)) - cError := C.rd_kafka_mock_topic_create(mc.mcluster, topicStr, C.int(partitons), C.int(replicationFactor)) + cError := C.rd_kafka_mock_topic_create(mc.mcluster, topicStr, C.int(partitions), C.int(replicationFactor)) if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(cError) }