Skip to content

Commit

Permalink
Execute kinit once to check sasl config before main thread starts
Browse files Browse the repository at this point in the history
TODO: unit test and whether ssl need fix like this.
  • Loading branch information
duyuqi committed Jul 26, 2023
1 parent c07a335 commit b22edfd
Showing 1 changed file with 69 additions and 0 deletions.
69 changes: 69 additions & 0 deletions src/rdkafka_sasl_cyrus.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,71 @@ render_callback(const char *key, char *buf, size_t size, void *opaque) {
}


/**
* @brief The first execute kinit to refresh ticket before main thread starts.
*
* @returns 0 on success, -1 on error.
*
# @locality Called before rdkafka main thread
*/
static int rd_kafka_sasl_cyrus_check_kinit(rd_kafka_t *rk) {
int r;
char *cmd;
char errstr[128];

/* Build kinit refresh command line using string rendering and config */
cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, errstr,
sizeof(errstr), render_callback, rk);
if (!cmd) {
rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT",
"Failed to construct kinit command "
"from sasl.kerberos.kinit.cmd template: %s",
errstr);
return -1;
}

/* Execute kinit */
rd_kafka_dbg(rk, SECURITY, "SASL_CHECK_KINIT",
"Check Kerberos ticket with command: %s", cmd);

/* Prevent multiple simultaneous refreshes by the same process to
* avoid Kerberos credential cache corruption. */
mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock);
r = system(cmd);
mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock);
if (r == -1) {
if (errno == ECHILD) {
rd_kafka_log(rk, LOG_WARNING, "SASL_CHECK_KINIT",
"Kerberos ticket refresh command "
"returned ECHILD: %s: exit status "
"unknown, assuming success",
cmd);
} else {
rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT",
"Kerberos ticket refresh failed: %s: %s",
cmd, rd_strerror(errno));
rd_free(cmd);
return -1;
}
} else if (WIFSIGNALED(r)) {
rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT",
"Kerberos ticket refresh failed: %s: "
"received signal %d",
cmd, WTERMSIG(r));
rd_free(cmd);
return -1;
} else if (WIFEXITED(r) && WEXITSTATUS(r) != 0) {
rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT",
"Kerberos ticket refresh failed: %s: "
"exited with code %d",
cmd, WEXITSTATUS(r));
rd_free(cmd);
return -1;
}

return 0;
}

/**
* @brief Execute kinit to refresh ticket.
*
Expand Down Expand Up @@ -622,6 +687,10 @@ rd_kafka_sasl_cyrus_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) {
strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
return 0; /* kinit not configured, no need to start timer */

int ret = rd_kafka_sasl_cyrus_check_kinit(rk);
if (ret != 0)
return ret;

handle = rd_calloc(1, sizeof(*handle));
rk->rk_sasl.handle = handle;

Expand Down

0 comments on commit b22edfd

Please sign in to comment.