Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client certificate chain is now sent #4894

Open
wants to merge 9 commits into
base: dev_kip848_mock_handler_and_integration_tests
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
# librdkafka v2.6.2

librdkafka v2.6.2 is a maintenance release:

* Client certificate chain is now sent when using `ssl.certificate.pem`
or `ssl_certificate` or `ssl.keystore.location` (#4894).


## Fixes

### General fixes

* Issues: #3225.
Client certificate chain is now sent when using `ssl.certificate.pem`
or `ssl_certificate` or `ssl.keystore.location`.
Without that, broker must explicitly add any intermediate certification
authority certificate to its truststore to be able to accept client
certificate.
Happens since: 1.x (#4894).



# librdkafka v2.6.1

librdkafka v2.6.1 is a maintenance release:
Expand Down
27 changes: 26 additions & 1 deletion src/rdkafka_cert.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ static void rd_kafka_cert_destroy(rd_kafka_cert_t *cert) {

if (cert->x509)
X509_free(cert->x509);
if (cert->chain)
sk_X509_pop_free(cert->chain, X509_free);
if (cert->pkey)
EVP_PKEY_free(cert->pkey);
if (cert->store)
Expand Down Expand Up @@ -314,17 +316,26 @@ static rd_kafka_cert_t *rd_kafka_cert_new(const rd_kafka_conf_t *conf,
switch (encoding) {
case RD_KAFKA_CERT_ENC_PKCS12: {
EVP_PKEY *ign_pkey;
STACK_OF(X509) *ca = NULL;

action = "parse PKCS#12";
if (!PKCS12_parse(p12, conf->ssl.key_password,
&ign_pkey, &cert->x509, NULL))
&ign_pkey, &cert->x509, &ca))
goto fail;

EVP_PKEY_free(ign_pkey);

action = "retrieve public key";
if (!cert->x509)
goto fail;

if (ca) {
if (sk_X509_num(ca) > 0)
cert->chain = ca;
else
sk_X509_pop_free(cert->chain,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this need to be sk_X509_pop_free(ca, instead? because cert->chain isn't assigned to yet.

X509_free);
}
} break;

case RD_KAFKA_CERT_ENC_DER:
Expand All @@ -341,6 +352,20 @@ static rd_kafka_cert_t *rd_kafka_cert_new(const rd_kafka_conf_t *conf,
(void *)conf);
if (!cert->x509)
goto fail;

cert->chain = sk_X509_new_null();
if (rd_kafka_ssl_read_cert_chain_from_BIO(
bio, cert->chain, rd_kafka_conf_ssl_passwd_cb,
(void *)conf) != 0) {
sk_X509_pop_free(cert->chain, X509_free);
cert->chain = NULL;
goto fail;
}

if (sk_X509_num(cert->chain) == 0) {
sk_X509_pop_free(cert->chain, X509_free);
cert->chain = NULL;
}
break;

default:
Expand Down
7 changes: 4 additions & 3 deletions src/rdkafka_cert.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ typedef struct rd_kafka_cert_s {
rd_kafka_cert_enc_t encoding;
rd_refcnt_t refcnt;
#if WITH_SSL
X509 *x509; /**< Certificate (public key) */
EVP_PKEY *pkey; /**< Private key */
X509_STORE *store; /**< CA certificate chain store */
X509 *x509; /**< Certificate (public key) */
STACK_OF(X509) * chain; /**< Certificate chain (public key) */
EVP_PKEY *pkey; /**< Private key */
X509_STORE *store; /**< CA trusted certificates */
#endif
} rd_kafka_cert_t;

Expand Down
146 changes: 118 additions & 28 deletions src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -697,22 +697,92 @@ static EVP_PKEY *rd_kafka_ssl_PKEY_from_string(rd_kafka_t *rk,
return pkey;
}

/**
* Read a PEM formatted cert chain from BIO \p in into \p chainp .
*
* @param rk rdkafka instance.
* @param in BIO to read from.
* @param chainp Stack to push the certificates to.
*
* @return 0 on success, -1 on error.
*/
int rd_kafka_ssl_read_cert_chain_from_BIO(BIO *in,
STACK_OF(X509) * chainp,
pem_password_cb *password_cb,
void *password_cb_opaque) {
X509 *ca;
int r, ret = 0;
unsigned long err;
while (1) {
ca = X509_new();
if (ca == NULL) {
rd_assert(!*"X509_new() allocation failed");
}
if (PEM_read_bio_X509(in, &ca, password_cb,
password_cb_opaque) != NULL) {
r = sk_X509_push(chainp, ca);
if (!r) {
X509_free(ca);
ret = -1;
goto end;
}
} else {
X509_free(ca);
break;
}
}
/* When the while loop ends, it's usually just EOF. */
err = ERR_peek_last_error();
if (ERR_GET_LIB(err) == ERR_LIB_PEM &&
ERR_GET_REASON(err) == PEM_R_NO_START_LINE)
ret = 0;
else
ret = -1; /* some real error */
ERR_clear_error();
end:
return ret;
}

/**
* @brief Parse a PEM-formatted string into an X509 object.
* Rest of CA chain is pushed to the \p chainp stack.
*
* @param str Input PEM string, nul-terminated
* @param str Input PEM string, nul-terminated.
* @param chainp Stack to push the certificates to.
*
* @returns a new X509 on success or NULL on error.
*
* @remark When NULL is returned the chainp stack is not modified.
*/
static X509 *rd_kafka_ssl_X509_from_string(rd_kafka_t *rk, const char *str) {
static X509 *rd_kafka_ssl_X509_from_string(rd_kafka_t *rk,
const char *str,
STACK_OF(X509) * chainp) {
BIO *bio = BIO_new_mem_buf((void *)str, -1);
X509 *x509;

x509 =
PEM_read_bio_X509(bio, NULL, rd_kafka_transport_ssl_passwd_cb, rk);

BIO_free(bio);
if (!x509) {
BIO_free(bio);
return NULL;
}

if (rd_kafka_ssl_read_cert_chain_from_BIO(
bio, chainp, rd_kafka_transport_ssl_passwd_cb, rk) != 0) {
/* Rest of the certificate is present,
* but couldn't be read,
* returning NULL as certificate cannot be verified
* without its chain. */
rd_kafka_log(rk, LOG_WARNING, "SSL",
"Failed to read certificate chain from PEM. "
"Returning NULL certificate too.");
X509_free(x509);
BIO_free(bio);
return NULL;
}

BIO_free(bio);
return x509;
}

Expand Down Expand Up @@ -1169,6 +1239,20 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
rd_snprintf(errstr, errstr_size, "ssl_cert failed: ");
return -1;
}

if (rk->rk_conf.ssl.cert->chain) {
r = SSL_CTX_set0_chain(ctx,
rk->rk_conf.ssl.cert->chain);
if (r != 1) {
rd_snprintf(errstr, errstr_size,
"ssl_cert failed: "
"setting certificate chain: ");
return -1;
} else {
/* The chain is now owned by the CTX */
rk->rk_conf.ssl.cert->chain = NULL;
}
}
}

if (rk->rk_conf.ssl.cert_location) {
Expand All @@ -1188,16 +1272,21 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,

if (rk->rk_conf.ssl.cert_pem) {
X509 *x509;
STACK_OF(X509) *ca = sk_X509_new_null();
if (!ca) {
rd_assert(!*"sk_X509_new_null() allocation failed");
}

rd_kafka_dbg(rk, SECURITY, "SSL",
"Loading public key from string");

x509 =
rd_kafka_ssl_X509_from_string(rk, rk->rk_conf.ssl.cert_pem);
x509 = rd_kafka_ssl_X509_from_string(
rk, rk->rk_conf.ssl.cert_pem, ca);
if (!x509) {
rd_snprintf(errstr, errstr_size,
"ssl.certificate.pem failed: "
"not in PEM format?: ");
sk_X509_pop_free(ca, X509_free);
return -1;
}

Expand All @@ -1207,11 +1296,25 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,

if (r != 1) {
rd_snprintf(errstr, errstr_size,
"ssl.certificate.pem failed: ");
"ssl.certificate.pem failed: "
"setting main certificate: ");
sk_X509_pop_free(ca, X509_free);
return -1;
}
}

if (sk_X509_num(ca) == 0) {
sk_X509_pop_free(ca, X509_free);
} else {
r = SSL_CTX_set0_chain(ctx, ca);
if (r != 1) {
rd_snprintf(errstr, errstr_size,
"ssl.certificate.pem failed: "
"setting certificate chain: ");
sk_X509_pop_free(ca, X509_free);
return -1;
}
}
}

/*
* ssl_key, ssl.key.location and ssl.key.pem
Expand Down Expand Up @@ -1284,8 +1387,8 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
* ssl.keystore.location
*/
if (rk->rk_conf.ssl.keystore_location) {
EVP_PKEY *pkey;
X509 *cert;
EVP_PKEY *pkey = NULL;
X509 *cert = NULL;
STACK_OF(X509) *ca = NULL;
BIO *bio;
PKCS12 *p12;
Expand Down Expand Up @@ -1313,8 +1416,6 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
return -1;
}

pkey = EVP_PKEY_new();
cert = X509_new();
if (!PKCS12_parse(p12, rk->rk_conf.ssl.keystore_password, &pkey,
&cert, &ca)) {
EVP_PKEY_free(pkey);
Expand All @@ -1330,28 +1431,17 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
return -1;
}

if (ca != NULL)
sk_X509_pop_free(ca, X509_free);

PKCS12_free(p12);
BIO_free(bio);

r = SSL_CTX_use_certificate(ctx, cert);
X509_free(cert);
if (r != 1) {
EVP_PKEY_free(pkey);
rd_snprintf(errstr, errstr_size,
"Failed to use ssl.keystore.location "
"certificate: ");
return -1;
}

r = SSL_CTX_use_PrivateKey(ctx, pkey);
EVP_PKEY_free(pkey);
r = SSL_CTX_use_cert_and_key(ctx, cert, pkey, ca, 1);
RD_IF_FREE(cert, X509_free);
RD_IF_FREE(pkey, EVP_PKEY_free);
if (ca != NULL)
sk_X509_pop_free(ca, X509_free);
if (r != 1) {
rd_snprintf(errstr, errstr_size,
"Failed to use ssl.keystore.location "
"private key: ");
"Failed to use ssl.keystore.location: ");
return -1;
}

Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_ssl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ int rd_kafka_ssl_hmac(rd_kafka_broker_t *rkb,
int itcnt,
rd_chariov_t *out);

int rd_kafka_ssl_read_cert_chain_from_BIO(BIO *in,
STACK_OF(X509) * chainp,
pem_password_cb *password_cb,
void *password_cb_opaque);

#endif /* _RDKAFKA_SSL_H_ */
Loading