From 936f849b8a85c8494a649bb623ea0a741ff91438 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 9 Mar 2023 20:41:29 +0100 Subject: [PATCH 1/4] [transactions] Implement KIP-664 listTransactions (#76) (cherry picked from commit 5ef4a8531cde389af898cf36f57c518b340e426c) --- .../handlers/kop/KafkaCommandDecoder.java | 6 + .../handlers/kop/KafkaRequestHandler.java | 26 +++- .../coordinator/group/GroupCoordinator.java | 15 +-- .../transaction/TransactionCoordinator.java | 16 +++ .../transaction/TransactionState.java | 22 ++++ .../transaction/TransactionStateManager.java | 67 ++++++++++ .../kop/utils/KafkaResponseUtils.java | 19 +-- .../group/GroupCoordinatorTest.java | 22 ++-- .../transaction/TransactionTest.java | 114 ++++++++++++++++++ 9 files changed, 278 insertions(+), 29 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 4def7adecb..11ce298762 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -325,6 +325,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case LIST_GROUPS: handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; + case LIST_TRANSACTIONS: + handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; case DELETE_GROUPS: handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -583,6 +586,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void + handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 233b7892c2..d888957a95 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -32,6 +32,7 @@ import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator; import io.streamnative.pulsar.handlers.kop.security.Session; import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer; @@ -127,6 +128,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; @@ -175,6 +177,8 @@ import org.apache.kafka.common.requests.ListOffsetRequestV0; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.ListTransactionsRequest; +import org.apache.kafka.common.requests.ListTransactionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; @@ -2017,8 +2021,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture resultFuture) { checkArgument(listGroups.getRequest() instanceof ListGroupsRequest); - KeyValue> listResult = getGroupCoordinator().handleListGroups(); - resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue())); + Either> listResult = getGroupCoordinator().handleListGroups(); + resultFuture.complete(KafkaResponseUtils.newListGroups(listResult)); + } + + @Override + protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions, + CompletableFuture resultFuture) { + checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest); + ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest(); + List stateFilters = request.data().stateFilters(); + if (stateFilters == null) { + stateFilters = Collections.emptyList(); + } + List producerIdFilters = request.data().producerIdFilters(); + if (producerIdFilters == null) { + producerIdFilters = Collections.emptyList(); + } + ListTransactionsResponseData listResult = getTransactionCoordinator() + .handleListTransactions(stateFilters, producerIdFilters); + resultFuture.complete(new ListTransactionsResponse(listResult)); } @Override diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index f1f5d90ac1..98813604f7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -29,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.CoreUtils; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey; @@ -832,22 +833,16 @@ public KeyValue> handleFetchOffsets( ); } - public KeyValue> handleListGroups() { + public Either> handleListGroups() { if (!isActive.get()) { - return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>()); + return Either.left(Errors.COORDINATOR_NOT_AVAILABLE); } else { - Errors errors; if (groupManager.isLoading()) { - errors = Errors.COORDINATOR_LOAD_IN_PROGRESS; - } else { - errors = Errors.NONE; + return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS); } List overviews = new ArrayList<>(); groupManager.currentGroups().forEach(group -> overviews.add(group.overview())); - return new KeyValue<>( - errors, - overviews - ); + return Either.right(overviews); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index dfd3f08580..0771621228 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -32,6 +32,7 @@ import io.streamnative.pulsar.handlers.kop.storage.PulsarPartitionedTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -52,6 +53,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.TransactionResult; @@ -80,6 +82,8 @@ public class TransactionCoordinator { private final Time time; + private final AtomicBoolean isActive = new AtomicBoolean(false); + private static final BiConsumer onEndTransactionComplete = (txnIdAndPidEpoch, errors) -> { @@ -218,6 +222,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId; } + public ListTransactionsResponseData handleListTransactions(List filteredStates, + List filteredProducerIds) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259 + if (!isActive.get()) { + log.warn("The transaction coordinator is not active, so it will reject list transaction request"); + return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code()); + } + return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates); + } + @Data @EqualsAndHashCode @AllArgsConstructor @@ -956,6 +971,7 @@ public CompletableFuture startup(boolean enableTransactionalIdExpiration) return this.producerIdManager.initialize().thenCompose(ignored -> { log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata); + isActive.set(true); return CompletableFuture.completedFuture(null); }); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java index 7919449097..01d626f391 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java @@ -117,4 +117,26 @@ public boolean isExpirationAllowed() { return false; } } + + public org.apache.kafka.clients.admin.TransactionState toAdminState() { + switch (this) { + case EMPTY: + return org.apache.kafka.clients.admin.TransactionState.EMPTY; + case ONGOING: + return org.apache.kafka.clients.admin.TransactionState.ONGOING; + case PREPARE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT; + case PREPARE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT; + case COMPLETE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT; + case COMPLETE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT; + case PREPARE_EPOCH_FENCE: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE; + case DEAD: + default: + return org.apache.kafka.clients.admin.TransactionState.UNKNOWN; + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index f2762c57b5..08835b265d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.ProduceResponse; @@ -245,6 +247,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs <= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs()); } + private static boolean shouldInclude(TransactionMetadata txnMetadata, + List filterProducerIds, Set filterStateNames) { + if (txnMetadata.getState() == TransactionState.DEAD) { + // We filter the `Dead` state since it is a transient state which + // indicates that the transactionalId and its metadata are in the + // process of expiration and removal. + return false; + } else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) { + return false; + } else if (!filterStateNames.isEmpty() && !filterStateNames.contains( + txnMetadata.getState().toAdminState().toString())) { + return false; + } else { + return true; + } + } + + public ListTransactionsResponseData listTransactionStates(List filteredProducerIds, + List filteredStates) { + return CoreUtils.inReadLock(stateLock, () -> { + ListTransactionsResponseData response = new ListTransactionsResponseData(); + if (!loadingPartitions.isEmpty()) { + response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + } else { + Set filterStates = new HashSet<>(); + for (TransactionState stateName : TransactionState.values()) { + String nameForTheClient = stateName.toAdminState().toString(); + if (filteredStates.contains(nameForTheClient)) { + filterStates.add(nameForTheClient); + } else { + response.unknownStateFilters().add(nameForTheClient); + } + } + List states = new ArrayList<>(); + transactionMetadataCache.forEach((__, cache) -> { + cache.values().forEach(txnMetadata -> { + txnMetadata.inLock(() -> { + // use toString() to get the name of the state according to the protocol + ListTransactionsResponseData.TransactionState transactionState = + new ListTransactionsResponseData.TransactionState() + .setTransactionalId(txnMetadata.getTransactionalId()) + .setProducerId(txnMetadata.getProducerId()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()); + + if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) { + if (log.isDebugEnabled()) { + log.debug("add transaction state: {}", transactionState); + } + states.add(transactionState); + } else { + if (log.isDebugEnabled()) { + log.debug("Skip transaction state: {}", transactionState); + } + } + return null; + }); + }); + }); + response.setErrorCode(Errors.NONE.code()) + .setTransactionStates(states); + } + return response; + }); + } + @Data @AllArgsConstructor private static class TransactionalIdCoordinatorEpochAndMetadata { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index 3a654dc2b3..0fb479f517 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -15,6 +15,7 @@ import io.streamnative.pulsar.handlers.kop.ApiVersion; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -269,14 +270,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) { return new LeaveGroupResponse(data); } - public static ListGroupsResponse newListGroups(Errors errors, - List groups) { + public static ListGroupsResponse newListGroups(Either> results) { ListGroupsResponseData data = new ListGroupsResponseData(); - data.setErrorCode(errors.code()); - data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup() - .setGroupId(overView.groupId()) - .setProtocolType(overView.protocolType())) - .collect(Collectors.toList())); + data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code()); + if (!results.isLeft()) { + data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup() + .setGroupId(overView.groupId()) + .setProtocolType(overView.protocolType())) + .collect(Collectors.toList())); + + } else { + data.setGroups(Collections.emptyList()); + } return new ListGroupsResponse(data); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java index 105958975d..3696b926dc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -28,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer; import java.util.ArrayList; @@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception { assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey()); // ListGroups - KeyValue> listGroupsResult = groupCoordinator.handleListGroups(); - assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey()); + Either> listGroupsResult = groupCoordinator.handleListGroups(); + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft()); // DeleteGroups Map deleteGroupsErrors = groupCoordinator.handleDeleteGroups( @@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols() ).get(); assertEquals(Errors.NONE, syncGroupResult.getKey()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } @@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols() ); assertEquals(Errors.NONE, joinGroupResult.getError()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index ea6ef0e23b..f043c3d581 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -36,6 +36,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -52,7 +53,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTransactionsOptions; +import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -1487,6 +1491,116 @@ public void testAbortedTxEventuallyPurged() throws Exception { } } + @Test(timeOut = 100000 * 30) + public void testListTransactions() throws Exception { + + String topicName = "testListTransactions"; + String transactionalId = "myProducer_" + UUID.randomUUID(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); + + producer.initTransactions(); + producer.beginTransaction(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.EMPTY); + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + listTransactionsResult.all().get().forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + }); + producer.commitTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + }); + producer.beginTransaction(); + + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + producer.abortTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + }); + producer.close(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + } + + private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, + org.apache.kafka.clients.admin.TransactionState transactionState) + throws Exception { + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + Collection transactionListings = listTransactionsResult.all().get(); + transactionListings.forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + TransactionListing transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same state + ListTransactionsOptions optionFilterState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + + // filter for the same producer id + ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions() + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same producer id and state + ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)) + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + } + /** * Get the Kafka server address. */ From 30f78c28a26c3a609cff0c506bf4ab6a0ed10770 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 2 Mar 2023 10:40:37 +0100 Subject: [PATCH 2/4] Update Kafka wireprotocol to 3.4.0 and implement KIP-699 and KIP-709 (cherry picked from commit 71b77b2fd65fd1a2d481ed7067939fab3c0ad002) --- .../pulsar/handlers/kop/KopProtocolHandlerTestBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index fe939f3689..399b9c201d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -312,6 +312,7 @@ protected final void internalSetup(boolean startBroker) throws Exception { createClient(); MetadataUtils.createOffsetMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); + if (conf.isKafkaTransactionCoordinatorEnabled()) { MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); } From 9d440e6b2c5c5de0a9858b0b4a545ed8b3018336 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 9 Mar 2023 20:41:29 +0100 Subject: [PATCH 3/4] g[transactions] Implement KIP-664 listTransactions (#76) (cherry picked from commit 5ef4a8531cde389af898cf36f57c518b340e426c) --- .../transaction/TransactionTest.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index f043c3d581..e6abf77605 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -1601,6 +1601,116 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa assertEquals(transactionState, transactionListing.state()); } + @Test(timeOut = 100000 * 30) + public void testListTransactions() throws Exception { + + String topicName = "testListTransactions"; + String transactionalId = "myProducer_" + UUID.randomUUID(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); + + producer.initTransactions(); + producer.beginTransaction(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.EMPTY); + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + listTransactionsResult.all().get().forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + }); + producer.commitTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + }); + producer.beginTransaction(); + + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + producer.abortTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + }); + producer.close(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + } + + private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, + org.apache.kafka.clients.admin.TransactionState transactionState) + throws Exception { + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + Collection transactionListings = listTransactionsResult.all().get(); + transactionListings.forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + TransactionListing transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same state + ListTransactionsOptions optionFilterState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + + // filter for the same producer id + ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions() + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same producer id and state + ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)) + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + } + /** * Get the Kafka server address. */ From 6108953bddaeb73c82f5501fcf0d822ad1fb791f Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 10 Mar 2023 16:29:14 +0100 Subject: [PATCH 4/4] [transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (#77) (cherry picked from commit 1f2fe99e0e88c6bba1291525c242563a5eb94ef2) --- .../handlers/kop/KafkaCommandDecoder.java | 6 + .../handlers/kop/KafkaRequestHandler.java | 13 ++ .../transaction/TransactionCoordinator.java | 72 +++++++++ .../transaction/TransactionTest.java | 144 ++++-------------- 4 files changed, 123 insertions(+), 112 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 11ce298762..dbce355edf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -328,6 +328,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case LIST_TRANSACTIONS: handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture); break; + case DESCRIBE_TRANSACTIONS: + handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; case DELETE_GROUPS: handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -589,6 +592,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void + handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index d888957a95..818d331abe 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -117,6 +117,7 @@ import org.apache.kafka.common.message.DescribeClusterResponseData; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeConfigsResponseData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.FetchRequestData; @@ -161,6 +162,8 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FetchRequest; @@ -2043,6 +2046,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti resultFuture.complete(new ListTransactionsResponse(listResult)); } + @Override + protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, + CompletableFuture response) { + checkArgument(listGroups.getRequest() instanceof DescribeTransactionsRequest); + DescribeTransactionsRequest request = (DescribeTransactionsRequest) listGroups.getRequest(); + DescribeTransactionsResponseData describeResult = getTransactionCoordinator() + .handleDescribeTransactions(request.data().transactionalIds()); + response.complete(new DescribeTransactionsResponse(describeResult)); + } + @Override protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture resultFuture) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index 0771621228..5b887057a8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.coordinator.transaction; +import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT; @@ -53,6 +54,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; @@ -233,6 +235,76 @@ public ListTransactionsResponseData handleListTransactions(List filtered return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates); } + public DescribeTransactionsResponseData handleDescribeTransactions(List transactionalIds) { + DescribeTransactionsResponseData response = new DescribeTransactionsResponseData(); + if (transactionalIds != null) { + transactionalIds.forEach(transactionalId -> { + DescribeTransactionsResponseData.TransactionState transactionState = + handleDescribeTransactions(transactionalId); + response.transactionStates().add(transactionState); + }); + } + return response; + } + + private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270 + if (transactionalId == null) { + throw new IllegalArgumentException("Invalid null transactionalId"); + } + + DescribeTransactionsResponseData.TransactionState transactionState = + new DescribeTransactionsResponseData.TransactionState() + .setTransactionalId(transactionalId); + + if (!isActive.get()) { + transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()); + } else if (transactionalId.isEmpty()) { + transactionState.setErrorCode(Errors.INVALID_REQUEST.code()); + } else { + Either> tState = + txnManager.getTransactionState(transactionalId); + if (tState.isLeft()) { + transactionState.setErrorCode(tState.getLeft().code()); + } else { + Optional right = tState.getRight(); + if (!right.isPresent()) { + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code()); + } else { + CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get(); + TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata(); + txnMetadata.inLock(() -> { + if (txnMetadata.getState() == DEAD) { + // The transaction state is being expired, so ignore it + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code()); + } else { + txnMetadata.getTopicPartitions().forEach(topicPartition -> { + var topicData = transactionState.topics().find(topicPartition.topic()); + if (topicData == null) { + topicData = new DescribeTransactionsResponseData.TopicData() + .setTopic(topicPartition.topic()); + transactionState.topics().add(topicData); + } + topicData.partitions().add(topicPartition.partition()); + }); + + transactionState + .setErrorCode(Errors.NONE.code()) + .setProducerId(txnMetadata.getProducerId()) + .setProducerEpoch(txnMetadata.getProducerEpoch()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()) + .setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs()) + .setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp()); + } + return null; + }); + } + } + } + return transactionState; + } + @Data @EqualsAndHashCode @AllArgsConstructor diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index e6abf77605..415846c46d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.clients.admin.ListTransactionsOptions; import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TransactionDescription; import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -87,6 +88,8 @@ @Slf4j public class TransactionTest extends KopProtocolHandlerTestBase { + private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000; + protected void setupTransactions() { this.conf.setDefaultNumberOfNamespaceBundles(4); this.conf.setOffsetsTopicNumPartitions(10); @@ -1162,7 +1165,7 @@ private KafkaProducer buildTransactionProducer(String transacti producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout); } else { // very long time-out - producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000); + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE); } producerProps.put(CLIENT_ID_CONFIG, "dummy_client_" + UUID.randomUUID()); addCustomizeProps(producerProps); @@ -1491,10 +1494,10 @@ public void testAbortedTxEventuallyPurged() throws Exception { } } - @Test(timeOut = 100000 * 30) - public void testListTransactions() throws Exception { + @Test(timeOut = 1000 * 30) + public void testListAndDescribeTransactions() throws Exception { - String topicName = "testListTransactions"; + String topicName = "testListAndDescribeTransactions"; String transactionalId = "myProducer_" + UUID.randomUUID(); @Cleanup @@ -1599,116 +1602,33 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa .findFirst() .get(); assertEquals(transactionState, transactionListing.state()); - } - - @Test(timeOut = 100000 * 30) - public void testListTransactions() throws Exception { - - String topicName = "testListTransactions"; - String transactionalId = "myProducer_" + UUID.randomUUID(); - - @Cleanup - KafkaProducer producer = buildTransactionProducer(transactionalId); - @Cleanup - AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); - - producer.initTransactions(); - producer.beginTransaction(); - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.EMPTY); - producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); - producer.flush(); - - ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); - listTransactionsResult.all().get().forEach(t -> { - log.info("Found transactionalId: {} {} {}", - t.transactionalId(), - t.producerId(), - t.state()); - }); - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.ONGOING); - Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.ONGOING); - }); - producer.commitTransaction(); - Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); - }); - producer.beginTransaction(); - - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); - - producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); - producer.flush(); - producer.abortTransaction(); - Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); - }); - producer.close(); - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); - } - private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, - org.apache.kafka.clients.admin.TransactionState transactionState) - throws Exception { - ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); - Collection transactionListings = listTransactionsResult.all().get(); - transactionListings.forEach(t -> { - log.info("Found transactionalId: {} {} {}", - t.transactionalId(), - t.producerId(), - t.state()); - }); - TransactionListing transactionListing = transactionListings - .stream() - .filter(t -> t.transactionalId().equals(transactionalId)) - .findFirst() - .get(); - assertEquals(transactionState, transactionListing.state()); - - // filter for the same state - ListTransactionsOptions optionFilterState = new ListTransactionsOptions() - .filterStates(Collections.singleton(transactionState)); - listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState); - transactionListings = listTransactionsResult.all().get(); - transactionListing = transactionListings - .stream() - .filter(t -> t.transactionalId().equals(transactionalId)) - .findFirst() - .get(); - assertEquals(transactionState, transactionListing.state()); - - - // filter for the same producer id - ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions() - .filterProducerIds(Collections.singleton(transactionListing.producerId())); - listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer); - transactionListings = listTransactionsResult.all().get(); - transactionListing = transactionListings - .stream() - .filter(t -> t.transactionalId().equals(transactionalId)) - .findFirst() - .get(); - assertEquals(transactionState, transactionListing.state()); + Map map = + kafkaAdmin.describeTransactions(Collections.singleton(transactionalId)) + .all().get(); + assertEquals(1, map.size()); + TransactionDescription transactionDescription = map.get(transactionalId); + log.info("transactionDescription {}", transactionDescription); + assertNotNull(transactionDescription); + assertEquals(transactionDescription.state(), transactionState); + assertTrue(transactionDescription.producerEpoch() >= 0); + assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs()); + assertTrue(transactionDescription.transactionStartTimeMs().isPresent()); + assertTrue(transactionDescription.coordinatorId() >= 0); + + switch (transactionState) { + case EMPTY: + case COMPLETE_COMMIT: + case COMPLETE_ABORT: + assertEquals(0, transactionDescription.topicPartitions().size()); + break; + case ONGOING: + assertEquals(1, transactionDescription.topicPartitions().size()); + break; + default: + fail("unhandled " + transactionState); + } - // filter for the same producer id and state - ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions() - .filterStates(Collections.singleton(transactionState)) - .filterProducerIds(Collections.singleton(transactionListing.producerId())); - listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState); - transactionListings = listTransactionsResult.all().get(); - transactionListing = transactionListings - .stream() - .filter(t -> t.transactionalId().equals(transactionalId)) - .findFirst() - .get(); - assertEquals(transactionState, transactionListing.state()); } /**