Skip to content

Commit

Permalink
EndTxnRequests (sent on commit/abort) are only retried in allowed sta…
Browse files Browse the repository at this point in the history
…tes (#3041)

Previously the transaction could hang on commit_transaction() if an abortable
error was hit and the EndTxnRequest was to be retried.
  • Loading branch information
edenhill committed Sep 23, 2020
1 parent e0ac7fb commit 2627ef7
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
If the application did not take action on failed messages in its delivery
report callback and went on to commit the transaction, the transaction would
be successfully committed, simply omitting the failed messages.
* EndTxnRequests (sent on commit/abort) are only retried in allowed
states (#3041).
Previously the transaction could hang on commit_transaction() if an abortable
error was hit and the EndTxnRequest was to be retried.



Expand Down
14 changes: 9 additions & 5 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ rd_kafka_mock_partition_log_append (rd_kafka_mock_partition_t *mpart,


/**
* @brief Set the partition leader
* @brief Set the partition leader, or NULL for leader-less.
*/
static void
rd_kafka_mock_partition_set_leader0 (rd_kafka_mock_partition_t *mpart,
Expand Down Expand Up @@ -1818,10 +1818,14 @@ rd_kafka_mock_cluster_cmd (rd_kafka_mock_cluster_t *mcluster,
if (!mpart)
return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;

mrkb = rd_kafka_mock_broker_find(mcluster,
rko->rko_u.mock.broker_id);
if (!mrkb)
return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
if (rko->rko_u.mock.broker_id != -1) {
mrkb = rd_kafka_mock_broker_find(
mcluster, rko->rko_u.mock.broker_id);
if (!mrkb)
return RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE;
} else {
mrkb = NULL;
}

rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Set %s [%"PRId32"] leader to %"PRId32,
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ rd_kafka_mock_topic_create (rd_kafka_mock_cluster_t *mcluster,
*
* The topic will be created if it does not exist.
*
* \p broker_id needs to be an existing broker.
* \p broker_id needs to be an existing broker, or -1 to make the
* partition leader-less.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_partition_set_leader (rd_kafka_mock_cluster_t *mcluster,
Expand Down
34 changes: 23 additions & 11 deletions src/rdkafka_txnmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -906,13 +906,15 @@ rd_kafka_txn_curr_api_abort_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
rd_kafka_txn_set_abortable_error(
rkts->rkts_rk,
RD_KAFKA_RESP_ERR__TIMED_OUT,
"Transactional operation timed out");
"Transactional API operation (%s) timed out",
rkq->rkq_rk->rk_eos.txn_curr_api.name);

rd_kafka_txn_curr_api_reply_error(
rkq,
rd_kafka_error_new_txn_requires_abort(
RD_KAFKA_RESP_ERR__TIMED_OUT,
"Transactional operation timed out"));
"Transactional API operation (%s) timed out",
rkq->rkq_rk->rk_eos.txn_curr_api.name));
}

/**
Expand Down Expand Up @@ -1917,7 +1919,7 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
rd_kafka_q_t *rkq = opaque;
int16_t ErrorCode;
int actions = 0;
rd_bool_t is_commit = rd_false;
rd_bool_t is_commit = rd_false, may_retry = rd_false;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
rd_kafka_q_destroy(rkq);
Expand All @@ -1937,19 +1939,27 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
err = rkbuf->rkbuf_err;
err:
rd_kafka_wrlock(rk);
if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION)
if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION) {
is_commit = rd_true;
else if (rk->rk_eos.txn_state ==
RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)
may_retry = rd_true;
} else if (rk->rk_eos.txn_state ==
RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) {
is_commit = rd_false;
else
may_retry = rd_true;
} else if (!err)
err = RD_KAFKA_RESP_ERR__OUTDATED;

if (!err) {
/* EndTxn successful: complete the transaction */
rd_kafka_txn_complete(rk);
}

rd_kafka_dbg(rk, EOS, "ENDTXN",
"EndTxn failed due to %s in state %s (may_retry=%s)",
rd_kafka_err2name(err),
rd_kafka_txn_state2str(rk->rk_eos.txn_state),
RD_STR_ToF(may_retry));

rd_kafka_wrunlock(rk);

switch (err)
Expand All @@ -1959,10 +1969,12 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,

case RD_KAFKA_RESP_ERR__DESTROY:
/* Producer is being terminated, ignore the response. */
case RD_KAFKA_RESP_ERR__TIMED_OUT:
/* Transaction API timeout has been hit
* (this is our internal timer) */
case RD_KAFKA_RESP_ERR__OUTDATED:
/* Set a non-actionable actions flag so that curr_api_reply()
* is called below, without other side-effects. */
actions = RD_KAFKA_ERR_ACTION_SPECIAL;
/* Transactional state no longer relevant for this
* outdated response. */
break;

case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
Expand Down Expand Up @@ -1994,7 +2006,7 @@ static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
"Failed to end transaction: %s",
rd_kafka_err2str(err));

} else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
} else if (may_retry && actions & RD_KAFKA_ERR_ACTION_RETRY) {
if (rd_kafka_buf_retry(rkb, request))
return;
actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
Expand Down
4 changes: 4 additions & 0 deletions src/rdstring.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ void *rd_strtup_list_copy (const void *elem, void *opaque);
char *rd_flags2str (char *dst, size_t size,
const char **desc, int flags);


/**
 * @brief Render a truth value as text, e.g. for log and debug messages.
 *
 * @returns the string literal "false" when EXPR evaluates to zero,
 *          otherwise the string literal "true".
 */
#define RD_STR_ToF(EXPR) (!(EXPR) ? "false" : "true")

#endif /* _RDSTRING_H_ */
155 changes: 155 additions & 0 deletions tests/0105-transactions_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,159 @@ static void do_test_txn_auth_failure (int16_t ApiKey,
}


/**
 * @brief Issue #3041: Commit fails due to message flush() taking too long,
 *        eventually resulting in an unabortable error and failure to
 *        re-init the transactional producer.
 *
 * The test makes two passes with the same producer:
 *  - First pass: partitions 0 and 1 are made leader-less (leader -1) and the
 *    transaction coordinator is brought down right before
 *    commit_transaction(), which is expected to fail.
 *    The coordinator is then brought back up and the transaction is aborted.
 *  - Second pass (retry): the partition leaders are set to broker 1 and
 *    commit_transaction() is expected to succeed.
 *
 * The fatal-error callback and allowed error installed by this test are
 * reset before returning so they do not leak into subsequent tests.
 */
static void do_test_txn_flush_timeout (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";
        const char *topic = "myTopic";
        const int32_t coord_id = 2;
        int msgcounter = 0;
        rd_bool_t is_retry = rd_false;

        TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "message.timeout.ms", "10000",
                                 "transaction.timeout.ms", "10000",
                                 /* Speed up coordinator reconnect */
                                 "reconnect.backoff.max.ms", "1000",
                                 NULL);


        /* Broker down is not a test-failing error */
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;

        /* NOTE(review): this looks like it creates the topic with a single
         * partition while partition 1 is also used below - confirm the
         * argument order of rd_kafka_mock_topic_create(). */
        rd_kafka_mock_topic_create(mcluster, topic, 1, 3);

        /* Set coordinator so we can disconnect it later */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);

        /*
         * Init transactions
         */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

 retry:
        if (!is_retry) {
                /* First attempt should fail. */

                test_curr->ignore_dr_err = rd_true;
                test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                /* Make the partitions leader-less (-1) so
                 * that messages will not be delivered. */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);

        } else {
                /* The retry should succeed: all messages are expected
                 * to be delivered without error. */
                test_curr->ignore_dr_err = rd_false;
                test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;

                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        }


        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Produce some messages to specific partitions and random.
         */
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA,
                                  0, 0, 100, NULL, 10, &msgcounter);


        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
                999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
                123456789;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                                  rk, offsets,
                                  cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        rd_sleep(2);

        if (!is_retry) {
                /* Now disconnect the coordinator. */
                TEST_SAY("Disconnecting transaction coordinator %"PRId32"\n",
                         coord_id);
                rd_kafka_mock_broker_set_down(mcluster, coord_id);
        }

        /*
         * Start committing.
         */
        error = rd_kafka_commit_transaction(rk, -1);

        if (!is_retry) {
                TEST_ASSERT(error != NULL,
                            "Expected commit to fail");
                TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                         rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

                /*
                 * Bring the coordinator back up.
                 */
                rd_kafka_mock_broker_set_up(mcluster, coord_id);
                rd_sleep(2);

                /*
                 * Abort, and try again, this time without error.
                 */
                TEST_SAY("Aborting and retrying\n");
                is_retry = rd_true;

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
                goto retry;
        }

        TEST_ASSERT(!error,
                    "Expected commit to succeed, not: %s",
                    rd_kafka_error_string(error));

        /* All done */

        rd_kafka_destroy(rk);

        /* Reset the shared test state installed above so that transport
         * errors are not silently tolerated by the tests that follow.
         * NOTE(review): assumes a NULL is_fatal_cb selects the test
         * framework's default error handling - confirm. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        TEST_SAY(_C_GRN "[ %s PASS ]\n", __FUNCTION__);
}


int main_0105_transactions_mock (int argc, char **argv) {
if (test_needs_auth()) {
TEST_SKIP("Mock cluster does not support SSL/SASL\n");
Expand Down Expand Up @@ -841,6 +994,8 @@ int main_0105_transactions_mock (int argc, char **argv) {
RD_KAFKAP_FindCoordinator,
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

do_test_txn_flush_timeout();

if (!test_quick)
do_test_txn_switch_coordinator();

Expand Down

0 comments on commit 2627ef7

Please sign in to comment.