Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move can_q_contain_fetched_msgs inside q_serve #4431

Merged
merged 6 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ librdkafka v2.3.0 is a feature release:
(#4454, started by @migarc1).
* Fix to ensure permanent errors during offset validation continue being retried and
don't cause an offset reset (#4447).
* Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with
consume_cb (#4431).


## Fixes
Expand Down Expand Up @@ -55,6 +57,12 @@ librdkafka v2.3.0 is a feature release:
would cause an offset reset.
This isn't what's expected or what the Java implementation does.
Solved by retrying even in case of permanent errors (#4447).
* If using `rd_kafka_poll_set_consumer`, along with a consume callback, and then
calling `rd_kafka_poll` to service the callbacks, would not reset
`max.poll.interval.ms.` This was because we were only checking `rk_rep` for
consumer messages, while the method to service the queue internally also
services the queue forwarded to from `rk_rep`, which is `rkcg_q`.
Solved by moving the `max.poll.interval.ms` check into `rd_kafka_q_serve` (#4431).


## Upgrade considerations
Expand Down
26 changes: 0 additions & 26 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4005,36 +4005,19 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,

int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
int r;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

return r;
}


rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
rd_kafka_op_t *rko;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);


if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

if (!rko)
return NULL;
Expand All @@ -4044,18 +4027,9 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {

int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
int r;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkqu->rkqu_rk);

return r;
}

Expand Down
30 changes: 29 additions & 1 deletion src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,16 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq,

rd_kafka_yield_thread = 0;
if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK);

struct timespec timeout_tspec;

rd_timeout_init_timespec_us(&timeout_tspec, timeout_us);

if (timeout_us && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rkq->rkq_rk);

while (1) {
rd_kafka_op_res_t res;
/* Keep track of current lock status to avoid
Expand Down Expand Up @@ -440,15 +446,24 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq,
goto retry; /* Next op */
} else if (unlikely(res ==
RD_KAFKA_OP_RES_YIELD)) {
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(
rkq->rkq_rk);
/* Callback yielded, unroll */
return NULL;
} else
} else {
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(
rkq->rkq_rk);
break; /* Proper op, handle below. */
}
}

if (unlikely(rd_kafka_q_check_yield(rkq))) {
if (is_locked)
mtx_unlock(&rkq->rkq_lock);
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkq->rkq_rk);
return NULL;
}

Expand All @@ -458,6 +473,8 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq,
if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
&timeout_tspec) != thrd_success) {
mtx_unlock(&rkq->rkq_lock);
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rkq->rkq_rk);
return NULL;
}
}
Expand Down Expand Up @@ -503,6 +520,8 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,
rd_kafka_q_t *fwdq;
int cnt = 0;
struct timespec timeout_tspec;
const rd_bool_t can_q_contain_fetched_msgs =
rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK);

rd_dassert(cb_type);

Expand All @@ -520,8 +539,12 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,
return ret;
}


rd_timeout_init_timespec(&timeout_tspec, timeout_ms);

if (timeout_ms && can_q_contain_fetched_msgs)
rd_kafka_app_poll_blocking(rk);

/* Wait for op */
while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
!rd_kafka_q_check_yield(rkq) &&
Expand All @@ -533,6 +556,8 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,

if (!rko) {
mtx_unlock(&rkq->rkq_lock);
if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);
return 0;
}

Expand Down Expand Up @@ -567,6 +592,9 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,
}
}

if (can_q_contain_fetched_msgs)
rd_kafka_app_polled(rk);

rd_kafka_q_destroy_owner(&localq);

return cnt;
Expand Down
78 changes: 69 additions & 9 deletions tests/0089-max_poll_interval.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,25 @@ static void do_test_with_log_queue(void) {
* should suffice.
* We test with the result of rd_kafka_queue_get_consumer, and an arbitrary
* queue that is forwarded to by the result of rd_kafka_queue_get_consumer.
* We also test with an arbitrary queue that is forwarded to the the result of
* rd_kafka_queue_get_consumer.
*/
static void
do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) {
do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q,
rd_bool_t forward_to_consumer_q) {
const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
rd_kafka_conf_t *conf;
char groupid[64];
rd_kafka_t *rk = NULL;
rd_kafka_queue_t *consumer_queue = NULL;
rd_kafka_event_t *event = NULL;
rd_kafka_queue_t *polling_queue = NULL;
rd_kafka_t *rk = NULL;
rd_kafka_queue_t *consumer_queue = NULL;
rd_kafka_queue_t *forwarder_queue = NULL;
rd_kafka_event_t *event = NULL;
rd_kafka_queue_t *polling_queue = NULL;

SUB_TEST("Testing with forward_to_another_q = %d",
forward_to_another_q);
SUB_TEST(
"Testing with forward_to_another_q = %d, forward_to_consumer_q = "
"%d",
forward_to_another_q, forward_to_consumer_q);

test_create_topic(NULL, topic, 1, 1);

Expand All @@ -393,6 +399,10 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) {
if (forward_to_another_q) {
polling_queue = rd_kafka_queue_new(rk);
rd_kafka_queue_forward(consumer_queue, polling_queue);
} else if (forward_to_consumer_q) {
forwarder_queue = rd_kafka_queue_new(rk);
rd_kafka_queue_forward(forwarder_queue, consumer_queue);
polling_queue = forwarder_queue;
} else
polling_queue = consumer_queue;

Expand Down Expand Up @@ -430,17 +440,67 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) {

if (forward_to_another_q)
rd_kafka_queue_destroy(polling_queue);
if (forward_to_consumer_q)
rd_kafka_queue_destroy(forwarder_queue);
rd_kafka_queue_destroy(consumer_queue);
test_consumer_close(rk);
rd_kafka_destroy(rk);

SUB_TEST_PASS();
}

static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) {
TEST_SAY("Consume callback\n");
}

/**
* @brief Test that max.poll.interval.ms is reset when
* rd_kafka_poll is called with consume_cb.
* See issue #4421.
*/
static void do_test_max_poll_reset_with_consumer_cb(void) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation comment above this test
We can also mention issue # for the original case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added.

const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
rd_kafka_conf_t *conf;
char groupid[64];
rd_kafka_t *rk = NULL;

SUB_TEST();

test_create_topic(NULL, topic, 1, 1);
uint64_t testid = test_id_generate();

test_produce_msgs_easy(topic, testid, -1, 100);

test_str_id_generate(groupid, sizeof(groupid));
test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "session.timeout.ms", "10000");
test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/);
test_conf_set(conf, "partition.assignment.strategy", "range");
rd_kafka_conf_set_consume_cb(conf, consume_cb);

rk = test_create_consumer(groupid, NULL, conf, NULL);
rd_kafka_poll_set_consumer(rk);

test_consumer_subscribe(rk, topic);
TEST_SAY("Subscribed to %s and sleeping for 5 s\n", topic);
rd_sleep(5);
rd_kafka_poll(rk, 10);
TEST_SAY(
"Polled and sleeping again for 6s. Max poll should be reset\n");
rd_sleep(6);

/* Poll should work */
rd_kafka_poll(rk, 10);
test_consumer_close(rk);
rd_kafka_destroy(rk);
}

int main_0089_max_poll_interval(int argc, char **argv) {
do_test();
do_test_with_log_queue();
do_test_rejoin_after_interval_expire(rd_false);
do_test_rejoin_after_interval_expire(rd_true);
do_test_rejoin_after_interval_expire(rd_false, rd_false);
do_test_rejoin_after_interval_expire(rd_true, rd_false);
do_test_rejoin_after_interval_expire(rd_false, rd_true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a test for the specific case that brought this bug to light, too, which uses rd_kafka_conf_set_consume_cb to set a consume_cb, and the calls rd_kafka_poll.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a new test, though test_wait_event is unable to capture RD_KAFKA_EVENT_REBALANCE not sure why, so haven't added any assert in the test. But added a poll, which should fail if the consumer has left the group.

do_test_max_poll_reset_with_consumer_cb();
return 0;
}