Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
Expand Down Expand Up @@ -3085,7 +3086,7 @@ public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {

time.sleep(20);

sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1);
sendIdempotentProducerResponse(0, tp0, Errors.INVALID_TXN_STATE, -1);
sender.runOnce(); // receive late response

// Loop once and confirm that the transaction manager does not enter a fatal error state
Expand All @@ -3107,6 +3108,45 @@ public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
txnManager.beginTransaction();
}

// Verifies that a produce response carrying INVALID_TXN_STATE puts the
// transaction manager into an *abortable* error state rather than a fatal one:
// the pending record future fails with InvalidTxnStateException, the
// transaction can be aborted, the producer re-initializes, and a brand-new
// transaction can then be started.
//
// NOTE(review): this span comes from a scraped GitHub PR diff page. The lines
// between the first "Copy link" below and "sender.runOnce();" near the end are
// review-thread text captured by the scrape, not part of the test source file.
// Several statements also appear to have lost trailing semicolons in the
// scrape (the `assertTrue(txnManager::isInitializing)` /
// `prepareInitProducerResponse(...)` / `assertTrue(txnManager::isReady)`
// lines) — confirm against the upstream file before applying.
@Test
public void testInvalidTxnStateIsAnAbortableError() throws Exception {
// Fixed producer id/epoch reused for both the initial init and the re-init
// performed after the abort at the bottom of the test.
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
// Advertise INIT_PRODUCER_ID versions 0-3 on node "0".
// NOTE(review): presumably chosen so the client can bump the epoch on abort
// (see the canBumpEpoch discussion in the embedded review thread) — confirm.
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testInvalidTxnState", 60000, 100, apiVersions);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

// Begin a transaction and add tp0; the prepared broker response reports
// NONE, so the partition is considered successfully added.
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();

// Produce one record and have the broker answer it with INVALID_TXN_STATE.
Future<RecordMetadata> request = appendToAccumulator(tp0);
sender.runOnce(); // send request
sendIdempotentProducerResponse(0, tp0, Errors.INVALID_TXN_STATE, -1);

// Return InvalidTxnState error. It should be abortable.
sender.runOnce();
assertFutureFailure(request, InvalidTxnStateException.class);
// Abortable — not fatal — is the property under test.
assertTrue(txnManager.hasAbortableError());
TransactionalRequestResult result = txnManager.beginAbort();
sender.runOnce();

// Once the transaction is aborted, we should be able to begin a new one.
respondToEndTxn(Errors.NONE);
sender.runOnce();
// After EndTxn completes the manager re-initializes (BooleanSupplier form of
// assertTrue defers evaluation to the assertion itself).
assertTrue(txnManager::isInitializing)
prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the current logic force us to reset the epoch after any error which causes an abort? It seems like a good idea, but just wanted to confirm what the trigger is.

Copy link
Member Author

@jolshan jolshan Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The places we call:

 if (canBumpEpoch()) {
                            epochBumpRequired = true;
                            KafkaException exception = new KafkaException(unackedMessagesErr + "It is safe to abort " +
                                    "the transaction and continue.");
                            transitionToAbortableError(exception);
                            }
    public synchronized void maybeTransitionToErrorState(RuntimeException exception) {
        if (exception instanceof ClusterAuthorizationException
                || exception instanceof TransactionalIdAuthorizationException
                || exception instanceof ProducerFencedException
                || exception instanceof UnsupportedVersionException) {
            transitionToFatalError(exception);
        } else if (isTransactional()) {
            if (canBumpEpoch() && !isCompleting()) {
                epochBumpRequired = true;
            }
            transitionToAbortableError(exception);
        }

and a third that sometimes does and sometimes doesn't.

I think the path for this specific produce request error is the second one so we may not bump epoch in some cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

camBumpEpoch should be true for newer versions of transactional producers.

// NOTE(review): test source resumes here after the scraped review thread;
// "camBumpEpoch" just above is the reviewer's typo for canBumpEpoch.
sender.runOnce();
// Re-init completed: the manager is ready and the abort result succeeded.
assertTrue(txnManager::isReady)

assertTrue(result.isSuccessful());
result.await();

// A fresh transaction can now begin — INVALID_TXN_STATE did not poison the producer.
txnManager.beginTransaction();
}

private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidRecordException("Record was not part of an ongoing transaction")
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}

// We cache offset metadata for the start of each transaction. This allows us to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
if (addPartitionsToTxnResponseData.errorCode != 0) {
error(s"AddPartitionsToTxnRequest for node ${response.destination} returned with error ${Errors.forCode(addPartitionsToTxnResponseData.errorCode)}.")
// The client should not be exposed to CLUSTER_AUTHORIZATION_FAILED so modify the error to signify the verification did not complete.
// Older clients return with INVALID_RECORD and newer ones can return with INVALID_TXN_STATE.
// Return INVALID_TXN_STATE.
val finalError = if (addPartitionsToTxnResponseData.errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code)
Errors.INVALID_RECORD.code
Errors.INVALID_TXN_STATE.code
else
addPartitionsToTxnResponseData.errorCode

Expand All @@ -160,9 +160,6 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
val code =
if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
Errors.INVALID_PRODUCER_EPOCH.code
// Older clients return INVALID_RECORD
else if (partitionResult.partitionErrorCode == Errors.INVALID_TXN_STATE.code)
Errors.INVALID_RECORD.code
else
partitionResult.partitionErrorCode
unverified.put(tp, Errors.forCode(code))
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,7 @@ class ReplicaManager(val config: KafkaConfig,
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) =>
// NOTE: Older clients return INVALID_RECORD, but newer clients will return INVALID_TXN_STATE
val message = if (error.equals(Errors.INVALID_RECORD)) "Partition was not added to the transaction" else error.message()
val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message()
topicPartition -> LogAppendResult(
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
Some(error.exception(message))
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, ListOffsetsRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{InvalidRecordException, IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -3387,14 +3387,14 @@ class PartitionTest extends AbstractPartitionTest {
producerId = producerId)

// When verification guard is not there, we should not be able to append.
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))

// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNotNull(verificationGuard)

// With the wrong verification guard, append should fail.
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(),
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(),
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))

// We should return the same verification object when we still need to verify. Append should proceed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class AddPartitionsToTxnManagerTest {
assertEquals(expectedDisconnectedErrors, transaction1Errors)
assertEquals(expectedDisconnectedErrors, transaction2Errors)

val expectedTopLevelErrors = topicPartitions.map(_ -> Errors.INVALID_RECORD).toMap
val expectedTopLevelErrors = topicPartitions.map(_ -> Errors.INVALID_TXN_STATE).toMap
val topLevelErrorAddPartitionsResponse = new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()))
val topLevelErrorResponse = clientResponse(topLevelErrorAddPartitionsResponse)
addTransactionsToVerify()
Expand All @@ -212,9 +212,9 @@ class AddPartitionsToTxnManagerTest {
val preConvertedTransaction1Errors = topicPartitions.map(_ -> Errors.PRODUCER_FENCED).toMap
val expectedTransaction1Errors = topicPartitions.map(_ -> Errors.INVALID_PRODUCER_EPOCH).toMap
val preConvertedTransaction2Errors = Map(new TopicPartition("foo", 1) -> Errors.NONE,
new TopicPartition("foo", 2) -> Errors.INVALID_RECORD,
new TopicPartition("foo", 2) -> Errors.INVALID_TXN_STATE,
new TopicPartition("foo", 3) -> Errors.NONE)
val expectedTransaction2Errors = Map(new TopicPartition("foo", 2) -> Errors.INVALID_RECORD)
val expectedTransaction2Errors = Map(new TopicPartition("foo", 2) -> Errors.INVALID_TXN_STATE)

val transaction1ErrorResponse = AddPartitionsToTxnResponse.resultForTransaction(transactionalId1, preConvertedTransaction1Errors.asJava)
val transaction2ErrorResponse = AddPartitionsToTxnResponse.resultForTransaction(transactionalId2, preConvertedTransaction2Errors.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2231,8 +2231,8 @@ class ReplicaManagerTest {

// Confirm we did not write to the log and instead returned error.
val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
callback(Map(tp0 -> Errors.INVALID_RECORD).toMap)
assertEquals(Errors.INVALID_RECORD, result.assertFired.error)
callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))

// This time verification is successful.
Expand Down