Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broker reauthentication [KIP-368] #4301

Merged
merged 9 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ librdkafka v2.2.0 is a feature release:
(#4184, #4291, #4252).
* Fix several bugs with sticky assignor in case of partition ownership
changing between members of the consumer group (#4252).
* [KIP-368](https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate):
Allow SASL Connections to Periodically Re-Authenticate
(#4301, started by @vctoriawu).
* Avoid treating an OpenSSL error as a permanent error and treat unclean SSL
closes as normal ones (#4294).

Expand Down
4 changes: 2 additions & 2 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1917,7 +1917,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-359 - Producer: use EpochLeaderId | 2.4.0 | Not supported |
| KIP-360 - Improve handling of unknown Idempotent Producer | 2.5.0 | Supported |
| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Supported |
| KIP-368 - SASL periodic reauth | 2.2.0 | Not supported |
| KIP-368 - SASL periodic reauth | 2.2.0 | Supported |
| KIP-369 - Always roundRobin partitioner | 2.4.0 | Not supported |
| KIP-389 - Consumer group max size | 2.2.0 | Supported (error is propagated to application, but the consumer does not raise a fatal error) |
| KIP-392 - Allow consumers to fetch from closest replica | 2.4.0 | Supported |
Expand Down Expand Up @@ -1996,7 +1996,7 @@ release of librdkafka.
| 28 | TxnOffsetCommit | 3 | 3 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 1 |
| 36 | SaslAuthenticate | 2 | 0 |
| 36 | SaslAuthenticate | 2 | 1 |
emasab marked this conversation as resolved.
Show resolved Hide resolved
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 47 | OffsetDelete | 0 | 0 |
Expand Down
100 changes: 95 additions & 5 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@
static const int rd_kafka_max_block_ms = 1000;

const char *rd_kafka_broker_state_names[] = {
"INIT", "DOWN", "TRY_CONNECT", "CONNECT", "SSL_HANDSHAKE",
"AUTH_LEGACY", "UP", "UPDATE", "APIVERSION_QUERY", "AUTH_HANDSHAKE",
"AUTH_REQ"};
"INIT", "DOWN", "TRY_CONNECT", "CONNECT", "SSL_HANDSHAKE",
"AUTH_LEGACY", "UP", "UPDATE", "APIVERSION_QUERY", "AUTH_HANDSHAKE",
"AUTH_REQ", "REAUTH"};

const char *rd_kafka_secproto_names[] = {
[RD_KAFKA_PROTO_PLAINTEXT] = "plaintext",
Expand Down Expand Up @@ -573,6 +573,8 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
rkb->rkb_recv_buf = NULL;
}

rkb->rkb_reauth_in_progress = rd_false;

va_start(ap, fmt);
rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
va_end(ap);
Expand All @@ -591,6 +593,11 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
old_state = rkb->rkb_state;
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);

/* Stop any pending reauth timer, since a teardown/reconnect will
* require a new timer. */
rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr,
1 /*lock*/);

/* Unlock broker since a requeue will try to lock it. */
rd_kafka_broker_unlock(rkb);

Expand Down Expand Up @@ -1834,7 +1841,7 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find(rd_kafka_broker_t *rkb,
*/
static int rd_kafka_req_response(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf) {
rd_kafka_buf_t *req;
rd_kafka_buf_t *req = NULL;
int log_decode_errors = LOG_ERR;

rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
Expand Down Expand Up @@ -2237,7 +2244,8 @@ static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
*/
void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {

rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_reauth_in_progress = rd_false;

rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
Expand Down Expand Up @@ -3451,6 +3459,20 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
wakeup = rd_true;
break;

case RD_KAFKA_OP_SASL_REAUTH:
rd_rkb_dbg(rkb, BROKER, "REAUTH", "Received REAUTH op");

/* We don't need a lock for rkb_max_inflight. It's changed only
* on the broker thread. */
rkb->rkb_max_inflight = 1;

rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_REAUTH);
rd_kafka_broker_unlock(rkb);

wakeup = rd_true;
break;

default:
rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
break;
Expand Down Expand Up @@ -4528,8 +4550,15 @@ static int rd_kafka_broker_thread_main(void *arg) {
rd_kafka_broker_addresses_exhausted(rkb))
rd_kafka_broker_update_reconnect_backoff(
rkb, &rkb->rkb_rk->rk_conf, rd_clock());
/* If we haven't made progress from the last state, and
* if we have exceeded
* socket_connection_setup_timeout_ms, then error out.
* Don't error out in case this is a reauth, for which
* socket_connection_setup_timeout_ms is not
* applicable. */
else if (
rkb->rkb_state == orig_state &&
!rkb->rkb_reauth_in_progress &&
rd_clock() >=
(rkb->rkb_ts_connect +
(rd_ts_t)rk->rk_conf
Expand All @@ -4544,6 +4573,22 @@ static int rd_kafka_broker_thread_main(void *arg) {

break;

case RD_KAFKA_BROKER_STATE_REAUTH:
/* Since we've already authenticated once, the provider
* should be ready. */
rd_assert(rd_kafka_sasl_ready(rkb->rkb_rk));

/* Since we aren't disconnecting, the transport isn't
* destroyed, and as a consequence, some of the SASL
* state leaks unless we destroy it before the reauth.
*/
rd_kafka_sasl_close(rkb->rkb_transport);

rkb->rkb_reauth_in_progress = rd_true;

rd_kafka_broker_connect_auth(rkb);
break;

case RD_KAFKA_BROKER_STATE_UPDATE:
/* FALLTHRU */
case RD_KAFKA_BROKER_STATE_UP:
Expand Down Expand Up @@ -4672,6 +4717,9 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
mtx_unlock(&rkb->rkb_logname_lock);
mtx_destroy(&rkb->rkb_logname_lock);

rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr,
1 /*lock*/);

mtx_destroy(&rkb->rkb_lock);

rd_refcnt_destroy(&rkb->rkb_refcnt);
Expand Down Expand Up @@ -5851,6 +5899,48 @@ void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon) {
rd_kafka_broker_destroy(rkb);
}

/**
* @brief Starts the reauth timer for this broker.
* If connections_max_reauth_ms=0, then no timer is set.
*
* @locks none
* @locality broker thread
*/
void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb,
int64_t connections_max_reauth_ms) {
/* Timer should not already be started in any case. It indicates that
* we're about to schedule an extra reauth somehow. But this shouldn't
* be a cause for failure in production use cases. */
rd_dassert(!rd_kafka_timer_is_started(&rkb->rkb_rk->rk_timers,
milindl marked this conversation as resolved.
Show resolved Hide resolved
&rkb->rkb_sasl_reauth_tmr));
if (rd_kafka_timer_is_started(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr))
rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr, 1 /*lock*/);

if (connections_max_reauth_ms == 0)
return;

rd_kafka_timer_start_oneshot(
&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr, rd_false,
connections_max_reauth_ms * 900 /* 90% * microsecond*/,
rd_kafka_broker_start_reauth_cb, (void *)rkb);
}

/**
* @brief Starts the reauth process for the broker rkb.
*
* @locks none
* @locality main thread
*/
void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *_rkb) {
rd_kafka_op_t *rko = NULL;
rd_kafka_broker_t *rkb = (rd_kafka_broker_t *)_rkb;
rd_dassert(rkb);
rko = rd_kafka_op_new(RD_KAFKA_OP_SASL_REAUTH);
rd_kafka_q_enq(rkb->rkb_ops, rko);
}

/**
* @name Unit tests
* @{
Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef enum {
RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE,
RD_KAFKA_BROKER_STATE_AUTH_REQ,
RD_KAFKA_BROKER_STATE_REAUTH,
} rd_kafka_broker_state_t;

/**
Expand Down Expand Up @@ -252,6 +253,9 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
/** Absolute time of last connection attempt. */
rd_ts_t rkb_ts_connect;

/** True if a reauthentication is in progress. */
rd_bool_t rkb_reauth_in_progress;

/**< Persistent connection demand is tracked by
* a counter for each type of demand.
* The broker thread will maintain a persistent connection
Expand Down Expand Up @@ -323,6 +327,9 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
rd_kafka_resp_err_t err; /**< Last error code */
int cnt; /**< Number of identical errors */
} rkb_last_err;


rd_kafka_timer_t rkb_sasl_reauth_tmr;
};

#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
Expand Down Expand Up @@ -602,6 +609,11 @@ void rd_kafka_broker_monitor_add(rd_kafka_broker_monitor_t *rkbmon,

void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon);

void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb,
int64_t connections_max_reauth_ms);

void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *rkb);

int unittest_broker(void);

#endif /* _RDKAFKA_BROKER_H_ */
2 changes: 1 addition & 1 deletion src/rdkafka_feature.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ static const struct rd_kafka_feature_map {
.depends =
{
{RD_KAFKAP_SaslHandshake, 1, 1},
{RD_KAFKAP_SaslAuthenticate, 0, 0},
{RD_KAFKAP_SaslAuthenticate, 0, 1},
{-1},
},
},
Expand Down
10 changes: 6 additions & 4 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
[RD_KAFKA_OP_TXN] = "REPLY:TXN",
[RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
"REPLY:GET_REBALANCE_PROTOCOL",
[RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS",
[RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER",
[RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS",
[RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER",
[RD_KAFKA_OP_SASL_REAUTH] = "REPLY:SASL_REAUTH",
};

if (type & RD_KAFKA_OP_REPLY)
Expand Down Expand Up @@ -253,8 +254,9 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
[RD_KAFKA_OP_TXN] = sizeof(rko->rko_u.txn),
[RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
sizeof(rko->rko_u.rebalance_protocol),
[RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders),
[RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY,
[RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders),
[RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY,
[RD_KAFKA_OP_SASL_REAUTH] = _RD_KAFKA_OP_EMPTY,
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ typedef enum {
RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */
RD_KAFKA_OP_LEADERS, /**< Partition leader query */
RD_KAFKA_OP_BARRIER, /**< Version barrier bump */
RD_KAFKA_OP_SASL_REAUTH, /**< Sasl reauthentication for broker */
RD_KAFKA_OP__END
} rd_kafka_op_type_t;

Expand Down
18 changes: 18 additions & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2611,6 +2611,18 @@ void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,

rd_kafka_buf_read_bytes(rkbuf, &auth_data);

if (request->rkbuf_reqhdr.ApiVersion >= 1) {
int64_t session_lifetime_ms;
rd_kafka_buf_read_i64(rkbuf, &session_lifetime_ms);

if (session_lifetime_ms)
rd_kafka_dbg(
rk, SECURITY, "REAUTH",
"Received session lifetime %ld ms from broker",
session_lifetime_ms);
rd_kafka_broker_start_reauth_timer(rkb, session_lifetime_ms);
}

/* Pass SASL auth frame to SASL handler */
if (rd_kafka_sasl_recv(rkb->rkb_transport, auth_data.data,
(size_t)RD_KAFKAP_BYTES_LEN(&auth_data), errstr,
Expand Down Expand Up @@ -2644,6 +2656,8 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
rd_kafka_buf_t *rkbuf;
int16_t ApiVersion;
int features;

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0);

Expand All @@ -2658,6 +2672,10 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
* close down the connection and reconnect on failure. */
rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_SaslAuthenticate, 0, 1, &features);
rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

if (replyq.q)
rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
opaque);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque);

void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
const void *buf,
size_t size,
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_sasl.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ int rd_kafka_sasl_io_event(rd_kafka_transport_t *rktrans,
* @remark May be called on non-SASL transports (no-op)
*/
void rd_kafka_sasl_close(rd_kafka_transport_t *rktrans) {
/* The broker might not be up, and the transport might not exist in that
* case.*/
if (!rktrans)
return;

const struct rd_kafka_sasl_provider *provider =
rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.provider;

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_sasl_cyrus.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ static void rd_kafka_sasl_cyrus_close(struct rd_kafka_transport_s *rktrans) {
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
}
rd_free(state);
rktrans->rktrans_sasl.state = NULL;
milindl marked this conversation as resolved.
Show resolved Hide resolved
}


Expand Down
19 changes: 11 additions & 8 deletions src/rdkafka_sasl_oauthbearer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,7 @@ static void rd_kafka_sasl_oauthbearer_close(rd_kafka_transport_t *rktrans) {
rd_free(state->md_principal_name);
rd_list_destroy(&state->extensions);
rd_free(state);
rktrans->rktrans_sasl.state = NULL;
}


Expand Down Expand Up @@ -1300,6 +1301,16 @@ static int rd_kafka_sasl_oauthbearer_init(rd_kafka_t *rk,
rd_list_init(&handle->extensions, 0,
(void (*)(void *))rd_strtup_destroy);


if (rk->rk_conf.sasl.enable_callback_queue) {
/* SASL specific callback queue enabled */
rk->rk_sasl.callback_q = rd_kafka_q_new(rk);
handle->callback_q = rd_kafka_q_keep(rk->rk_sasl.callback_q);
} else {
/* Use main queue */
handle->callback_q = rd_kafka_q_keep(rk->rk_rep);
}

rd_kafka_timer_start(
&rk->rk_timers, &handle->token_refresh_tmr, 1 * 1000 * 1000,
rd_kafka_sasl_oauthbearer_token_refresh_tmr_cb, rk);
Expand All @@ -1316,14 +1327,6 @@ static int rd_kafka_sasl_oauthbearer_init(rd_kafka_t *rk,
return 0;
}

if (rk->rk_conf.sasl.enable_callback_queue) {
/* SASL specific callback queue enabled */
rk->rk_sasl.callback_q = rd_kafka_q_new(rk);
handle->callback_q = rd_kafka_q_keep(rk->rk_sasl.callback_q);
} else {
/* Use main queue */
handle->callback_q = rd_kafka_q_keep(rk->rk_rep);
}

#if WITH_OAUTHBEARER_OIDC
if (rk->rk_conf.sasl.oauthbearer.method ==
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_sasl_scram.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static void rd_kafka_sasl_scram_close(rd_kafka_transport_t *rktrans) {
RD_IF_FREE(state->first_msg_bare.ptr, rd_free);
RD_IF_FREE(state->ServerSignatureB64, rd_free);
rd_free(state);
rktrans->rktrans_sasl.state = NULL;
}


Expand Down
Loading