Skip to content

Commit

Permalink
cgrp: Roundrobin assignor handles unsubscribed topics (confluentinc#2121
Browse files Browse the repository at this point in the history
)

Check next roundrobin consumer is in the topic's eligible list,
otherwise select the next eligible consumer.
Previous code overflowed members[next] and crashed.
  • Loading branch information
wilmai committed Feb 12, 2020
1 parent 5f80976 commit 00a0a3b
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 9 deletions.
34 changes: 25 additions & 9 deletions src/rdkafka_roundrobin_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,37 @@ rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti];
int partition;

/* Sort eligible members by name */
rd_list_sort(&eligible_topic->members,
rd_kafka_group_member_cmp);

/* For each topic+partition, assign one member (in a cyclic
* iteration) per partition until the partitions are exhausted*/
for (partition = 0 ;
partition < eligible_topic->metadata->partition_cnt ;
partition++) {
rd_kafka_group_member_t *rkgm;
rd_kafka_group_member_t *next_rkgm = &members[next];

/* Find if next member is eligible */
rd_kafka_group_member_t *rkgm = rd_list_find(
&eligible_topic->members, next_rkgm,
rd_kafka_group_member_cmp);

/* Not found; find first eligible member >= next */
if (rkgm == NULL) {
int i;
rd_kafka_group_member_t *eligible_rkgm;

/* Scan through members until we find one with a
* subscription to this topic. */
while (!rd_kafka_group_member_find_subscription(
rk, &members[next],
eligible_topic->metadata->topic))
next++;
/* Default to first */
rkgm = rd_list_elem(&eligible_topic->members, 0);

rkgm = &members[next];
RD_LIST_FOREACH(eligible_rkgm, &eligible_topic->members, i) {
if (rd_kafka_group_member_cmp(eligible_rkgm, next_rkgm) >= 0) {
rkgm = eligible_rkgm;
break;
}
}
}

rd_kafka_dbg(rk, CGRP, "ASSIGN",
"roundrobin: Member \"%s\": "
Expand All @@ -102,7 +118,7 @@ rd_kafka_roundrobin_assignor_assign_cb (rd_kafka_t *rk,
rkgm->rkgm_assignment,
eligible_topic->metadata->topic, partition);

next = (next+1) % rd_list_cnt(&eligible_topic->members);
next = (next+1) % member_cnt;
}
}

Expand Down
144 changes: 144 additions & 0 deletions tests/0107-regex_rebalance.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2020, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"

/**
* Issue #2121:
* Test handling of asymmetric subscription race condition; if two consumers
* subscribe to "^.*", a new topic may not have been subscribed
* by both at the time rebalance is triggered.
*/

#include "rdkafka.h"


static rd_kafka_t *c1, *c2;
static rd_kafka_resp_err_t state1, state2;

static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *parts, void *opaque) {
rd_kafka_resp_err_t *statep = NULL;

if (rk == c1)
statep = &state1;
else if (rk == c2)
statep = &state2;
else
TEST_FAIL("Invalid rk %p", rk);

TEST_SAY("Rebalance for %s: %s:\n", rd_kafka_name(rk), rd_kafka_err2str(err));
test_print_partition_list(parts);

if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
rd_kafka_assign(rk, parts);
else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
rd_kafka_assign(rk, NULL);

*statep = err;
}

static void test_regex_rebalance(const char *assignor) {
const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 1);
int64_t ts_start;
int wait_sec;
char newtopic[256];
rd_kafka_conf_t *conf;
char *topic_rgxa;
char *topic_rgxb;

state1 = state2 = 0;

rd_snprintf(newtopic, sizeof(newtopic), "%s.a", topic);

test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
test_conf_set(conf, "group.id", "rdkafkatest.consumer1");
test_conf_set(conf, "partition.assignment.strategy", assignor);

TEST_SAY("Creating 2 consumers\n");
c1 = test_create_consumer(topic, rebalance_cb, rd_kafka_conf_dup(conf), NULL);
c2 = test_create_consumer(topic, rebalance_cb, rd_kafka_conf_dup(conf), NULL);
rd_kafka_conf_destroy(conf);

TEST_SAY("Creating topic %s with 3 partitions\n", newtopic);
test_create_topic(c1, newtopic, 3, 1);

TEST_SAY("Subscribing\n");
// to force asymmetric subscriptions, cons1 = [ab], cons2 = [ac]
topic_rgxa = rd_strdup(tsprintf("^%s\\.[ab]", topic));
topic_rgxb = rd_strdup(tsprintf("^%s\\.[ac]", topic));
test_consumer_subscribe(c1, topic_rgxa);
test_consumer_subscribe(c2, topic_rgxb);


TEST_SAY("Waiting for initial assignment for both consumers\n");
while (state1 != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ||
state2 != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
test_consumer_poll_no_msgs("wait-rebalance", c1, 0, 1000);
test_consumer_poll_no_msgs("wait-rebalance", c2, 0, 1000);
}


TEST_SAY("Creating additional topics\n");
strcpy(newtopic, topic);
strcat(newtopic, ".b");
test_create_topic(c1, newtopic, 3, 1);
strcpy(newtopic, topic);
strcat(newtopic, ".c");
test_create_topic(c1, newtopic, 3, 1);

TEST_SAY("Wait 10 seconds for consumers not to crash\n");
wait_sec = test_quick ? 3 : 10;
ts_start = test_clock();
do {
test_consumer_poll_no_msgs("wait-stable", c1, 0, 1000);
test_consumer_poll_no_msgs("wait-stable", c2, 0, 1000);
} while (test_clock() < ts_start + (wait_sec * 1000000));

TEST_ASSERT(state1 == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
"Expected consumer 1 to have assignment, not in state %s",
rd_kafka_err2str(state1));
TEST_ASSERT(state2 == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
"Expected consumer 2 to have assignment, not in state %s",
rd_kafka_err2str(state2));

free(topic_rgxa);
free(topic_rgxb);
test_consumer_close(c1);
rd_kafka_destroy(c1);
test_consumer_close(c2);
rd_kafka_destroy(c2);
}

int main_0107_regex_rebalance (int argc, char **argv) {
test_regex_rebalance("range");
test_regex_rebalance("roundrobin");

return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ set(
0104-fetch_from_follower_mock.c
0105-transactions_mock.c
0106-cgrp_sess_timeout.c
0107-regex_rebalance.c
0108-client_swname.c
8000-idle.cpp
test.c
Expand Down
2 changes: 2 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ _TEST_DECL(0103_transactions);
_TEST_DECL(0104_fetch_from_follower_mock);
_TEST_DECL(0105_transactions_mock);
_TEST_DECL(0106_cgrp_sess_timeout);
_TEST_DECL(0107_regex_rebalance);
_TEST_DECL(0108_client_swname);

/* Manual tests */
Expand Down Expand Up @@ -389,6 +390,7 @@ struct test tests[] = {
TEST_BRKVER(2,4,0,0)),
_TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
_TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
_TEST(0107_regex_rebalance, 0, TEST_BRKVER(0,9,0,0)),
_TEST(0108_client_swname, 0),

/* Manual tests */
Expand Down
1 change: 1 addition & 0 deletions win32/tests/tests.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
<ClCompile Include="..\..\tests\0104-fetch_from_follower_mock.c" />
<ClCompile Include="..\..\tests\0105-transactions_mock.c" />
<ClCompile Include="..\..\tests\0106-cgrp_sess_timeout.c" />
<ClCompile Include="..\..\tests\0107-regex_rebalance.c" />
<ClCompile Include="..\..\tests\0108-client_swname.c" />
<ClCompile Include="..\..\tests\8000-idle.cpp" />
<ClCompile Include="..\..\tests\test.c" />
Expand Down

0 comments on commit 00a0a3b

Please sign in to comment.