Skip to content

Commit

Permalink
Messages that timed out locally would not fail the ongoing transaction
Browse files Browse the repository at this point in the history
If the application did not take action on failed messages in its delivery
report callback and went on to commit the transaction, the transaction would
be successfully committed, simply omitting the failed messages.
  • Loading branch information
edenhill committed Sep 23, 2020
1 parent b390801 commit e0ac7fb
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
if a topic was deleted from the cluster when a transaction was using it.
* `ERR_KAFKA_STORAGE_ERROR` is now correctly treated as a retriable
produce error (#3026).
* Messages that timed out locally would not fail the ongoing transaction.
If the application did not take action on failed messages in its delivery
report callback and went on to commit the transaction, the transaction would
be successfully committed, simply omitting the failed messages.
Committing a transaction in which any message failed delivery now fails
with an abortable transaction error instead.



# librdkafka v1.5.0
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2740,6 +2740,10 @@ void rd_kafka_dr_msgq (rd_kafka_topic_t *rkt,
if (unlikely(rd_kafka_msgq_len(rkmq) == 0))
return;

if (err && rd_kafka_is_transactional(rk))
rd_atomic64_add(&rk->rk_eos.txn_dr_fails,
rd_kafka_msgq_len(rkmq));

/* Call on_acknowledgement() interceptors */
rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err);

Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ struct rd_kafka_s {
/**< Partitions added and registered to transaction. */
rd_kafka_toppar_tqhead_t txn_rktps;

/**< Number of messages that failed delivery.
* If this number is >0 on commit_transaction() then an
* abortable transaction error will be raised.
* Is reset to zero on each begin_transaction(). */
rd_atomic64_t txn_dr_fails;

/**< Current transaction error. */
rd_kafka_resp_err_t txn_err;

Expand Down
25 changes: 24 additions & 1 deletion src/rdkafka_txnmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,7 @@ rd_kafka_txn_op_begin_transaction (rd_kafka_t *rk,
rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION);

rk->rk_eos.txn_req_cnt = 0;
rd_atomic64_set(&rk->rk_eos.txn_dr_fails, 0);
rk->rk_eos.txn_err = RD_KAFKA_RESP_ERR_NO_ERROR;
RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free);
rk->rk_eos.txn_errstr = NULL;
Expand Down Expand Up @@ -2031,6 +2032,7 @@ rd_kafka_txn_op_commit_transaction (rd_kafka_t *rk,
rd_kafka_resp_err_t err;
char errstr[512];
rd_kafka_pid_t pid;
int64_t dr_fails;

if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
return RD_KAFKA_OP_RES_HANDLED;
Expand All @@ -2051,6 +2053,18 @@ rd_kafka_txn_op_commit_transaction (rd_kafka_t *rk,
goto err;
}

/* If any messages failed delivery the transaction must be aborted. */
dr_fails = rd_atomic64_get(&rk->rk_eos.txn_dr_fails);
if (unlikely(dr_fails > 0)) {
error = rd_kafka_error_new_txn_requires_abort(
RD_KAFKA_RESP_ERR__INCONSISTENT,
"%"PRId64" message(s) failed delivery "
"(see individual delivery reports)",
dr_fails);
goto err;
}


err = rd_kafka_EndTxnRequest(rk->rk_eos.txn_coord,
rk->rk_conf.eos.transactional_id,
pid,
Expand All @@ -2073,6 +2087,14 @@ rd_kafka_txn_op_commit_transaction (rd_kafka_t *rk,
err:
rd_kafka_wrunlock(rk);

/* If the returned error is an abortable error
* also set the current transaction state accordingly. */
if (rd_kafka_error_txn_requires_abort(error))
rd_kafka_txn_set_abortable_error(
rk,
rd_kafka_error_code(error),
"%s", rd_kafka_error_string(error));

rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
error);

Expand Down Expand Up @@ -2759,5 +2781,6 @@ void rd_kafka_txns_init (rd_kafka_t *rk) {
rd_kafka_broker_persistent_connection_add(
rk->rk_eos.txn_coord,
&rk->rk_eos.txn_coord->rkb_persistconn.coord);
}

rd_atomic64_init(&rk->rk_eos.txn_dr_fails, 0);
}

0 comments on commit e0ac7fb

Please sign in to comment.