Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[transactions] Implement KIP-664 DescribeProducers (#78)
Browse files Browse the repository at this point in the history
(cherry picked from commit c3376e5)
  • Loading branch information
eolivelli authored and gaoran10 committed Jul 30, 2023
1 parent c9e90c2 commit d60d786
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case DESCRIBE_GROUPS:
handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_PRODUCERS:
handleDescribeProducersRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_GROUPS:
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -573,7 +576,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, CompletableFuture<AbstractResponse> response);
handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response);

protected abstract void
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
Expand Down Expand Up @@ -160,6 +161,8 @@
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
Expand Down Expand Up @@ -2041,6 +2044,99 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
));
}

@Override
protected void handleDescribeProducersRequest(KafkaHeaderAndRequest describeProducers,
                                              CompletableFuture<AbstractResponse> responseFuture) {
    // KIP-664 DescribeProducers: report the active (transactional/idempotent) producers
    // for each requested topic-partition. Modelled on
    // https://github.com/apache/kafka/blob/79c19da68d6a93a729a07dfdd37f238246653a46/
    // core/src/main/scala/kafka/server/KafkaApis.scala#L3397
    checkArgument(describeProducers.getRequest() instanceof DescribeProducersRequest);
    DescribeProducersRequest request = (DescribeProducersRequest) describeProducers.getRequest();
    // Per-partition successes and errors. Concurrent maps: the authorize and
    // producer-state futures complete on arbitrary threads.
    Map<TopicPartition, DescribeProducersResponseData.PartitionResponse> allResponses = Maps.newConcurrentMap();
    Map<TopicPartition, Errors> errors = Maps.newConcurrentMap();
    String namespacePrefix = currentNamespacePrefix();
    final int numPartitions = request.data().topics().stream()
            .mapToInt(t -> t.partitionIndexes().size())
            .sum();
    // Runs after every partition outcome; only the invocation that observes all
    // numPartitions outcomes assembles and completes the response.
    Runnable completeOne = () -> {
        if (errors.size() + allResponses.size() != numPartitions) {
            // not enough responses yet
            return;
        }
        // Fold the collected errors into error-bearing PartitionResponses.
        errors.forEach((topicPartition, tpError) -> allResponses.put(topicPartition,
                new DescribeProducersResponseData.PartitionResponse()
                        .setPartitionIndex(topicPartition.partition())
                        .setErrorCode(tpError.code())
                        .setErrorMessage(tpError.message())));
        DescribeProducersResponseData response = new DescribeProducersResponseData();
        // The wire format groups partitions under their topic.
        allResponses
                .entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                        entry -> entry.getKey().topic(),
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())
                ))
                .forEach((topic, partitionResponses) -> response.topics().add(
                        new DescribeProducersResponseData.TopicResponse()
                                .setName(topic)
                                .setPartitions(partitionResponses)));
        responseFuture.complete(new DescribeProducersResponse(response));
    };

    request.data().topics().forEach(topicRequest -> {
        topicRequest.partitionIndexes().forEach(partition -> {
            TopicPartition tp = new TopicPartition(topicRequest.name(), partition);
            final String fullPartitionName;
            try {
                fullPartitionName = KopTopic.toString(tp, namespacePrefix);
            } catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", tp.topic(), e);
                errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                completeOne.run();
                return;
            }
            // NOTE(review): upstream Kafka authorizes DescribeProducers with READ on the
            // topic, not WRITE — confirm whether WRITE is intentional here.
            authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName))
                    .whenComplete((isAuthorized, ex) -> {
                        if (ex != null) {
                            log.error("DescribeProducers topic authorize failed, topic - {}. {}",
                                    fullPartitionName, ex.getMessage());
                            errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
                            completeOne.run();
                            return;
                        }
                        if (!isAuthorized) {
                            errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
                            completeOne.run();
                            return;
                        }
                        // Look up the partition's active producer state asynchronously.
                        replicaManager.activeProducerState(tp, namespacePrefix)
                                .whenComplete((producerState, throwable) -> {
                                    if (throwable != null) {
                                        log.error("DescribeProducersRequest failed, topic - {}. {}",
                                                fullPartitionName, throwable.getMessage());
                                        errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                                    } else {
                                        allResponses.put(tp, producerState);
                                    }
                                    completeOne.run();
                                });
                    });
        });
    });
}

@Override
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> resultFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -138,6 +140,8 @@ public class PartitionLog {

private volatile String kafkaTopicUUID;

private final AtomicBoolean unloaded = new AtomicBoolean();

public PartitionLog(KafkaServiceConfiguration kafkaConfig,
RequestStats requestStats,
Time time,
Expand Down Expand Up @@ -1176,6 +1180,29 @@ public CompletableFuture<Long> forcePurgeAbortTx() {
});
}

public DescribeProducersResponseData.PartitionResponse activeProducerState() {
    // Snapshot of this partition's active producers, as reported by
    // KIP-664 DescribeProducers. Errors.NONE: the partition itself is healthy.
    DescribeProducersResponseData.PartitionResponse partitionResponse =
            new DescribeProducersResponseData.PartitionResponse()
                    .setPartitionIndex(topicPartition.partition())
                    .setErrorCode(Errors.NONE.code())
                    .setActiveProducers(new ArrayList<>());

    // this utility is only for monitoring, it is fine to access this structure directly from any thread
    for (ProducerStateEntry entry : producerStateManager.getProducers().values()) {
        DescribeProducersResponseData.ProducerState state =
                new DescribeProducersResponseData.ProducerState()
                        .setProducerId(entry.producerId())
                        .setLastSequence(-1) // NOT HANDLED YET
                        // epoch/timestamp may be unset; report -1 in that case
                        .setProducerEpoch(entry.producerEpoch() != null
                                ? entry.producerEpoch().intValue() : -1)
                        .setLastTimestamp(entry.lastTimestamp() != null
                                ? entry.lastTimestamp().longValue() : -1)
                        .setCoordinatorEpoch(entry.coordinatorEpoch())
                        .setCurrentTxnStartOffset(entry.currentTxnFirstOffset().orElse(-1L));
        partitionResponse.activeProducers().add(state);
    }
    return partitionResponse;
}

public CompletableFuture<Long> recoverTxEntries(
long offset,
Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,8 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset)
}
}

/**
 * Exposes the live producer-id → state map, used for monitoring
 * (e.g. the KIP-664 DescribeProducers handler).
 *
 * NOTE(review): this returns the internal mutable map directly — callers must
 * treat it as read-only; consider wrapping with Collections.unmodifiableMap.
 */
public Map<Long, ProducerStateEntry> getProducers() {
    return producers;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
Expand Down Expand Up @@ -341,4 +342,14 @@ public CompletableFuture<?> updatePurgeAbortedTxnsOffsets() {
return logManager.updatePurgeAbortedTxnsOffsets();
}

public CompletableFuture<DescribeProducersResponseData.PartitionResponse> activeProducerState(
        TopicPartition topicPartition,
        String namespacePrefix) {
    // Resolve the partition log, wait for it to finish initialising, then
    // snapshot its active producer state.
    // https://github.com/apache/kafka/blob/5514f372b3e12db1df35b257068f6bb5083111c7/
    // core/src/main/scala/kafka/server/ReplicaManager.scala#L535
    final PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix);
    return partitionLog
            .awaitInitialisation()
            .thenApply(PartitionLog::activeProducerState);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
Expand All @@ -43,14 +44,17 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -1342,47 +1346,63 @@ public void testListAndDescribeTransactions() throws Exception {
producer.initTransactions();
producer.beginTransaction();
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.EMPTY);
org.apache.kafka.clients.admin.TransactionState.EMPTY, (stateOnBroker, stateOnCoodinator) -> {
assertNull(stateOnBroker);
});
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
producer.flush();

ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
listTransactionsResult.all().get().forEach(t -> {
log.info("Found transactionalId: {} {} {}",
t.transactionalId(),
t.producerId(),
t.state());
});
// the transaction is in ONGOING state
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.ONGOING);
org.apache.kafka.clients.admin.TransactionState.ONGOING,
(stateOnBroker, stateOnCoodinator) -> {});

// wait for the brokers to update the state
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.ONGOING);
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.ONGOING,
(stateOnBroker, stateOnCoodinator) -> {
// THESE ASSERTIONS ARE NOT VALID YET
//log.info("stateOnBroker: {}", stateOnBroker);
//log.info("stateOnCoodinator: {}", stateOnCoodinator);
// assertTrue(stateOnBroker.lastTimestamp()
// >= stateOnCoodinator.transactionStartTimeMs().orElseThrow());
});
});
producer.commitTransaction();
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
});
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT,
(stateOnBroker, stateOnCoodinator) -> {
});
});
producer.beginTransaction();

assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT,
(stateOnBroker, stateOnCoodinator) -> {
});

producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
producer.flush();
producer.abortTransaction();
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT,
(stateOnBroker, stateOnCoodinator) -> {
});
});
producer.close();
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT,
(stateOnBroker, stateOnCoodinator) -> {
});
}

private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId,
org.apache.kafka.clients.admin.TransactionState transactionState)
org.apache.kafka.clients.admin.TransactionState transactionState,
BiConsumer<ProducerState, TransactionDescription>
producerStateValidator)
throws Exception {
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
Collection<TransactionListing> transactionListings = listTransactionsResult.all().get();
Expand Down Expand Up @@ -1457,12 +1477,44 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
assertEquals(0, transactionDescription.topicPartitions().size());
break;
case ONGOING:
assertTrue(transactionDescription.transactionStartTimeMs().orElseThrow() > 0);
assertEquals(1, transactionDescription.topicPartitions().size());
break;
default:
fail("unhandled " + transactionState);
}

DescribeProducersResult producers = kafkaAdmin.describeProducers(transactionDescription.topicPartitions());
Map<TopicPartition, DescribeProducersResult.PartitionProducerState> topicPartitionPartitionProducerStateMap =
producers.all().get();
log.debug("topicPartitionPartitionProducerStateMap {}", topicPartitionPartitionProducerStateMap);


switch (transactionState) {
case EMPTY:
case COMPLETE_COMMIT:
case COMPLETE_ABORT:
producerStateValidator.accept(null, transactionDescription);
assertEquals(0, topicPartitionPartitionProducerStateMap.size());
break;
case ONGOING:
assertEquals(1, topicPartitionPartitionProducerStateMap.size());
TopicPartition tp = transactionDescription.topicPartitions().iterator().next();
DescribeProducersResult.PartitionProducerState partitionProducerState =
topicPartitionPartitionProducerStateMap.get(tp);
List<ProducerState> producerStates = partitionProducerState.activeProducers();
assertEquals(1, producerStates.size());
ProducerState producerState = producerStates.get(0);
assertEquals(producerState.producerId(), transactionDescription.producerId());
producerStateValidator.accept(producerState, transactionDescription);


break;
default:
fail("unhandled " + transactionState);
}


}

/**
Expand Down

0 comments on commit d60d786

Please sign in to comment.