You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
I am trying to use the mock for unit tests, but I can't seem to find any example on how to use it.
I have pieced the following together from the docs, tests and various posts, but I cannot seem to make the consuming part work: `received_message` is empty.
Is there an example somewhere that I could use as reference?
Thanks.
#include <chrono>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

#include <librdkafka/rdkafka.h>
#include <librdkafka/rdkafka_mock.h>
#include"gtest/gtest.h"classKafkaClusterMock {
public:structTopicDescription {
std::string topic_name;
int partition_cnt;
int replication_factor;
};
KafkaClusterMock(int broker_cnt, const std::vector<TopicDescription> &topics)
: rk_(nullptr), mcluster_(nullptr) {
char errstr[256];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));
if (rd_kafka_conf_set(conf, "bootstrap.servers", ",", errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
throwstd::runtime_error(std::string("Failed to initialise 'bootstrap.servers' error: ") +
errstr);
}
rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (nullptr == rk_) {
throwstd::runtime_error(std::string("Failed to create mock cluster rd_kafka_t: ") + errstr);
}
mcluster_ = rd_kafka_mock_cluster_new(rk_, broker_cnt);
if (nullptr == mcluster_) {
throwstd::runtime_error("Failed to acquire mock cluster");
}
createTopics(topics);
}
~KafkaClusterMock() {
rd_kafka_mock_cluster_destroy(mcluster_);
rd_kafka_destroy(rk_);
}
std::string bootstraps() const { returnrd_kafka_mock_cluster_bootstraps(mcluster_); }
protected:voidcreateTopics(const std::vector<KafkaClusterMock::TopicDescription> &topics) {
for (constauto &topic : topics) {
rd_kafka_resp_err_t topic_err = rd_kafka_mock_topic_create(
mcluster_, topic.topic_name.data(), topic.partition_cnt, topic.replication_factor);
if (RD_KAFKA_RESP_ERR_NO_ERROR != topic_err) {
throwstd::runtime_error("Failed to create the mock topic (" + topic.topic_name +
"): " + rd_kafka_err2str(topic_err));
}
seedTopic(topic.topic_name);
}
}
staticvoiddr_msg_cb(rd_kafka_t *rk, constrd_kafka_message_t *rkmessage, void *opaque) {
int* remains = static_cast<int*>(rkmessage->_private);
if (rkmessage->err == 0) {
*remains -= 1;
}
}
voidseedTopic(const std::string &topic_name) {
char errstr[256] = {'\0'};
std::string bootstraps_servers = bootstraps();
rd_kafka_conf_t *conf = rd_kafka_conf_new();
//rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));rd_kafka_conf_set_dr_msg_cb(conf, &dr_msg_cb);
if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.data(), errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
throwstd::runtime_error("Failed to configure 'bootstrap.servers' to seed the topic " +
topic_name + "error: " + errstr);
}
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (nullptr == rk) {
throwstd::runtime_error("Failed to create RdKafka producer to seed the topic " + topic_name +
"error: " + errstr);
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
// Make sure all replicas are in-sync after producing so that consume test wont fail.rd_kafka_conf_res_t conf_result =
rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1", errstr, sizeof(errstr));
if (conf_result != RD_KAFKA_CONF_OK) {
throwstd::runtime_error(std::string("Invalid configuration request.required.acks error: ") +
errstr);
}
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name.data(), topic_conf);
if (nullptr == rkt) {
throwstd::runtime_error("Failed to create RdKafka topic " + topic_name);
}
int remains = 1;
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, nullptr, 0, nullptr, 0,
&remains) == -1) {
throwstd::runtime_error("Failed to produce a message on " + topic_name + " to seed it");
}
while (remains > 0 && rd_kafka_outq_len(rk) > 0) {
rd_kafka_poll(rk, 1000);
}
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
if (remains != 0) {
throwstd::runtime_error("Failed to delivered a message on " + topic_name + " to seed it");
}
}
private:rd_kafka_t *rk_;
rd_kafka_mock_cluster_t *mcluster_;
};
classBasicProducer {
public:explicitBasicProducer(const std::string &bootstraps_servers, const std::string &topic_name) {
char errstr[256] = {'\0'};
rd_kafka_conf_t *conf = rd_kafka_conf_new();
// rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));rd_kafka_conf_set_dr_msg_cb(conf, &dr_msg_cb);
if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.data(), errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
throwstd::runtime_error("Failed to configure 'bootstrap.servers' for producer for topic " +
topic_name + "error: " + errstr);
}
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (nullptr == rk) {
throwstd::runtime_error("Failed to create RdKafka producer for topic " + topic_name +
"error: " + errstr);
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
// Make sure all replicas are in-sync after producing so that consume test wont fail.rd_kafka_conf_res_t conf_result =
rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1", errstr, sizeof(errstr));
if (conf_result != RD_KAFKA_CONF_OK) {
throwstd::runtime_error(std::string("Invalid configuration request.required.acks error: ") +
errstr);
}
rkt = rd_kafka_topic_new(rk, topic_name.data(), topic_conf);
if (nullptr == rkt) {
throwstd::runtime_error("Failed to create RdKafka topic " + topic_name);
}
}
~BasicProducer() {
if (nullptr != rk) {
rd_kafka_flush(rk, 60 * 1000);
}
if (nullptr != rkt) {
rd_kafka_topic_destroy(rkt);
}
if (nullptr != rk) {
rd_kafka_destroy(rk);
}
}
// Blocking produce.boolproduce(std::string_view message) {
bool produced = false;
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
const_cast<void *>(static_cast<constvoid *>(message.data())),
message.length(), nullptr, 0, &produced) == -1) {
return produced;
}
while (!produced && rd_kafka_outq_len(rk) > 0) {
rd_kafka_poll(rk, 1000);
}
return produced;
}
protected:staticvoiddr_msg_cb(rd_kafka_t *, constrd_kafka_message_t *rkmessage, void *) {
bool *delivered = static_cast<bool *>(rkmessage->_private);
*delivered = rkmessage->err == 0;
}
private:rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
};
classBasicConsumer {
public:explicitBasicConsumer(const std::string &bootstraps_servers, const std::string &topic_name) {
char errstr[256] = {'\0'};
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));
if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.data(), errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
throwstd::runtime_error("Failed to configure 'bootstrap.servers' for the topic " +
topic_name + " error: " + errstr);
}
if (rd_kafka_conf_set(conf, "group.id", "test_group", errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
throwstd::runtime_error("Failed to configure 'group.id' for the topic " + topic_name +
" error: " + errstr);
}
// if (rd_kafka_conf_set(conf, "enable.partition.eof", "true", errstr, sizeof(errstr)) !=// RD_KAFKA_CONF_OK) {// throw std::runtime_error("Failed to configure 'enable.partition.eof' for the topic " +// topic_name + " error: " + errstr);// }
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (nullptr == rk) {
throwstd::runtime_error("Failed to create RdKafka consumer for the topic " + topic_name +
" error: " + errstr);
}
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic_name.c_str(), 0);
if (auto err = rd_kafka_subscribe(rk, topics); err != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_topic_partition_list_destroy(topics);
throwstd::runtime_error("Failed to subscribe to topic " + topic_name +
" error: " + rd_kafka_err2str(err));
}
rd_kafka_topic_partition_list_destroy(topics);
}
~BasicConsumer() {
if (nullptr != rk) {
rd_kafka_unsubscribe(rk);
while (rd_kafka_outq_len(rk) > 0) {
rd_kafka_poll(rk, 10);
}
rd_kafka_destroy(rk);
}
}
// Blocking receive.
std::optional<std::string> receiveOne() {
rd_kafka_poll(rk, 0);
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 10000);
std::optional<std::string> message;
if (rkmessage && rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
message = std::string(static_cast<char *>(rkmessage->payload), rkmessage->len);
rd_kafka_message_destroy(rkmessage);
}
return message;
}
private:rd_kafka_t *rk;
};
// Round-trip check: a message produced to the mock cluster is received by a
// consumer subscribed to the same topic.
TEST(KafkaClusterMockSuite, ProduceAndReceive) {
  constexpr auto brokers_count = 3;
  const auto topics = std::vector<KafkaClusterMock::TopicDescription>{
      {.topic_name = "test", .partition_cnt = 1, .replication_factor = 1}};

  KafkaClusterMock kafka_cluster_mock(brokers_count, topics);
  BasicProducer basic_kafka_producer(kafka_cluster_mock.bootstraps(), topics[0].topic_name);
  BasicConsumer basic_kafka_consumer(kafka_cluster_mock.bootstraps(), topics[0].topic_name);

  // There is no point waiting on the consumer if the message never got out.
  ASSERT_TRUE(basic_kafka_producer.produce("hello"));

  // KafkaClusterMock::seedTopic() writes one empty message to every topic, so
  // a consumer that starts at the beginning of the partition sees it first.
  // Skip empty payloads until the real message (or a timeout) shows up.
  auto received_message = basic_kafka_consumer.receiveOne();
  while (received_message && received_message->empty()) {
    received_message = basic_kafka_consumer.receiveOne();
  }

  ASSERT_TRUE(received_message.has_value());
  EXPECT_EQ(*received_message, "hello");
}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hello,
I am trying to use the mock for unit tests, but I can't seem to find any example on how to use it.
I have pieced the following together from the docs, tests and various posts, but I cannot seem to make the consuming part work: `received_message` is empty.
Is there an example somewhere that I could use as reference?
Thanks.
Beta Was this translation helpful? Give feedback.
All reactions