Skip to content

Commit

Permalink
Mock cluster: add initial support for balanced consumer groups
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Feb 5, 2020
1 parent 45a1e38 commit 03c0100
Show file tree
Hide file tree
Showing 7 changed files with 1,237 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(
rdkafka_coord.c
rdkafka_mock.c
rdkafka_mock_handlers.c
rdkafka_mock_cgrp.c
rdlist.c
rdlog.c
rdmurmur2.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdvarint.c rdbuf.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
$(SRCS_y)

HDRS= rdkafka.h rdkafka_mock.h
Expand Down
47 changes: 44 additions & 3 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,27 @@ rd_kafka_mock_cluster_io_set_events (rd_kafka_mock_cluster_t *mcluster,
rd_assert(!*"mock_cluster_io_set_events: fd not found");
}

/**
* @brief Set or clear single IO events for fd
*/
static void
rd_kafka_mock_cluster_io_set_event (rd_kafka_mock_cluster_t *mcluster,
rd_socket_t fd, rd_bool_t set, int event) {
int i;

for (i = 0 ; i < mcluster->fd_cnt ; i++) {
if (mcluster->fds[i].fd == fd) {
if (set)
mcluster->fds[i].events |= event;
else
mcluster->fds[i].events &= ~event;
return;
}
}

rd_assert(!*"mock_cluster_io_set_event: fd not found");
}


/**
* @brief Clear IO events for fd
Expand Down Expand Up @@ -624,6 +645,8 @@ static void rd_kafka_mock_connection_close (rd_kafka_mock_connection_t *mconn,
rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT),
reason);

rd_kafka_mock_cgrps_connection_closed(mconn->broker->cluster, mconn);

rd_kafka_timer_stop(&mconn->broker->cluster->timers,
&mconn->write_tmr, rd_true);

Expand All @@ -644,7 +667,6 @@ static void rd_kafka_mock_connection_close (rd_kafka_mock_connection_t *mconn,


void rd_kafka_mock_connection_send_response (rd_kafka_mock_connection_t *mconn,
const rd_kafka_buf_t *request,
rd_kafka_buf_t *resp) {

resp->rkbuf_reshdr.Size =
Expand All @@ -655,8 +677,8 @@ void rd_kafka_mock_connection_send_response (rd_kafka_mock_connection_t *mconn,
rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK",
"Broker %"PRId32": Sending %sResponseV%hd to %s",
mconn->broker->id,
rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
request->rkbuf_reqhdr.ApiVersion,
rd_kafka_ApiKey2str(resp->rkbuf_reqhdr.ApiKey),
resp->rkbuf_reqhdr.ApiVersion,
rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT));

/* Set up a buffer reader for sending the buffer. */
Expand Down Expand Up @@ -800,6 +822,9 @@ rd_kafka_mock_connection_read_request (rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_t *rd_kafka_mock_buf_new_response (const rd_kafka_buf_t *request) {
rd_kafka_buf_t *rkbuf = rd_kafka_buf_new(1, 100);

/* Copy request header so the ApiVersion remains known */
rkbuf->rkbuf_reqhdr = request->rkbuf_reqhdr;

/* Size, updated later */
rd_kafka_buf_write_i32(rkbuf, 0);

Expand Down Expand Up @@ -979,6 +1004,16 @@ static void rd_kafka_mock_connection_io (rd_kafka_mock_cluster_t *mcluster,
}


/**
* @brief Set connection as blocking, POLLIN will not be served.
*/
void rd_kafka_mock_connection_set_blocking (rd_kafka_mock_connection_t *mconn,
rd_bool_t blocking) {
rd_kafka_mock_cluster_io_set_event(mconn->broker->cluster,
mconn->transport->rktrans_s,
!blocking, POLLIN);
}


static rd_kafka_mock_connection_t *
rd_kafka_mock_connection_new (rd_kafka_mock_broker_t *mrkb, rd_socket_t fd,
Expand Down Expand Up @@ -1809,6 +1844,7 @@ static void
rd_kafka_mock_cluster_destroy0 (rd_kafka_mock_cluster_t *mcluster) {
rd_kafka_mock_topic_t *mtopic;
rd_kafka_mock_broker_t *mrkb;
rd_kafka_mock_cgrp_t *mcgrp;
rd_kafka_mock_coord_t *mcoord;
rd_kafka_mock_error_stack_t *errstack;
thrd_t dummy_rkb_thread;
Expand All @@ -1820,6 +1856,9 @@ rd_kafka_mock_cluster_destroy0 (rd_kafka_mock_cluster_t *mcluster) {
while ((mrkb = TAILQ_FIRST(&mcluster->brokers)))
rd_kafka_mock_broker_destroy(mrkb);

while ((mcgrp = TAILQ_FIRST(&mcluster->cgrps)))
rd_kafka_mock_cgrp_destroy(mcgrp);

while ((mcoord = TAILQ_FIRST(&mcluster->coords)))
rd_kafka_mock_coord_destroy(mcluster, mcoord);

Expand Down Expand Up @@ -1922,6 +1961,8 @@ rd_kafka_mock_cluster_t *rd_kafka_mock_cluster_new (rd_kafka_t *rk,
mcluster->defaults.partition_cnt = 4;
mcluster->defaults.replication_factor = RD_MIN(3, broker_cnt);

TAILQ_INIT(&mcluster->cgrps);

TAILQ_INIT(&mcluster->coords);

TAILQ_INIT(&mcluster->errstacks);
Expand Down
Loading

0 comments on commit 03c0100

Please sign in to comment.