From b22edfd62a1db5caae64694c1334b22e2dd569b7 Mon Sep 17 00:00:00 2001 From: duyuqi Date: Tue, 25 Jul 2023 16:48:48 +0800 Subject: [PATCH] Execute kinit once to check sasl config before main thread starts TODO: unit test and whether ssl need fix like this. --- src/rdkafka_sasl_cyrus.c | 69 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/src/rdkafka_sasl_cyrus.c b/src/rdkafka_sasl_cyrus.c index 89ff15c427..42299d8b99 100644 --- a/src/rdkafka_sasl_cyrus.c +++ b/src/rdkafka_sasl_cyrus.c @@ -193,6 +193,71 @@ render_callback(const char *key, char *buf, size_t size, void *opaque) { } +/** + * @brief The first execute kinit to refresh ticket before main thread starts. + * + * @returns 0 on success, -1 on error. + * + # @locality Called before rdkafka main thread + */ +static int rd_kafka_sasl_cyrus_check_kinit(rd_kafka_t *rk) { + int r; + char *cmd; + char errstr[128]; + + /* Build kinit refresh command line using string rendering and config */ + cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, errstr, + sizeof(errstr), render_callback, rk); + if (!cmd) { + rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT", + "Failed to construct kinit command " + "from sasl.kerberos.kinit.cmd template: %s", + errstr); + return -1; + } + + /* Execute kinit */ + rd_kafka_dbg(rk, SECURITY, "SASL_CHECK_KINIT", + "Check Kerberos ticket with command: %s", cmd); + + /* Prevent multiple simultaneous refreshes by the same process to + * avoid Kerberos credential cache corruption. */ + mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock); + r = system(cmd); + mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock); + if (r == -1) { + if (errno == ECHILD) { + rd_kafka_log(rk, LOG_WARNING, "SASL_CHECK_KINIT", + "Kerberos ticket refresh command " + "returned ECHILD: %s: exit status " + "unknown, assuming success", + cmd); + } else { + rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT", + "Kerberos ticket refresh failed: %s: %s", + cmd, rd_strerror(errno)); + rd_free(cmd); + return -1; + } + } else if (WIFSIGNALED(r)) { + rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT", + "Kerberos ticket refresh failed: %s: " + "received signal %d", + cmd, WTERMSIG(r)); + rd_free(cmd); + return -1; + } else if (WIFEXITED(r) && WEXITSTATUS(r) != 0) { + rd_kafka_log(rk, LOG_ERR, "SASL_CHECK_KINIT", + "Kerberos ticket refresh failed: %s: " + "exited with code %d", + cmd, WEXITSTATUS(r)); + rd_free(cmd); + return -1; + } + + return 0; +} + /** * @brief Execute kinit to refresh ticket. * @@ -622,6 +687,10 @@ rd_kafka_sasl_cyrus_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) return 0; /* kinit not configured, no need to start timer */ + int ret = rd_kafka_sasl_cyrus_check_kinit(rk); + if (ret != 0) + return ret; + handle = rd_calloc(1, sizeof(*handle)); rk->rk_sasl.handle = handle;