Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
1. On broker failure, stop the reauth timer and reset the reauth flag.
2. The rktrans SASL state should be set to NULL after it is freed, for
   all the providers.
3. Add a reauth flag to prevent socket_connection_setup_timeout_ms-related
   timeouts, which should not occur during reauthentication.
4. Test fixes (timeout)
  • Loading branch information
milindl committed Jun 14, 2023
1 parent df1fd02 commit 68868fd
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 7 deletions.
35 changes: 29 additions & 6 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
rkb->rkb_recv_buf = NULL;
}

rkb->rkb_reauth_in_progress = rd_false;

va_start(ap, fmt);
rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
va_end(ap);
Expand All @@ -591,6 +593,11 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
old_state = rkb->rkb_state;
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);

/* Stop any pending reauth timer, since a teardown/reconnect will
* require a new timer. */
rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr,
1 /*lock*/);

/* Unlock broker since a requeue will try to lock it. */
rd_kafka_broker_unlock(rkb);

Expand Down Expand Up @@ -2237,7 +2244,8 @@ static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
*/
void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {

rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_reauth_in_progress = rd_false;

rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
Expand Down Expand Up @@ -4542,8 +4550,15 @@ static int rd_kafka_broker_thread_main(void *arg) {
rd_kafka_broker_addresses_exhausted(rkb))
rd_kafka_broker_update_reconnect_backoff(
rkb, &rkb->rkb_rk->rk_conf, rd_clock());
/* If we haven't made progress from the last state, and
* if we have exceeded
* socket_connection_setup_timeout_ms, then error out.
* Don't error out in case this is a reauth, for which
* socket_connection_setup_timeout_ms is not
* applicable. */
else if (
rkb->rkb_state == orig_state &&
!rkb->rkb_reauth_in_progress &&
rd_clock() >=
(rkb->rkb_ts_connect +
(rd_ts_t)rk->rk_conf
Expand All @@ -4569,6 +4584,8 @@ static int rd_kafka_broker_thread_main(void *arg) {
*/
rd_kafka_sasl_close(rkb->rkb_transport);

rkb->rkb_reauth_in_progress = rd_true;

rd_kafka_broker_connect_auth(rkb);
break;

Expand Down Expand Up @@ -5891,12 +5908,18 @@ void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon) {
*/
void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb,
int64_t connections_max_reauth_ms) {
/* Timer should not already be started in any case. */
rd_assert(!rd_kafka_timer_is_started(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr));
if (connections_max_reauth_ms == 0) {
/* Timer should not already be started in any case. It indicates that
* we're about to schedule an extra reauth somehow. But this shouldn't
* be a cause for failure in production use cases. */
rd_dassert(!rd_kafka_timer_is_started(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr));
if (rd_kafka_timer_is_started(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr))
rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr, 1 /*lock*/);

if (connections_max_reauth_ms == 0)
return;
}

rd_kafka_timer_start_oneshot(
&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr, rd_false,
Expand Down
3 changes: 3 additions & 0 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
/** Absolute time of last connection attempt. */
rd_ts_t rkb_ts_connect;

/** True if a reauthentication is in progress. */
rd_bool_t rkb_reauth_in_progress;

/**< Persistent connection demand is tracked by
* a counter for each type of demand.
* The broker thread will maintain a persistent connection
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_sasl.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ int rd_kafka_sasl_io_event(rd_kafka_transport_t *rktrans,
* @remark May be called on non-SASL transports (no-op)
*/
void rd_kafka_sasl_close(rd_kafka_transport_t *rktrans) {
/* The broker might not be up, and the transport might not exist in that
* case.*/
if (!rktrans)
return;

const struct rd_kafka_sasl_provider *provider =
rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.provider;

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_sasl_cyrus.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ static void rd_kafka_sasl_cyrus_close(struct rd_kafka_transport_s *rktrans) {
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
}
rd_free(state);
rktrans->rktrans_sasl.state = NULL;
}


Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_sasl_scram.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static void rd_kafka_sasl_scram_close(rd_kafka_transport_t *rktrans) {
RD_IF_FREE(state->first_msg_bare.ptr, rd_free);
RD_IF_FREE(state->ServerSignatureB64, rd_free);
rd_free(state);
rktrans->rktrans_sasl.state = NULL;
}


Expand Down
2 changes: 1 addition & 1 deletion tests/0142-reauthentication.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void do_test_txn_producer(int64_t reauth_time,
test_conf_init(&conf, NULL, 30);
test_conf_set(conf, "transactional.id", topic);
test_conf_set(conf, "transaction.timeout.ms",
tsprintf("%ld", (int64_t)(reauth_time * 1.2 + 2000)));
tsprintf("%ld", (int64_t)(reauth_time * 1.2 + 60000)));
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
Expand Down

0 comments on commit 68868fd

Please sign in to comment.