Skip to content

Commit

Permalink
Treat cluster authentication failures as fatal in transactional produ…
Browse files Browse the repository at this point in the history
…cer (#2994)
  • Loading branch information
edenhill committed Sep 4, 2020
1 parent c30fc06 commit ed6a4a7
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/rdkafka_coord.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ rd_kafka_coord_req_handle_FindCoordinator (rd_kafka_t *rk,
RD_KAFKA_ERR_ACTION_PERMANENT,
RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,

RD_KAFKA_ERR_ACTION_PERMANENT,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,

RD_KAFKA_ERR_ACTION_REFRESH,
RD_KAFKA_RESP_ERR__TRANSPORT,

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_idempotence.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ rd_bool_t rd_kafka_idemp_check_error (rd_kafka_t *rk,
case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
case RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT:
case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
if (rd_kafka_is_transactional(rk))
rd_kafka_txn_set_fatal_error(rk, RD_DONT_LOCK,
err, "%s", errstr);
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_txnmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn (rd_kafka_t *rk,
break;

case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
Expand Down Expand Up @@ -1454,6 +1455,7 @@ static void rd_kafka_txn_handle_TxnOffsetCommit (rd_kafka_t *rk,
break;

case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
Expand Down Expand Up @@ -1664,6 +1666,7 @@ static void rd_kafka_txn_handle_AddOffsetsToTxn (rd_kafka_t *rk,
break;

case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT:
Expand Down Expand Up @@ -1971,6 +1974,7 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,

case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
actions |= RD_KAFKA_ERR_ACTION_FATAL;
break;
Expand Down Expand Up @@ -2486,6 +2490,7 @@ rd_kafka_txn_handle_FindCoordinator (rd_kafka_t *rk,
return;

case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
rd_kafka_wrlock(rk);
rd_kafka_txn_set_fatal_error(
rkb->rkb_rk, RD_DONT_LOCK, err,
Expand Down
52 changes: 52 additions & 0 deletions tests/0105-transactions_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,50 @@ static void do_test_txns_no_timeout_crash (void) {
}


/**
* @brief Test auth failure handling.
*/
static void do_test_txn_auth_failure (int16_t ApiKey,
rd_kafka_resp_err_t ErrorCode) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_error_t *error;

TEST_SAY(_C_MAG "[ %s ApiKey=%s ErrorCode=%s ]\n", __FUNCTION__,
rd_kafka_ApiKey2str(ApiKey), rd_kafka_err2name(ErrorCode));

rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

rd_kafka_mock_push_request_errors(mcluster,
ApiKey,
1,
ErrorCode);

error = rd_kafka_init_transactions(rk, 5000);
TEST_ASSERT(error, "Expected init_transactions() to fail");

TEST_SAY("init_transactions() failed: %s: %s\n",
rd_kafka_err2name(rd_kafka_error_code(error)),
rd_kafka_error_string(error));
TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
"Expected error %s, not %s",
rd_kafka_err2name(ErrorCode),
rd_kafka_err2name(rd_kafka_error_code(error)));
TEST_ASSERT(rd_kafka_error_is_fatal(error),
"Expected error to be fatal");
TEST_ASSERT(!rd_kafka_error_is_retriable(error),
"Expected error to not be retriable");
rd_kafka_error_destroy(error);

/* All done */

rd_kafka_destroy(rk);

TEST_SAY(_C_GRN "[ %s ApiKey=%s ErrorCode=%s PASS ]\n", __FUNCTION__,
rd_kafka_ApiKey2str(ApiKey), rd_kafka_err2name(ErrorCode));
}


int main_0105_transactions_mock (int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
Expand All @@ -783,6 +827,14 @@ int main_0105_transactions_mock (int argc, char **argv) {

do_test_txns_no_timeout_crash();

do_test_txn_auth_failure(
RD_KAFKAP_InitProducerId,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

do_test_txn_auth_failure(
RD_KAFKAP_FindCoordinator,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

if (!test_quick)
do_test_txn_switch_coordinator();

Expand Down

0 comments on commit ed6a4a7

Please sign in to comment.