From 71b77b2fd65fd1a2d481ed7067939fab3c0ad002 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 2 Mar 2023 10:40:37 +0100 Subject: [PATCH] Update Kafka wireprotocol to 3.4.0 and implement KIP-699 and KIP-709 --- .../pulsar/handlers/kop/DelayedFetch.java | 6 +- .../handlers/kop/KafkaCommandDecoder.java | 9 +- .../handlers/kop/KafkaRequestHandler.java | 288 ++++++++++---- .../handlers/kop/storage/PartitionLog.java | 76 ++-- .../kop/storage/ProducerStateManager.java | 10 +- .../handlers/kop/storage/ReplicaManager.java | 6 +- .../kop/utils/KafkaResponseUtils.java | 111 +++++- .../kop/utils/MessageMetadataUtils.java | 4 + .../requests/ResponseCallbackWrapper.java | 5 + .../kop/format/EntryFormatterTest.java | 10 +- pom.xml | 4 +- .../kop/KafkaProxyRequestHandler.java | 356 ++++++++++++------ .../handlers/kop/DistributedClusterTest.java | 1 + .../kop/EntryPublishTimeKafkaFormatTest.java | 2 +- .../pulsar/handlers/kop/KafkaApisTest.java | 77 ++-- .../handlers/kop/KafkaCommonTestUtils.java | 11 +- .../kop/KafkaRequestHandlerProxyTest.java | 6 + .../handlers/kop/KafkaRequestHandlerTest.java | 56 ++- ...kaRequestHandlerWithAuthorizationTest.java | 108 ++++-- .../kop/KopProtocolHandlerTestBase.java | 5 +- .../handlers/kop/MetricsProviderTest.java | 6 +- .../pulsar/handlers/kop/TransactionTest.java | 32 +- .../TransactionStateManagerTest.java | 2 + .../kop/streams/GlobalKTableTest.java | 14 +- .../kop/streams/KStreamAggregationTest.java | 31 +- .../handlers/kop/streams/KTableTest.java | 7 +- .../kop/streams/KafkaStreamsTestBase.java | 4 +- tests/src/test/resources/log4j2.xml | 2 +- 28 files changed, 909 insertions(+), 340 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java index 3a2ba21244..541fab3c85 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java @@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.message.FetchRequestData; @Slf4j public class DelayedFetch extends DelayedOperation { @@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation { private final long bytesReadable; private final int fetchMaxBytes; private final boolean readCommitted; - private final Map readPartitionInfo; + private final Map readPartitionInfo; private final Map readRecordsResult; private final MessageFetchContext context; protected volatile Boolean hasError; @@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs, final boolean readCommitted, final MessageFetchContext context, final ReplicaManager replicaManager, - final Map readPartitionInfo, + final Map readPartitionInfo, final Map readRecordsResult, final CompletableFuture> callback) { super(delayMs, Optional.empty()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index e921d0d5b3..5b45e3c91f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -453,7 +453,14 @@ protected void writeAndFlushResponseToClient(Channel channel) { this, request, response); } - final ByteBuf result = responseToByteBuf(response, request, true); + final ByteBuf result; + try { + result = responseToByteBuf(response, request, true); + } catch (Throwable error) { + log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error); + sendErrorResponse(request, channel, error, true); + return; + } final int resultSize = result.readableBytes(); channel.writeAndFlush(result).addListener(future -> { if (response instanceof ResponseCallbackWrapper) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 92f357afe9..67981da2f5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration.TENANT_ALLNAMESPACES_PLACEHOLDER; import static io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration.TENANT_PLACEHOLDER; +import static io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils.buildOffsetFetchResponse; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; @@ -61,7 +62,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -118,6 +118,9 @@ import org.apache.kafka.common.message.DescribeConfigsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -136,7 +139,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; @@ -973,19 +975,50 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + List coordinatorKeys = request.version() < FindCoordinatorRequest.MIN_BATCHED_VERSION + ? Collections.singletonList(request.data().key()) : request.data().coordinatorKeys(); + + List> futures = + new ArrayList<>(coordinatorKeys.size()); + for (String coordinatorKey : coordinatorKeys) { + CompletableFuture future = + findSingleCoordinator(coordinatorKey, findCoordinator); + futures.add(future); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .whenComplete((ignore, ex) -> { + if (ex != null) { + resultFuture.completeExceptionally(ex); + return; + } + List coordinators = new ArrayList<>(futures.size()); + for (CompletableFuture future : futures) { + coordinators.add(future.join()); + } + resultFuture.complete(KafkaResponseUtils.newFindCoordinator(coordinators, + request.version())); + }); + + } + private CompletableFuture findSingleCoordinator(String coordinatorKey, + KafkaHeaderAndRequest findCoordinator) { + FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + CompletableFuture findSingleCoordinatorResult = + new CompletableFuture<>(); String pulsarTopicName; int partition; CompletableFuture storeGroupIdFuture; if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) { TransactionCoordinator transactionCoordinator = getTransactionCoordinator(); - partition = transactionCoordinator.partitionFor(request.data().key()); + partition = transactionCoordinator.partitionFor(coordinatorKey); pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition); storeGroupIdFuture = CompletableFuture.completedFuture(null); } else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) { - partition = getGroupCoordinator().partitionFor(request.data().key()); + partition = getGroupCoordinator().partitionFor(coordinatorKey); pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition); if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) { - String groupId = request.data().key(); + String groupId = coordinatorKey; String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(), findCoordinator.getHeader().clientId()); currentConnectedClientId.add(findCoordinator.getHeader().clientId()); @@ -997,8 +1030,9 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato } } else { - throw new NotImplementedException("FindCoordinatorRequest not support unknown type " - + request.data().keyType()); + return CompletableFuture.failedFuture( + new NotImplementedException("FindCoordinatorRequest not support unknown type " + + request.data().keyType())); } // Store group name to metadata store for current client, use to collect consumer metrics. @@ -1013,8 +1047,12 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.error("[{}] Request {}: Error while find coordinator.", ctx.channel(), findCoordinator.getHeader(), throwable); - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.LEADER_NOT_AVAILABLE.code()) + .setErrorMessage(Errors.LEADER_NOT_AVAILABLE.message()) + .setKey(coordinatorKey)); + return; } @@ -1022,9 +1060,17 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.debug("[{}] Found node {} as coordinator for key {} partition {}.", ctx.channel(), result.node, request.data().key(), partition); } - resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(result.node.id()) + .setHost(result.node.host()) + .setPort(result.node.port()) + .setErrorCode(result.error.code()) + .setErrorMessage(result.error.message()) + .setKey(coordinatorKey)); }); }); + return findSingleCoordinatorResult; } @VisibleForTesting @@ -1070,6 +1116,61 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, checkState(getGroupCoordinator() != null, "Group Coordinator not started"); + List> futures = new ArrayList<>(); + if (request.version() >= 8) { + request.data().groups().forEach(group -> { + String groupId = group.groupId(); + List partitions = new ArrayList<>(); + // null topics means no partitions specified, so we should fetch all partitions + if (group.topics() != null) { + group + .topics() + .forEach(topic -> { + topic.partitionIndexes() + .forEach(partition -> partitions.add(new TopicPartition(topic.name(), partition))); + }); + } + futures.add(getOffsetFetchForGroup(groupId, partitions)); + }); + + } else { + // old clients + String groupId = request.data().groupId(); + List partitions = new ArrayList<>(); + request.data().topics().forEach(topic -> { + topic + .partitionIndexes() + .forEach(partition -> partitions.add(new TopicPartition(topic.name(), partition))); + }); + futures.add(getOffsetFetchForGroup(groupId, partitions)); + } + + FutureUtil.waitForAll(futures) + .whenComplete((___, error) -> { + if (error != null) { + resultFuture.complete(request.getErrorResponse(error)); + return; + } + List partitionsResponses = new ArrayList<>(); + futures.forEach(f -> { + partitionsResponses.add(f.join()); + }); + + resultFuture.complete(buildOffsetFetchResponse(partitionsResponses, + request.version())); + }); + + + + } + + protected CompletableFuture getOffsetFetchForGroup( + String groupId, + List partitions + ) { + + CompletableFuture resultFuture = new CompletableFuture<>(); + CompletableFuture> authorizeFuture = new CompletableFuture<>(); // replace @@ -1081,10 +1182,11 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, Map unknownPartitionData = Maps.newConcurrentMap(); - if (request.partitions() == null || request.partitions().isEmpty()) { + if (partitions == null || partitions.isEmpty()) { + // fetch all partitions authorizeFuture.complete(null); } else { - AtomicInteger partitionCount = new AtomicInteger(request.partitions().size()); + AtomicInteger partitionCount = new AtomicInteger(partitions.size()); Runnable completeOneAuthorization = () -> { if (partitionCount.decrementAndGet() == 0) { @@ -1092,7 +1194,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, } }; final String namespacePrefix = currentNamespacePrefix(); - request.partitions().forEach(tp -> { + partitions.forEach(tp -> { try { String fullName = new KopTopic(tp.topic(), namespacePrefix).getFullName(); authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)) @@ -1125,7 +1227,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, authorizeFuture.whenComplete((partitionList, ex) -> { KeyValue> keyValue = getGroupCoordinator().handleFetchOffsets( - request.groupId(), + groupId, Optional.ofNullable(partitionList) ); if (log.isDebugEnabled()) { @@ -1141,14 +1243,21 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, } // recover to original topic name - replaceTopicPartition(keyValue.getValue(), replacingIndex); - keyValue.getValue().putAll(unauthorizedPartitionData); - keyValue.getValue().putAll(unknownPartitionData); - - resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue())); + Map partitionsResponses = keyValue.getValue(); + replaceTopicPartition(partitionsResponses, replacingIndex); + partitionsResponses.putAll(unauthorizedPartitionData); + partitionsResponses.putAll(unknownPartitionData); + + Errors errors = keyValue.getKey(); + resultFuture.complete(new KafkaResponseUtils.OffsetFetchResponseGroupData(groupId, errors, + partitionsResponses)); }); + + return resultFuture; } + + private CompletableFuture> fetchOffset(String topicName, long timestamp) { CompletableFuture> partitionData = new CompletableFuture<>(); @@ -1612,30 +1721,32 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, checkArgument(fetch.getRequest() instanceof FetchRequest); FetchRequest request = (FetchRequest) fetch.getRequest(); + FetchRequestData data = request.data(); if (log.isDebugEnabled()) { log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ", - ctx.channel(), fetch.getHeader(), request.fetchData().size()); + ctx.channel(), fetch.getHeader(), data.topics().size()); - request.fetchData().forEach((topic, data) -> { - log.debug("Fetch request topic:{} data:{}.", topic, data.toString()); + data.topics().forEach((topicData) -> { + log.debug("Fetch request topic: data:{}.", topicData.toString()); }); } - if (request.fetchData().isEmpty()) { - resultFuture.complete(new FetchResponse<>( - Errors.NONE, - new LinkedHashMap<>(), - THROTTLE_TIME_MS, - request.metadata().sessionId())); + int numPartitions = data.topics().stream().mapToInt(topic -> topic.partitions().size()).sum(); + if (numPartitions == 0) { + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(request.metadata().sessionId()) + .setResponses(new ArrayList<>()))); return; } - ConcurrentHashMap> erroneous = + ConcurrentHashMap erroneous = new ConcurrentHashMap<>(); - ConcurrentHashMap interesting = + ConcurrentHashMap interesting = new ConcurrentHashMap<>(); - AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(request.fetchData().size()); + + AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(numPartitions); Runnable completeOne = () -> { if (unfinishedAuthorizationCount.decrementAndGet() == 0) { TransactionCoordinator transactionCoordinator = null; @@ -1648,13 +1759,12 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, int fetchMinBytes = Math.min(request.minBytes(), fetchMaxBytes); if (interesting.isEmpty()) { if (log.isDebugEnabled()) { - log.debug("Fetch interesting is empty. Partitions: [{}]", request.fetchData()); + log.debug("Fetch interesting is empty. Partitions: [{}]", data.topics()); } - resultFuture.complete(new FetchResponse<>( - Errors.NONE, - new LinkedHashMap<>(erroneous), - THROTTLE_TIME_MS, - request.metadata().sessionId())); + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(request.metadata().sessionId()) + .setResponses(buildFetchResponses(erroneous)))); } else { MessageFetchContext context = MessageFetchContext .get(this, transactionCoordinator, maxReadEntriesNum, namespacePrefix, @@ -1667,18 +1777,17 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, request.isolationLevel(), context ).thenAccept(resultMap -> { - LinkedHashMap> partitions = - new LinkedHashMap<>(); - resultMap.forEach((tp, data) -> { - partitions.put(tp, data.toPartitionData()); + Map all = new HashMap<>(); + resultMap.forEach((tp, results) -> { + all.put(tp, results.toPartitionData()); }); - partitions.putAll(erroneous); + all.putAll(erroneous); boolean triggeredCompletion = resultFuture.complete(new ResponseCallbackWrapper( - new FetchResponse<>( - Errors.NONE, - partitions, - 0, - request.metadata().sessionId()), + new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0) + .setSessionId(request.metadata().sessionId()) + .setResponses(buildFetchResponses(all))), () -> resultMap.forEach((__, readRecordsResult) -> { readRecordsResult.recycle(); }) @@ -1695,36 +1804,72 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, }; // Regular Kafka consumers need READ permission on each partition they are fetching. - request.fetchData().forEach((topicPartition, partitionData) -> { - final String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix()); - authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) - .whenComplete((isAuthorized, ex) -> { - if (ex != null) { - log.error("Read topic authorize failed, topic - {}. {}", - fullTopicName, ex.getMessage()); - erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); - completeOne.run(); - return; - } - if (!isAuthorized) { - erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + data.topics().forEach(topicData -> { + topicData.partitions().forEach((partitionData) -> { + TopicPartition topicPartition = new TopicPartition(topicData.topic(), partitionData.partition()); + final String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix()); + authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Read topic authorize failed, topic - {}. {}", + fullTopicName, ex.getMessage()); + erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + completeOne.run(); + return; + } + if (!isAuthorized) { + erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + completeOne.run(); + return; + } + interesting.put(topicPartition, partitionData); completeOne.run(); - return; - } - interesting.put(topicPartition, partitionData); - completeOne.run(); - }); + }); + }); }); } - private static FetchResponse.PartitionData errorResponse(Errors error) { - return new FetchResponse.PartitionData<>(error, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY); + public static List buildFetchResponses( + Map partitionData) { + List result = new ArrayList<>(); + partitionData.keySet() + .stream() + .map(topicPartition -> topicPartition.topic()) + .distinct() + .forEach(topic -> { + FetchResponseData.FetchableTopicResponse fetchableTopicResponse = + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setPartitions(new ArrayList<>()); + result.add(fetchableTopicResponse); + + partitionData.forEach((tp, data) -> { + if (tp.topic().equals(topic)) { + fetchableTopicResponse.partitions().add(new FetchResponseData.PartitionData() + .setPartitionIndex(tp.partition()) + .setErrorCode(data.errorCode()) + .setHighWatermark(data.highWatermark()) + .setLastStableOffset(data.lastStableOffset()) + .setLogStartOffset(data.logStartOffset()) + .setAbortedTransactions(data.abortedTransactions()) + .setPreferredReadReplica(data.preferredReadReplica()) + .setRecords(data.records())); + } + }); + }); + return result; + } + + private static FetchResponseData.PartitionData errorResponse(Errors error) { + return new FetchResponseData.PartitionData() + .setErrorCode(error.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); } @Override @@ -1760,7 +1905,8 @@ protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup, joinGroupResult.getProtocolType(), joinGroupResult.getMemberId(), joinGroupResult.getLeaderId(), - members + members, + request.version() ); if (log.isTraceEnabled()) { log.trace("Sending join group response {} for correlation id {} to client {}.", diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index bae05304cf..0e70b96969 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -78,12 +78,12 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.service.Topic; @@ -304,7 +304,7 @@ protected ReadRecordsResult newObject(Handle handle) { private final Recycler.Handle recyclerHandle; private DecodeResult decodeResult; - private List abortedTransactions; + private List abortedTransactions; private long highWatermark; private long lastStableOffset; private Position lastPosition; @@ -321,7 +321,7 @@ public Errors errors() { } public static ReadRecordsResult get(DecodeResult decodeResult, - List abortedTransactions, + List abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, @@ -337,7 +337,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult, } public static ReadRecordsResult get(DecodeResult decodeResult, - List abortedTransactions, + List abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, @@ -368,7 +368,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio partitionLog); } - public FetchResponse.PartitionData toPartitionData() { + public FetchResponseData.PartitionData toPartitionData() { // There are three cases: // @@ -379,21 +379,20 @@ public FetchResponse.PartitionData toPartitionData() { // 3. errors == Others error : // Get errors. if (errors != null) { - return new FetchResponse.PartitionData<>( - errors, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, - null, - MemoryRecords.EMPTY); + return new FetchResponseData.PartitionData() + .setErrorCode(errors.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); } - return new FetchResponse.PartitionData<>( - Errors.NONE, - highWatermark, - lastStableOffset, - highWatermark, // TODO: should it be changed to the logStartOffset? - abortedTransactions, - decodeResult.getRecords()); + return new FetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code()) + .setHighWatermark(highWatermark) + .setLastStableOffset(lastStableOffset) + .setHighWatermark(highWatermark) // TODO: should it be changed to the logStartOffset? + .setAbortedTransactions(abortedTransactions) + .setRecords(decodeResult.getRecords()); } public void recycle() { @@ -460,7 +459,7 @@ public Optional firstUndecidedOffset() { return producerStateManager.firstUndecidedOffset(); } - public List getAbortedIndexList(long fetchOffset) { + public List getAbortedIndexList(long fetchOffset) { return producerStateManager.getAbortedIndexList(fetchOffset); } @@ -538,14 +537,14 @@ public Position getLastPosition() { return persistentTopic.getLastPosition(); } - public CompletableFuture readRecords(final FetchRequest.PartitionData partitionData, + public CompletableFuture readRecords(final FetchRequestData.FetchPartition partitionData, final boolean readCommitted, final AtomicLong limitBytes, final int maxReadEntriesNum, final MessageFetchContext context) { final long startPrepareMetadataNanos = MathUtils.nowInNano(); final CompletableFuture future = new CompletableFuture<>(); - final long offset = partitionData.fetchOffset; + final long offset = partitionData.fetchOffset(); KafkaTopicManager topicManager = context.getTopicManager(); // The future that is returned by getTopicConsumerManager is always completed normally topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> { @@ -593,7 +592,7 @@ public CompletableFuture readRecords(final FetchRequest.Parti requestStats.getPrepareMetadataStats().registerSuccessfulEvent( MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); - long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get()); + long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get()); readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, fullPartitionName -> { topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName); @@ -661,7 +660,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos) private void handleEntries(final CompletableFuture future, final List entries, - final FetchRequest.PartitionData partitionData, + final FetchRequestData.FetchPartition partitionData, final KafkaTopicConsumerManager tcm, final ManagedCursor cursor, final boolean readCommitted, @@ -707,9 +706,9 @@ private void handleEntries(final CompletableFuture future, // collect consumer metrics decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats); - List abortedTransactions = null; + List abortedTransactions = null; if (readCommitted) { - abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset); + abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset()); } if (log.isDebugEnabled()) { log.debug("Partition {} read entry completed in {} ns", @@ -1125,7 +1124,15 @@ public CompletableFuture fetchOldestAvailableIndexFromTopic() { // look for the first entry with data PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition); - managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() { + fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition); + + return future; + + } + + private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture future, + ManagedLedgerImpl managedLedger, PositionImpl position) { + managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { try { @@ -1133,6 +1140,13 @@ public void readEntryComplete(Entry entry, Object ctx) { log.info("First offset for topic {} is {} - position {}", fullPartitionName, startOffset, entry.getPosition()); future.complete(startOffset); + } catch (MetadataCorruptedException.NoBrokerEntryMetadata noBrokerEntryMetadata) { + long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger); + log.info("Legacy entry for topic {} - position {} - returning current offset {}", + fullPartitionName, + entry.getPosition(), + currentOffset); + future.complete(currentOffset); } catch (Exception err) { future.completeExceptionally(err); } finally { @@ -1145,9 +1159,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { future.completeExceptionally(exception); } }, null); - - return future; - } @VisibleForTesting @@ -1180,7 +1191,8 @@ public CompletableFuture forcePurgeAbortTx() { public CompletableFuture recoverTxEntries( long offset, Executor executor) { - if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled() + || !MessageMetadataUtils.isInterceptorConfigured(persistentTopic.getManagedLedger())) { // no need to scan the topic, because transactions are disabled return CompletableFuture.completedFuture(Long.valueOf(0)); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index e2ac5eb9b9..8c6f21ed76 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -27,8 +27,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.requests.FetchResponse; /** * Producer state manager. @@ -355,13 +355,15 @@ public long purgeAbortedTxns(long offset) { return count.get(); } - public List getAbortedIndexList(long fetchOffset) { + public List getAbortedIndexList(long fetchOffset) { synchronized (abortedIndexList) { - List abortedTransactions = new ArrayList<>(); + List abortedTransactions = new ArrayList<>(); for (AbortedTxn abortedTxn : abortedIndexList) { if (abortedTxn.lastOffset() >= fetchOffset) { abortedTransactions.add( - new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset())); + new FetchResponseData.AbortedTransaction() + .setProducerId(abortedTxn.producerId()) + .setFirstOffset(abortedTxn.firstOffset())); } } return abortedTransactions; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 5c3521a7e8..4b22ed359b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -43,9 +43,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -236,7 +236,7 @@ public CompletableFuture> fe final long timeout, final int fetchMinBytes, final int fetchMaxBytes, - final ConcurrentHashMap fetchInfos, + final ConcurrentHashMap fetchInfos, final IsolationLevel isolationLevel, final MessageFetchContext context) { CompletableFuture> future = @@ -290,7 +290,7 @@ public CompletableFuture> re final boolean readCommitted, final int fetchMaxBytes, final int maxReadEntriesNum, - final Map readPartitionInfo, + final Map readPartitionInfo, final MessageFetchContext context) { AtomicLong limitBytes = new AtomicLong(fetchMaxBytes); CompletableFuture> resultFuture = new CompletableFuture<>(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index fa386b26f7..f20fc5251c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -41,6 +42,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -54,6 +56,7 @@ import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; @@ -68,6 +71,7 @@ import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.pulsar.common.schema.KeyValue; +@Slf4j public class KafkaResponseUtils { public static ApiVersionsResponse newApiVersions(List versionList) { @@ -184,22 +188,52 @@ public static DescribeGroupsResponse newDescribeGroups( return new DescribeGroupsResponse(data); } - public static FindCoordinatorResponse newFindCoordinator(Node node) { + public static FindCoordinatorResponse newFindCoordinator(List coordinatorKeys, + Node node, + int version) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setNodeId(node.id()); - data.setHost(node.host()); - data.setPort(node.port()); - data.setErrorCode(Errors.NONE.code()); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + data.setErrorMessage(Errors.NONE.message()) + .setErrorCode(Errors.NONE.code()) + .setPort(node.port()) + .setHost(node.host()) + .setNodeId(node.id()); + } else { + // for new clients + data.setCoordinators(coordinatorKeys + .stream() + .map(key -> new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()) + .setHost(node.host()) + .setPort(node.port()) + .setNodeId(node.id()) + .setKey(key)) + .collect(Collectors.toList())); + } + return new FindCoordinatorResponse(data); } - public static FindCoordinatorResponse newFindCoordinator(Errors errors) { + public static FindCoordinatorResponse newFindCoordinator(List coordinators, + int version) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setErrorCode(errors.code()); - data.setErrorMessage(errors.message()); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + FindCoordinatorResponseData.Coordinator coordinator = coordinators.get(0); + data.setErrorMessage(coordinator.errorMessage()) + .setErrorCode(coordinator.errorCode()) + .setPort(coordinator.port()) + .setHost(coordinator.host()) + .setNodeId(coordinator.nodeId()); + } else { + // for new clients + data.setCoordinators(coordinators); + } + return new FindCoordinatorResponse(data); } + public static HeartbeatResponse newHeartbeat(Errors errors) { HeartbeatResponseData data = new HeartbeatResponseData(); data.setErrorCode(errors.code()); @@ -212,7 +246,8 @@ public static JoinGroupResponse newJoinGroup(Errors errors, String groupProtocolType, String memberId, String leaderId, - Map groupMembers) { + Map groupMembers, + short requestVersion) { JoinGroupResponseData data = new JoinGroupResponseData() .setErrorCode(errors.code()) .setLeader(leaderId) @@ -234,7 +269,7 @@ public static JoinGroupResponse newJoinGroup(Errors errors, data.setThrottleTimeMs(1000); } - return new JoinGroupResponse(data); + return new JoinGroupResponse(data, requestVersion); } public static LeaveGroupResponse newLeaveGroup(Errors errors) { @@ -347,6 +382,7 @@ public static MetadataResponse newMetadata(List nodes, return new MetadataResponse(data, apiVersion); } + @Getter @AllArgsConstructor public static class BrokerLookupResult { @@ -435,4 +471,59 @@ public static SyncGroupResponse newSyncGroup(Errors errors, data.setAssignment(assignment); return new SyncGroupResponse(data); } + + @AllArgsConstructor + public static class OffsetFetchResponseGroupData { + String groupId; + Errors errors; + Map partitionsResponses; + } + + public static OffsetFetchResponse buildOffsetFetchResponse( + List groups, + int version) { + + if (version < 8) { + // old clients + OffsetFetchResponseGroupData offsetFetchResponseGroupData = groups.get(0); + return new OffsetFetchResponse(offsetFetchResponseGroupData.errors, + offsetFetchResponseGroupData.partitionsResponses); + } else { + // new clients + OffsetFetchResponseData data = new OffsetFetchResponseData(); + for (OffsetFetchResponseGroupData groupData : groups) { + OffsetFetchResponseData.OffsetFetchResponseGroup offsetFetchResponseGroup = + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setErrorCode(groupData.errors.code()) + .setGroupId(groupData.groupId) + .setTopics(new ArrayList<>()); + data.groups().add(offsetFetchResponseGroup); + Set topics = groupData.partitionsResponses.keySet().stream().map(TopicPartition::topic) + .collect(Collectors.toSet()); + topics.forEach(topic -> { + offsetFetchResponseGroup.topics().add(new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(topic) + .setPartitions(groupData.partitionsResponses.entrySet() + .stream() + .filter(e -> e.getKey().topic().equals(topic)) + .map(entry -> { + OffsetFetchResponse.PartitionData value = entry.getValue(); + if (log.isDebugEnabled()) { + log.debug("Add resp for group {} topic {}: {}", + groupData.groupId, topic, value); + } + return new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setErrorCode(value.error.code()) + .setMetadata(value.metadata) + .setPartitionIndex(entry.getKey().partition()) + .setCommittedOffset(value.offset) + .setCommittedLeaderEpoch(value.leaderEpoch.orElse(-1)); + }) + .collect(Collectors.toList()))); + }); + } + return new OffsetFetchResponse(data); + } + + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java index 7940f063ad..52bb5a3b1f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java @@ -38,6 +38,10 @@ @Slf4j public class MessageMetadataUtils { + public static boolean isInterceptorConfigured(ManagedLedger managedLedger) { + return managedLedger.getManagedLedgerInterceptor() instanceof ManagedLedgerInterceptorImpl; + } + public static long getCurrentOffset(ManagedLedger managedLedger) { return ((ManagedLedgerInterceptorImpl) managedLedger.getManagedLedgerInterceptor()).getIndex(); } diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java index 21cb50eb95..038db82591 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java @@ -56,4 +56,9 @@ public int throttleTimeMs() { public ApiMessage data() { return abstractResponse.data(); } + + @Override + public void maybeSetThrottleTimeMs(int i) { + abstractResponse.maybeSetThrottleTimeMs(i); + } } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java index 089b6e2baa..b4d2edf3fa 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java @@ -342,15 +342,15 @@ public MineMemoryRecordsBuilder(ByteBufferOutputStream bufferStream, } @Override - public Long appendWithOffset(long offset, SimpleRecord record) { - return appendWithOffset(offset, + public void appendWithOffset(long offset, SimpleRecord record) { + appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers()); } - public Long appendWithOffset(long offset, + public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, @@ -362,9 +362,9 @@ public Long appendWithOffset(long offset, if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); - return null; + } else { - return appendLegacyRecord(offset, timestamp, key, value); + appendLegacyRecord(offset, timestamp, key, value); } } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); diff --git a/pom.xml b/pom.xml index ec21c54cc0..1a244b8dc9 100644 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,7 @@ 2.14.0 2.13.4.1 - 2.8.2 + 3.4.0 2.17.1 1.18.24 2.22.0 @@ -95,7 +95,7 @@ scm:git:git@github.com:datastax/starlight-for-kafka.git scm:git:git@github.com:datastax/starlight-for-kafka.git https://github.com/datastax/starlight-for-kafka - v2.8.0.1.0.17 + HEAD diff --git a/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java b/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java index 6d60ab50e2..16f1e6b785 100644 --- a/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java +++ b/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java @@ -45,6 +45,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -79,7 +80,8 @@ import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.EndTxnRequestData; -import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -91,6 +93,7 @@ import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; @@ -122,8 +125,7 @@ import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.FindCoordinatorRequest; -import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest;; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.JoinGroupRequest; @@ -137,6 +139,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; @@ -152,6 +155,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +import org.jetbrains.annotations.NotNull; /** * This class contains all the request handling methods. @@ -569,8 +573,17 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, } resultFuture.complete(new ProduceResponse(responseMap)); }; - BiConsumer addPartitionResponse = (topicPartition, response) -> { - + BiConsumer addPartitionResponse = + (topicPartition, partitionProduceResponse) -> { + PartitionResponse response = new PartitionResponse( + Errors.forCode(partitionProduceResponse.errorCode()), + partitionProduceResponse.baseOffset(), + partitionProduceResponse.logAppendTimeMs(), + partitionProduceResponse.logStartOffset(), + partitionProduceResponse.recordErrors().stream().map(b-> + new ProduceResponse.RecordError(b.batchIndex(), b.batchIndexErrorMessage())) + .collect(Collectors.toList()), + partitionProduceResponse.errorMessage()); responseMap.put(topicPartition, response); // reset topicPartitionNum int restTopicPartitionNum = topicPartitionNum.decrementAndGet(); @@ -640,16 +653,22 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, .forwardRequest(singlePartitionRequest) .thenAccept(response -> { ProduceResponse resp = (ProduceResponse) response; - resp.responses().forEach((topicPartition, partitionResponse) -> { - if (partitionResponse.error == Errors.NONE) { - log.debug("result produce for {} to {} {}", topicPartition, - kopBroker, partitionResponse); - addPartitionResponse.accept(topicPartition, partitionResponse); - } else { - invalidateLeaderIfNeeded(namespacePrefix, kopBroker, topicPartition, - partitionResponse.error); - addPartitionResponse.accept(topicPartition, partitionResponse); - } + ProduceResponseData produceRespData = resp.data(); + produceRespData.responses().forEach(topicProduceResponse -> { + topicProduceResponse.partitionResponses().forEach(partitionResponse -> { + TopicPartition topicPartition = new TopicPartition(topicProduceResponse.name(), + partitionResponse.index()); + if (partitionResponse.errorCode() == Errors.NONE.code()) { + log.debug("result produce for {} to {} {}", topicPartition, + kopBroker, partitionResponse); + addPartitionResponse.accept(topicPartition, partitionResponse); + } else { + invalidateLeaderIfNeeded(namespacePrefix, kopBroker, topicPartition, + partitionResponse.errorCode()); + addPartitionResponse.accept(topicPartition, partitionResponse); + } + }); + }); }).exceptionally(badError -> { log.error("bad error during split produce for {}", @@ -660,7 +679,9 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, topic.partitionData().forEach(partitionProduceData -> { addPartitionResponse.accept( new TopicPartition(topic.name(), partitionProduceData.index()), - new PartitionResponse(Errors.REQUEST_TIMED_OUT)); + new ProduceResponseData.PartitionProduceResponse() + .setErrorCode(Errors.REQUEST_TIMED_OUT.code()) + .setErrorMessage(Errors.REQUEST_TIMED_OUT.message())); }); }); return null; @@ -671,10 +692,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, } } - private void invalidateLeaderIfNeeded(String namespacePrefix, Node kopBroker, TopicPartition topicPartition, - Errors error) { - invalidateLeaderIfNeeded(namespacePrefix, kopBroker, topicPartition, error.code()); - } private void invalidateLeaderIfNeeded(String namespacePrefix, Node kopBroker, TopicPartition topicPartition, short error) { if (error == Errors.NOT_LEADER_OR_FOLLOWER.code()) { @@ -696,20 +713,28 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, CompletableFuture resultFuture) { checkArgument(fetch.getRequest() instanceof FetchRequest); FetchRequest fetchRequest = (FetchRequest) fetch.getRequest(); - Map fetchData = fetchRequest.fetchData(); + FetchRequestData data = fetchRequest.data(); - final int numPartitions = fetchData.size(); + int numPartitions = data.topics().stream().mapToInt(topic -> topic.partitions().size()).sum(); if (numPartitions == 0) { - resultFuture.complete(new FetchResponse(Errors.NONE, new LinkedHashMap<>(), 0, - fetchRequest.metadata().sessionId())); + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(fetchRequest.metadata().sessionId()) + .setResponses(new ArrayList<>()))); return; } String namespacePrefix = currentNamespacePrefix(); - Map> responseMap = new ConcurrentHashMap<>(); - final AtomicInteger topicPartitionNum = new AtomicInteger(fetchData.size()); + Map responseMap = new ConcurrentHashMap<>(); + final AtomicInteger topicPartitionNum = new AtomicInteger(numPartitions); final String metadataNamespace = kafkaConfig.getKafkaMetadataNamespace(); - + Map fetchData = new HashMap<>(); + data.topics().forEach(fetchTopic -> { + fetchTopic.partitions().forEach(fetchPartition -> { + TopicPartition topicPartition = new TopicPartition(fetchTopic.topic(), fetchPartition.partition()); + fetchData.put(topicPartition, fetchPartition); + }); + }); // validate system topics for (TopicPartition topicPartition : fetchData.keySet()) { final String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix); @@ -717,16 +742,9 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, if (KopTopic.isInternalTopic(metadataNamespace, fullPartitionName)) { log.error("[{}] Request {}: not support fetch message to inner topic. topic: {}", ctx.channel(), fetch.getHeader(), topicPartition); - Map> errorsMap = - fetchData - .keySet() - .stream() - .collect(Collectors.toMap(Function.identity(), - p -> new FetchResponse.PartitionData(Errors.INVALID_TOPIC_EXCEPTION, - 0, 0, 0, - null, MemoryRecords.EMPTY))); - resultFuture.complete(new FetchResponse(Errors.INVALID_TOPIC_EXCEPTION, - new LinkedHashMap<>(errorsMap), 0, fetchRequest.metadata().sessionId())); + FetchResponse fetchResponse = buildFetchErrorResponse(fetchRequest, + fetchData, Errors.INVALID_TOPIC_EXCEPTION); + resultFuture.complete(fetchResponse); return; } } @@ -746,16 +764,9 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, .whenComplete((result, error) -> { // TODO: report errors for specific partitions and continue for non failed lookups if (error != null) { - Map> errorsMap = - fetchData - .keySet() - .stream() - .collect(Collectors.toMap(Function.identity(), - p -> new FetchResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, - 0, 0, 0, - null, MemoryRecords.EMPTY))); - resultFuture.complete(new FetchResponse(Errors.UNKNOWN_SERVER_ERROR, - new LinkedHashMap<>(errorsMap), 0, fetchRequest.metadata().sessionId())); + FetchResponse fetchResponse = buildFetchErrorResponse(fetchRequest, + fetchData, Errors.UNKNOWN_SERVER_ERROR); + resultFuture.complete(fetchResponse); } else { boolean multipleBrokers = false; @@ -781,18 +792,9 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, resultFuture.complete(response); }).exceptionally(badError -> { log.error("bad error for FULL fetch", badError); - Map> errorsMap = - fetchData - .keySet() - .stream() - .collect(Collectors.toMap(Function.identity(), - p -> new FetchResponse.PartitionData( - Errors.UNKNOWN_SERVER_ERROR, - 0, 0, 0, - null, MemoryRecords.EMPTY))); - resultFuture.complete(new FetchResponse(Errors.UNKNOWN_SERVER_ERROR, - new LinkedHashMap<>(errorsMap), 0, - fetchRequest.metadata().sessionId())); + FetchResponse fetchResponse = buildFetchErrorResponse(fetchRequest, + fetchData, Errors.UNKNOWN_SERVER_ERROR); + resultFuture.complete(fetchResponse); return null; }); } else { @@ -812,21 +814,22 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, fetchData.keySet().forEach(topicPartition -> { if (!responseMap.containsKey(topicPartition)) { responseMap.put(topicPartition, - new FetchResponse.PartitionData(Errors.UNKNOWN_SERVER_ERROR, - 0, 0, 0, - null, MemoryRecords.EMPTY)); + getFetchPartitionDataWithError(Errors.UNKNOWN_SERVER_ERROR)); } }); if (log.isDebugEnabled()) { log.debug("[{}] Request {}: Complete handle fetch.", ctx.channel(), fetch.toString()); } - final LinkedHashMap> responseMapRaw = + final LinkedHashMap responseMapRaw = new LinkedHashMap<>(responseMap); - resultFuture.complete(new FetchResponse(Errors.NONE, - responseMapRaw, 0, fetchRequest.metadata().sessionId())); + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setResponses(KafkaRequestHandler.buildFetchResponses(responseMapRaw)) + .setErrorCode(Errors.NONE.code()) + .setSessionId(fetchRequest.metadata().sessionId()) + .setThrottleTimeMs(0))); }; - BiConsumer addFetchPartitionResponse = + BiConsumer addFetchPartitionResponse = (topicPartition, response) -> { responseMap.put(topicPartition, response); @@ -843,15 +846,14 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, } }; - final BiConsumer resultConsumer = - (topicPartition, data) -> addFetchPartitionResponse.accept( - topicPartition, data); + final BiConsumer resultConsumer = + (topicPartition, pdata) -> addFetchPartitionResponse.accept( + topicPartition, pdata); final BiConsumer errorsConsumer = (topicPartition, errors) -> addFetchPartitionResponse.accept(topicPartition, - new FetchResponse.PartitionData(errors, 0, 0, 0, - null, MemoryRecords.EMPTY)); + getFetchPartitionDataWithError(errors)); - Map> requestsByBroker = + Map> requestsByBroker = new HashMap<>(); fetchData.forEach((topicPartition, partitionData) -> { @@ -859,7 +861,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, KafkaResponseUtils.BrokerLookupResult topicMetadata = brokers.get(fullPartitionName); Node kopBroker = topicMetadata.node; - Map requestForSinglePartition = + Map requestForSinglePartition = requestsByBroker.computeIfAbsent(kopBroker, ___ -> new HashMap<>()); requestForSinglePartition.put(topicPartition, partitionData); }); @@ -872,10 +874,23 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, fetch.getHeader().clientId(), dummyCorrelationId ); + Map partitionDataMap = + requestsForBroker.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> new FetchRequest.PartitionData( + null, + entry.getValue().fetchOffset(), + entry.getValue().logStartOffset(), + entry.getValue().partitionMaxBytes(), + Optional.empty() + ) + )); FetchRequest requestForSingleBroker = FetchRequest.Builder - .forConsumer(((FetchRequest) fetch.getRequest()).maxWait(), + .forConsumer(fetch.getRequest().version(), + ((FetchRequest) fetch.getRequest()).maxWait(), ((FetchRequest) fetch.getRequest()).minBytes(), - requestsForBroker) + partitionDataMap) .build(); ByteBuf buffer = KopResponseUtils.serializeRequest(header, requestForSingleBroker); KafkaHeaderAndRequest singlePartitionRequest = new KafkaHeaderAndRequest( @@ -887,32 +902,36 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, buffer.release(); if (log.isDebugEnabled()) { - log.debug("forward fetch for {} to {}", requestForSingleBroker.fetchData().keySet(), + log.debug("forward fetch for {} to {}", requestForSingleBroker.data().topics(), kopBroker); } grabConnectionToBroker(kopBroker.host(), kopBroker.port()) .forwardRequest(singlePartitionRequest) .thenAccept(response -> { - FetchResponse resp = (FetchResponse) response; - resp.responseData() - .forEach((topicPartition, partitionResponse) -> { - invalidateLeaderIfNeeded(namespacePrefix, kopBroker, - topicPartition, partitionResponse.error()); - if (log.isDebugEnabled()) { - final String fullPartitionName = - KopTopic.toString(topicPartition, - namespacePrefix); - log.debug("result fetch for {} to {} {}", fullPartitionName, - kopBroker, - partitionResponse); - } - addFetchPartitionResponse.accept(topicPartition, + FetchResponse resp = (FetchResponse) response; + resp.data().responses().stream().forEach(fetchableTopicResponse -> { + fetchableTopicResponse.partitions().forEach(partitionResponse -> { + TopicPartition topicPartition = new TopicPartition( + fetchableTopicResponse.topic(), + partitionResponse.partitionIndex()); + invalidateLeaderIfNeeded(namespacePrefix, kopBroker, + topicPartition, partitionResponse.errorCode()); + if (log.isDebugEnabled()) { + final String fullPartitionName = + KopTopic.toString(topicPartition, + namespacePrefix); + log.debug("result fetch for {} to {} {}", fullPartitionName, + kopBroker, partitionResponse); - }); + } + addFetchPartitionResponse.accept(topicPartition, + partitionResponse); + }); + }); }).exceptionally(badError -> { log.error("bad error while fetching for {} from {}", - requestForSingleBroker.fetchData().keySet(), badError, kopBroker); - requestForSingleBroker.fetchData().keySet().forEach(topicPartition -> + fetchData.keySet(), badError, kopBroker); + fetchData.keySet().forEach(topicPartition -> errorsConsumer.accept(topicPartition, Errors.UNKNOWN_SERVER_ERROR) ); return null; @@ -925,6 +944,32 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, }); } + @NotNull + private static FetchResponse buildFetchErrorResponse(FetchRequest fetchRequest, + Map fetchData, + Errors finalError) { + Map errorsMap = + fetchData + .keySet() + .stream() + .collect(Collectors.toMap(Function.identity(), + p -> getFetchPartitionDataWithError(finalError))); + FetchResponse fetchResponse = new FetchResponse(new FetchResponseData() + .setErrorCode(finalError.code()) + .setResponses(KafkaRequestHandler.buildFetchResponses(errorsMap)) + .setSessionId(fetchRequest.metadata().sessionId())); + return fetchResponse; + } + + private static FetchResponseData.PartitionData getFetchPartitionDataWithError(Errors finalError) { + return new FetchResponseData.PartitionData() + .setErrorCode(finalError.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); + } + private CompletableFuture findCoordinator( FindCoordinatorRequest.CoordinatorType type, String key) { String pulsarTopicName = computePulsarTopicName(type, key); @@ -946,6 +991,17 @@ private CompletableFuture findCoordinator return admin.namespaces().createNamespaceAsync(nameSpace); } }) + .handle((Void v, Throwable error) -> { + if (error != null) { + if (error.getCause() instanceof PulsarAdminException.ConflictException) { + // concurrent creation of namespace + return CompletableFuture.completedFuture(null); + } else { + throw new CompletionException(error); + } + } + return CompletableFuture.completedFuture(null); + }) .thenCompose(___ -> { coordinatorNamespaceExists = true; return findBroker(TopicName.get(pulsarTopicName), true); @@ -980,12 +1036,11 @@ private String computePulsarTopicName(FindCoordinatorRequest.CoordinatorType typ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator, CompletableFuture resultFuture) { Node node = newSelfNode(); - AbstractResponse response = new FindCoordinatorResponse( - new FindCoordinatorResponseData() - .setErrorCode(Errors.NONE.code()) - .setHost(node.host()) - .setPort(node.port()) - .setNodeId(node.id())); + FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + List coordinatorKeys = request.version() < FindCoordinatorRequest.MIN_BATCHED_VERSION + ? Collections.singletonList(request.data().key()) : request.data().coordinatorKeys(); + AbstractResponse response = + KafkaResponseUtils.newFindCoordinator(coordinatorKeys, node, request.version()); resultFuture.complete(response); } @@ -1297,9 +1352,6 @@ protected void handleDeleteRecords(KafkaHeaderAndRequest deleteRecords, } }; - final BiConsumer resultConsumer = - (topicPartition, data2) -> addDeletePartitionResponse.accept( - topicPartition, data2); final BiConsumer errorsConsumer = (topicPartition, errors) -> addDeletePartitionResponse.accept(topicPartition, errors); @@ -1756,8 +1808,10 @@ protected void handleListOffsetRequestV1(KafkaHeaderAndRequest listOffset, .setCurrentLeaderEpoch(listOffsetsPartition.currentLeaderEpoch()) .setMaxNumOffsets(listOffsetsPartition.maxNumOffsets())); + // see "forConsumer" implementation + boolean requireMaxTimestamp = request.version() >= 7; ListOffsetsRequest requestForSinglePartition = ListOffsetsRequest.Builder - .forConsumer(false, request.isolationLevel()) + .forConsumer(false, request.isolationLevel(), requireMaxTimestamp) .setTargetTimes(Collections.singletonList(tsData)) .build(request.version()); @@ -1795,11 +1849,93 @@ protected void handleListOffsetRequestV1(KafkaHeaderAndRequest listOffset, @Override protected void handleOffsetFetchRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture resultFuture) { - handleRequestWithCoordinator(kafkaHeaderAndRequest, resultFuture, FindCoordinatorRequest.CoordinatorType.GROUP, - OffsetFetchRequest.class, - OffsetFetchRequestData.class, - OffsetFetchRequestData::groupId, - null); + String singleGroupId; + OffsetFetchRequest offsetFetchRequest = (OffsetFetchRequest) kafkaHeaderAndRequest.getRequest(); + if (kafkaHeaderAndRequest.getRequest().version() < 8) { + // old version + singleGroupId = offsetFetchRequest.groupId(); + } else if (offsetFetchRequest.groupIds().size() == 1) { + // common case, when can simply forward the request to the coordinator + singleGroupId = offsetFetchRequest.groupIds().get(0); + } else { + // KIP-709 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258 + singleGroupId = null; + } + + if (singleGroupId != null) { + // most common case (non KIP-709) or single group + handleRequestWithCoordinator(kafkaHeaderAndRequest, resultFuture, + FindCoordinatorRequest.CoordinatorType.GROUP, + OffsetFetchRequest.class, + OffsetFetchRequestData.class, + data -> singleGroupId, + null); + } else { + List> responses = new ArrayList<>(); + for (OffsetFetchRequestData.OffsetFetchRequestGroup group : offsetFetchRequest.data().groups()) { + + Map> map = new HashMap<>(); + List partitions = new ArrayList<>(); + if (group.topics() != null) { + group.topics().forEach(t -> { + if (t.partitionIndexes() != null) { + List topicPartitions = t.partitionIndexes().stream() + .map(p -> new TopicPartition(t.name(), p)) + .collect(Collectors.toList()); + partitions.addAll(topicPartitions); + } + }); + } + // null means "all partitions" + map.put(group.groupId(), partitions.isEmpty() ? null : partitions); + OffsetFetchRequest singleGroupRequest = new OffsetFetchRequest.Builder(map, + offsetFetchRequest.requireStable(), + false) + .build(offsetFetchRequest.version()); + int dummyCorrelationId = getDummyCorrelationId(); + RequestHeader header = new RequestHeader( + kafkaHeaderAndRequest.getHeader().apiKey(), + kafkaHeaderAndRequest.getHeader().apiVersion(), + kafkaHeaderAndRequest.getHeader().clientId(), + dummyCorrelationId + ); + ByteBuf buffer = KopResponseUtils.serializeRequest(header, singleGroupRequest); + KafkaHeaderAndRequest requestWithNewHeader = new KafkaHeaderAndRequest( + header, + singleGroupRequest, + buffer, + null + ); + CompletableFuture singleResultFuture = new CompletableFuture<>(); + responses.add(singleResultFuture); + handleRequestWithCoordinator(requestWithNewHeader, singleResultFuture, + FindCoordinatorRequest.CoordinatorType.GROUP, + OffsetFetchRequest.class, + OffsetFetchRequestData.class, + data1 -> group.groupId(), + null); + } + FutureUtil.waitForAll(responses).whenComplete((ignore, ex) -> { + kafkaHeaderAndRequest.close(); + if (ex != null) { + log.error("Internal error when handling offset fetch request", ex); + resultFuture.completeExceptionally(ex); + } else { + OffsetFetchResponseData responseData = new OffsetFetchResponseData() + .setGroups(new ArrayList<>()) + .setTopics(new ArrayList<>()) + .setErrorCode(Errors.NONE.code()); + responses.forEach(future -> { + OffsetFetchResponse response = (OffsetFetchResponse) future.join(); + log.info("adding response {}", response.data()); + // here we have only request.version >= 8 + responseData.groups().addAll(response.data().groups()); + + }); + resultFuture.complete(new OffsetFetchResponse(responseData)); + } + }); + } } @Override @@ -1827,10 +1963,14 @@ R extends AbstractResponse> void handleRequestWithCoordinator( BiFunction errorBuilder; if (customErrorBuilder == null) { errorBuilder = (K request, Throwable t) -> { + while (t instanceof CompletionException && t.getCause() != null) { + t = t.getCause(); + } if (t instanceof IOException || t.getCause() instanceof IOException) { t = new CoordinatorNotAvailableException("Network error: " + t, t); } + log.debug("Unexpected error", t); return (R) request.getErrorResponse(t); }; } else { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index fd1032cbb0..4d2e60a3bf 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -79,6 +79,7 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setOffsetsTopicNumPartitions(offsetsTopicNumPartitions); kConfig.setAdvertisedAddress("localhost"); + kConfig.setKafkaTransactionCoordinatorEnabled(true); kConfig.setClusterName(configClusterName); kConfig.setManagedLedgerCacheSizeMB(8); kConfig.setActiveConsumerFailoverDelayTimeMillis(0); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java index 35efa41895..165726969a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java @@ -92,7 +92,7 @@ public void testPublishTime() throws Exception { // time before first message ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, startTime)); KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 5653f55f5c..b13831992f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -71,10 +71,12 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; @@ -107,6 +109,7 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.jetbrains.annotations.NotNull; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -283,7 +286,7 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E // 2. real test, for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, EARLIEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -336,7 +339,7 @@ public void testConsumerListOffsetLatest() throws Exception { // 2. real test, for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, ListOffsetsRequest.LATEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -570,7 +573,7 @@ public void testConsumerListOffset() throws Exception { private ListOffsetsResponse listOffset(long timestamp, TopicPartition tp) throws Exception { ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, timestamp)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -584,21 +587,24 @@ private ListOffsetsResponse listOffset(long timestamp, TopicPartition tp) throws /// Add test for FetchRequest private void checkFetchResponse(List expectedPartitions, - FetchResponse fetchResponse, + FetchResponse fetchResponse, int maxPartitionBytes, int maxResponseBytes, int numMessagesPerPartition) { - assertEquals(expectedPartitions.size(), fetchResponse.responseData().size()); - expectedPartitions.forEach(tp -> assertTrue(fetchResponse.responseData().get(tp) != null)); + assertEquals(expectedPartitions.size(), fetchResponse + .data().responses().stream().mapToInt(r->r.partitions().size())); + expectedPartitions.forEach(tp + -> assertTrue(getPartitionDataFromFetchResponse(fetchResponse, tp) != null)); final AtomicBoolean emptyResponseSeen = new AtomicBoolean(false); AtomicInteger responseSize = new AtomicInteger(0); AtomicInteger responseBufferSize = new AtomicInteger(0); expectedPartitions.forEach(tp -> { - FetchResponse.PartitionData partitionData = fetchResponse.responseData().get(tp); - assertEquals(Errors.NONE, partitionData.error()); + FetchResponseData.PartitionData partitionData = getPartitionDataFromFetchResponse(fetchResponse, tp); + + assertEquals(Errors.NONE.code(), partitionData.errorCode()); assertTrue(partitionData.highWatermark() > 0); MemoryRecords records = (MemoryRecords) partitionData.records(); @@ -629,6 +635,18 @@ private void checkFetchResponse(List expectedPartitions, // In Kop implementation, fetch at least 1 item for each topicPartition in the request. } + @NotNull + private static FetchResponseData.PartitionData getPartitionDataFromFetchResponse(FetchResponse fetchResponse, + TopicPartition tp) { + FetchResponseData.PartitionData partitionData = fetchResponse + .data() + .responses().stream().filter(t->t.topic().equals(tp.topic())) + .flatMap(r-> r.partitions().stream()) + .filter(p -> p.partitionIndex() == tp.partition()) + .findFirst().orElse(null); + return partitionData; + } + private Map createPartitionMap(int maxPartitionBytes, List topicPartitions, Map offsetMap) { @@ -647,7 +665,8 @@ private KafkaHeaderAndRequest createFetchRequest(int maxResponseBytes, Map offsetMap) { AbstractRequest.Builder builder = FetchRequest.Builder - .forConsumer(Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) + .forConsumer(ApiKeys.FETCH.latestVersion(), + Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) .setMaxBytes(maxResponseBytes); return buildRequest(builder); @@ -797,8 +816,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { Collections.EMPTY_MAP); CompletableFuture responseFuture1 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest1, responseFuture1); - FetchResponse fetchResponse1 = - (FetchResponse) responseFuture1.get(); + FetchResponse fetchResponse1 = + (FetchResponse) responseFuture1.get(); checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -816,8 +835,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { Collections.EMPTY_MAP); CompletableFuture responseFuture2 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest2, responseFuture2); - FetchResponse fetchResponse2 = - (FetchResponse) responseFuture2.get(); + FetchResponse fetchResponse2 = + (FetchResponse) responseFuture2.get(); checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -838,8 +857,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { offsetMaps); CompletableFuture responseFuture3 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest3, responseFuture3); - FetchResponse fetchResponse3 = - (FetchResponse) responseFuture3.get(); + FetchResponse fetchResponse3 = + (FetchResponse) responseFuture3.get(); checkFetchResponse(shuffledTopicPartitions3, fetchResponse3, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -983,7 +1002,7 @@ public void testGetOffsetsForUnknownTopic() throws Exception { TopicPartition tp = new TopicPartition(topicName, 0); ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, ListOffsetsRequest.LATEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -1077,11 +1096,17 @@ private void verifySendMessageToPartition(final TopicPartition topicPartition, ApiKeys.PRODUCE.latestVersion(), produceRequestData)); final CompletableFuture future = new CompletableFuture<>(); kafkaRequestHandler.handleProduceRequest(request, future); - final ProduceResponse.PartitionResponse response = - ((ProduceResponse) future.get()).responses().get(topicPartition); + final ProduceResponseData.PartitionProduceResponse response = + ((ProduceResponse) future.get()).data().responses() + .stream() + .filter(r->r.name().equals(topicPartition.topic())) + .flatMap(r->r.partitionResponses().stream()) + .filter(p->p.index() == topicPartition.partition()) + .findFirst() + .get(); assertNotNull(response); - assertEquals(response.error, expectedError); - assertEquals(response.baseOffset, expectedOffset); + assertEquals(response.errorCode(), expectedError.code()); + assertEquals(response.baseOffset(), expectedOffset); } private static MemoryRecords newIdempotentRecords( @@ -1171,10 +1196,13 @@ public void testFetchMinBytesSingleConsumer() throws Exception { final int minBytes = 1; @Cleanup - final KafkaHeaderAndRequest request = buildRequest(FetchRequest.Builder.forConsumer(maxWaitMs, minBytes, - Collections.singletonMap(topicPartition, new FetchRequest.PartitionData( + final KafkaHeaderAndRequest request = buildRequest(FetchRequest.Builder + .forConsumer(ApiKeys.FETCH.oldestVersion(), + maxWaitMs, minBytes, + Collections.singletonMap(topicPartition, new FetchRequest.PartitionData(null, 0L, -1L, 1024 * 1024, Optional.empty() )))); + final CompletableFuture future = new CompletableFuture<>(); final long startTime = System.currentTimeMillis(); kafkaRequestHandler.handleFetchRequest(request, future); @@ -1186,11 +1214,10 @@ public void testFetchMinBytesSingleConsumer() throws Exception { AbstractResponse abstractResponse = ((ResponseCallbackWrapper) future.get(maxWaitMs + 1000, TimeUnit.MILLISECONDS)).getResponse(); assertTrue(abstractResponse instanceof FetchResponse); - final FetchResponse response = (FetchResponse) abstractResponse; + final FetchResponse response = (FetchResponse) abstractResponse; assertEquals(response.error(), Errors.NONE); final long endTime = System.currentTimeMillis(); - log.info("Take {} ms to process FETCH request, record count: {}", - endTime - startTime, response.responseData().size()); + log.info("Take {} ms to process FETCH request", endTime - startTime); assertTrue(endTime - startTime <= maxWaitMs); Long waitingFetchesTriggered = kafkaRequestHandler.getRequestStats().getWaitingFetchesTriggered().get(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index b737bfc81b..e06304e894 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; +import static org.testng.Assert.assertEquals; + import io.netty.buffer.ByteBuf; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -48,7 +50,7 @@ public static List newListOffsetTargetT public static FetchRequest.PartitionData newFetchRequestPartitionData(long fetchOffset, long logStartOffset, int maxBytes) { - return new FetchRequest.PartitionData(fetchOffset, + return new FetchRequest.PartitionData(null, fetchOffset, logStartOffset, maxBytes, Optional.empty() @@ -110,7 +112,12 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, SocketAddress serviceAddress) { - AbstractRequest request = builder.build(builder.apiKey().latestVersion()); + return buildRequest(builder, serviceAddress, builder.latestAllowedVersion()); + } + public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, + SocketAddress serviceAddress, short version) { + AbstractRequest request = builder.build(version); + assertEquals(version, request.version()); RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerProxyTest.java index 9eba64b5f5..6a777bc3aa 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerProxyTest.java @@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; /** * Unit test for {@link KafkaRequestHandler} but via Proxy. @@ -40,4 +41,9 @@ protected void cleanup() throws Exception { protected int getClientPort() { return getKafkaProxyPort(); } + + @Test(timeOut = 20000000) + public void testDescribeConsumerGroups() throws Exception { + super.testDescribeConsumerGroups(); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 4f3df613eb..8da26fe0c6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -45,6 +45,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -67,6 +68,8 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.RecordsToDelete; @@ -756,7 +759,7 @@ public void testListOffsetsForNotExistedTopic() throws Exception { final RequestHeader header = new RequestHeader(ApiKeys.LIST_OFFSETS, ApiKeys.LIST_OFFSETS.latestVersion(), "client", 0); final ListOffsetsRequest request = - ListOffsetsRequest.Builder.forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + ListOffsetsRequest.Builder.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP)) .build(ApiKeys.LIST_OFFSETS.latestVersion()); @@ -878,12 +881,13 @@ public void testNonEmptyReplacingIndex() { replacedMap.forEach(((topicPartition, s) -> assertEquals(tp0, topicPartition))); } - @Test(timeOut = 20000) + @Test(timeOut = 20000000) public void testDescribeConsumerGroups() throws Exception { final String topic = "test-describe-group-offset"; final int numMessages = 10; final String messagePrefix = "msg-"; - final String group = "test-group"; + final String group1 = "test-group"; + final String group2 = "test-group-2"; admin.topics().createPartitionedTopic(topic, 1); @@ -894,9 +898,14 @@ public void testDescribeConsumerGroups() throws Exception { } producer.close(); - KafkaConsumer consumer = new KafkaConsumer<>(newKafkaConsumerProperties(group)); + KafkaConsumer consumer = new KafkaConsumer<>(newKafkaConsumerProperties(group1)); consumer.subscribe(Collections.singleton(topic)); + KafkaConsumer consumer2 = new KafkaConsumer<>(newKafkaConsumerProperties(group2)); + consumer2.subscribe(Collections.singleton(topic)); + KafkaConsumer consumer2b = new KafkaConsumer<>(newKafkaConsumerProperties(group2)); + consumer2b.subscribe(Collections.singleton(topic)); + int fetchMessages = 0; while (fetchMessages < numMessages) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); @@ -906,28 +915,53 @@ public void testDescribeConsumerGroups() throws Exception { consumer.commitSync(); + consumer2.poll(Duration.ofMillis(1000)); + consumer2.commitSync(); + consumer2b.poll(Duration.ofMillis(1000)); + consumer2b.commitSync(); + final Properties adminProps = new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); AdminClient kafkaAdmin = AdminClient.create(adminProps); - ConsumerGroupDescription groupDescription = - kafkaAdmin.describeConsumerGroups(Collections.singletonList(group)) - .all().get().get(group); + Map consumerGroupDescriptionMap = + kafkaAdmin.describeConsumerGroups(Arrays.asList(group1, group2)) + .all().get(); + ConsumerGroupDescription groupDescription = consumerGroupDescriptionMap.get(group1); assertEquals(1, groupDescription.members().size()); // member assignment topic name must be short topic name groupDescription.members().forEach(memberDescription -> memberDescription.assignment().topicPartitions() .forEach(topicPartition -> assertEquals(topic, topicPartition.topic()))); - Map offsetAndMetadataMap = - kafkaAdmin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(); - assertEquals(1, offsetAndMetadataMap.size()); + ConsumerGroupDescription group2Description = consumerGroupDescriptionMap.get(group2); + assertEquals(2, group2Description.members().size()); + + // KIP-709 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258 + ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = kafkaAdmin.listConsumerGroupOffsets(Map.of( + group1, new ListConsumerGroupOffsetsSpec().topicPartitions(null), + group2, new ListConsumerGroupOffsetsSpec().topicPartitions(null)) + ); + Map offsetAndMetadataMap2Group1 = + listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata(group1).get(); + assertEquals(1, offsetAndMetadataMap2Group1.size()); + + Map offsetAndMetadataMap2Group2 = + listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata(group2).get(); + assertEquals(1, offsetAndMetadataMap2Group2.size()); + + Map offsetAndMetadataMapGroup1 = + kafkaAdmin.listConsumerGroupOffsets(group1).partitionsToOffsetAndMetadata().get(); + assertEquals(1, offsetAndMetadataMapGroup1.size()); // topic name from offset fetch response must be short topic name - offsetAndMetadataMap.keySet().forEach(topicPartition -> assertEquals(topic, topicPartition.topic())); + offsetAndMetadataMapGroup1.keySet().forEach(topicPartition -> assertEquals(topic, topicPartition.topic())); + consumer.close(); + consumer2.close(); + consumer2b.close(); kafkaAdmin.close(); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java index 52323140e0..3fc987a55c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java @@ -335,9 +335,26 @@ public void testHandleProduceRequest() throws ExecutionException, InterruptedExc final ProduceResponse response = (ProduceResponse) responseFuture.get(); //Topic: "topic2" authorize success. Error is not TOPIC_AUTHORIZATION_FAILED - assertEquals(response.responses().get(topicPartition2).error, Errors.NOT_LEADER_OR_FOLLOWER); + assertEquals(response + .data() + .responses() + .stream() + .filter(t -> t.name().equals(topicPartition2.topic())) + .flatMap(t->t.partitionResponses().stream()) + .filter(p -> p.index() == topicPartition2.partition()) + .findFirst() + .get().errorCode(), Errors.NOT_LEADER_OR_FOLLOWER.code()); + //Topic: `TOPIC` authorize failed. - assertEquals(response.responses().get(topicPartition1).error, Errors.TOPIC_AUTHORIZATION_FAILED); + assertEquals(response + .data() + .responses() + .stream() + .filter(t -> t.name().equals(topicPartition1.topic())) + .flatMap(t->t.partitionResponses().stream()) + .filter(p -> p.index() == topicPartition1.partition()) + .findFirst() + .get().errorCode(), Errors.TOPIC_AUTHORIZATION_FAILED.code()); } @Test(timeOut = 20000) @@ -378,7 +395,7 @@ public void testHandleListOffsetRequestAuthorizationSuccess() throws Exception { // Test for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(tp, ListOffsetsRequest.EARLIEST_TIMESTAMP)); @@ -406,7 +423,7 @@ public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { TopicPartition tp = new TopicPartition(topicName, 0); ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(tp, ListOffsetsRequest.EARLIEST_TIMESTAMP)); @@ -422,12 +439,19 @@ public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { } - @Test(timeOut = 20000) - public void testHandleOffsetFetchRequestAuthorizationSuccess() + @DataProvider(name = "offsetFetchVersions") + public static Object[][] offsetFetchVersions() { + return new Object[][]{ + { (short) 7 }, + { (short) ApiKeys.OFFSET_FETCH.latestVersion() } }; + } + + @Test(timeOut = 20000, dataProvider = "offsetFetchVersions") + public void testHandleOffsetFetchRequestAuthorizationSuccess(short version) throws PulsarAdminException, ExecutionException, InterruptedException { KafkaRequestHandler spyHandler = spy(handler); String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" - + "testHandleOffsetFetchRequestAuthorizationSuccess"; + + "testHandleOffsetFetchRequestAuthorizationSuccess_" + version; String groupId = "DemoKafkaOnPulsarConsumer"; // create partitioned topic. @@ -444,7 +468,8 @@ public void testHandleOffsetFetchRequestAuthorizationSuccess() new OffsetFetchRequest.Builder(groupId, false, Collections.singletonList(tp), false); - KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder, version); + assertEquals(version, request.getRequest().version()); CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleOffsetFetchRequest(request, responseFuture); @@ -453,18 +478,35 @@ public void testHandleOffsetFetchRequestAuthorizationSuccess() assertTrue(response instanceof OffsetFetchResponse); OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response; - assertEquals(offsetFetchResponse.responseData().size(), 1); - assertEquals(offsetFetchResponse.error(), Errors.NONE); - offsetFetchResponse.responseData() - .forEach((topicPartition, partitionData) -> assertEquals(partitionData.error, Errors.NONE)); + + if (request.getRequest().version() >= 8) { + assertEquals(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .count(), 1); + assertTrue(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.NONE.code())); + } else { + assertEquals(offsetFetchResponse.data() + .topics() + .stream().flatMap(t->t.partitions().stream()) + .count(), 1); + assertEquals(offsetFetchResponse.error(), Errors.NONE); + offsetFetchResponse.data().topics().stream().flatMap(t->t.partitions().stream()) + .forEach((partitionData) -> assertEquals(partitionData.errorCode(), Errors.NONE.code())); + } } - @Test(timeOut = 20000) - public void testHandleOffsetFetchRequestAuthorizationFailed() + @Test(timeOut = 20000, dataProvider = "offsetFetchVersions") + public void testHandleOffsetFetchRequestAuthorizationFailed(short version) throws PulsarAdminException, ExecutionException, InterruptedException { KafkaRequestHandler spyHandler = spy(handler); String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" - + "testHandleOffsetFetchRequestAuthorizationFailed"; + + "testHandleOffsetFetchRequestAuthorizationFailed_" + version; String groupId = "DemoKafkaOnPulsarConsumer"; // create partitioned topic. @@ -474,7 +516,8 @@ public void testHandleOffsetFetchRequestAuthorizationFailed() OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(groupId, false, Collections.singletonList(tp), false); - KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder, version); + assertEquals(request.getRequest().version(), version); CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleOffsetFetchRequest(request, responseFuture); @@ -483,10 +526,27 @@ public void testHandleOffsetFetchRequestAuthorizationFailed() assertTrue(response instanceof OffsetFetchResponse); OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response; - assertEquals(offsetFetchResponse.responseData().size(), 1); - assertEquals(offsetFetchResponse.error(), Errors.NONE); - offsetFetchResponse.responseData().forEach((topicPartition, partitionData) -> assertEquals(partitionData.error, - Errors.TOPIC_AUTHORIZATION_FAILED)); + + if (request.getRequest().version() >= 8) { + assertEquals(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .count(), 1); + assertTrue(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.TOPIC_AUTHORIZATION_FAILED.code())); + } else { + assertTrue(offsetFetchResponse.data() + .topics() + .stream().flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.TOPIC_AUTHORIZATION_FAILED.code())); + + assertEquals(offsetFetchResponse.error(), Errors.NONE); + } + } @Test(timeOut = 20000) @@ -580,7 +640,7 @@ public void testHandleTxnOffsetCommitAuthorizationFailed() throws ExecutionExcep offsetData.put(topicPartition, KafkaCommonTestUtils.newTxnOffsetCommitRequestCommittedOffset(1L, "")); TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder( - "1", group, 1, (short) 1, offsetData, false); + "1", group, 1L, (short) 1, offsetData); KafkaCommandDecoder.KafkaHeaderAndRequest headerAndRequest = buildRequest(builder); // Handle request @@ -612,7 +672,7 @@ public void testHandleTxnOffsetCommitPartAuthorizationFailed() throws ExecutionE TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder( - "1", group, 1, (short) 1, offsetData, false); + "1", group, 1L, (short) 1, offsetData); KafkaCommandDecoder.KafkaHeaderAndRequest headerAndRequest = buildRequest(builder); // Topic: `test1` authorize success. @@ -809,6 +869,10 @@ private KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.B return KafkaCommonTestUtils.buildRequest(builder, serviceAddress); } + private KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, short version) { + return KafkaCommonTestUtils.buildRequest(builder, serviceAddress, version); + } + private void handleGroupImmigration() { GroupCoordinator groupCoordinator = handler.getGroupCoordinator(); for (int i = 0; i < conf.getOffsetsTopicNumPartitions(); i++) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index caae0acab0..53a3ff3765 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -210,7 +210,8 @@ protected void resetConfig() { // kafka related settings. kafkaConfig.setOffsetsTopicNumPartitions(1); - kafkaConfig.setKafkaTransactionCoordinatorEnabled(false); + // kafka 3.1.x clients init the producerId by default, so we need to enable it. + kafkaConfig.setKafkaTransactionCoordinatorEnabled(true); kafkaConfig.setKafkaTxnLogTopicNumPartitions(1); kafkaConfig.setKafkaListeners( @@ -341,9 +342,11 @@ protected final void internalSetup(boolean startBroker) throws Exception { createAdmin(); createClient(); MetadataUtils.createOffsetMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); + if (conf.isKafkaTransactionCoordinatorEnabled()) { MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); } + // we don't want user topic to use compaction admin.namespaces().removeCompactionThreshold(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java index d231be51b8..1ecad7dbd7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java @@ -125,7 +125,7 @@ public void testMetricsProvider() throws Exception { } Assert.assertEquals(getApiKeysSet(), new TreeSet<>( - Arrays.asList(ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE))); + Arrays.asList(ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.INIT_PRODUCER_ID))); // 2. consume messages with Kafka consumer @Cleanup @@ -152,14 +152,14 @@ public void testMetricsProvider() throws Exception { Assert.assertEquals(getApiKeysSet(), new TreeSet<>(Arrays.asList( ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.FIND_COORDINATOR, ApiKeys.LIST_OFFSETS, - ApiKeys.OFFSET_FETCH, ApiKeys.FETCH + ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.INIT_PRODUCER_ID ))); // commit offsets kConsumer.getConsumer().commitSync(Duration.ofSeconds(5)); Assert.assertEquals(getApiKeysSet(), new TreeSet<>(Arrays.asList( ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.FIND_COORDINATOR, ApiKeys.LIST_OFFSETS, - ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.OFFSET_COMMIT + ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.OFFSET_COMMIT, ApiKeys.INIT_PRODUCER_ID ))); try { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java index df23f08d8b..14d150c7f6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java @@ -63,8 +63,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -867,12 +867,12 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except partitionLog.awaitInitialisation().get(); assertEquals(0, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); - List abortedIndexList = + List abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); abortedIndexList.forEach(tx -> { log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); producer.beginTransaction(); String lastMessage = "msg1b"; @@ -907,7 +907,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except abortedIndexList.forEach(tx -> { log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); assertEquals(1, abortedIndexList.size()); waitForTransactionsToBeInStableState(transactionalId); @@ -946,7 +946,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except }); assertEquals(1, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); producer.beginTransaction(); producer.send(new ProducerRecord<>(topicName, 0, "msg4")).get(); // OFFSET 8 @@ -973,8 +973,8 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(11, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(11, abortedIndexList.get(1).firstOffset()); assertEquals(2, abortedIndexList.size()); producer.beginTransaction(); @@ -999,8 +999,8 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(11, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(11, abortedIndexList.get(1).firstOffset()); assertEquals(2, abortedIndexList.size()); @@ -1015,7 +1015,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); assertEquals(1, abortedIndexList.size()); - assertEquals(11, abortedIndexList.get(0).firstOffset); + assertEquals(11, abortedIndexList.get(0).firstOffset()); // use a new consumer group, it will read from the beginning of the topic assertEquals( @@ -1448,12 +1448,12 @@ public void testAbortedTxEventuallyPurged() throws Exception { .getPartitionLog(topicPartition, namespacePrefix); partitionLog.awaitInitialisation().get(); - List abortedIndexList = + List abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); assertEquals(2, abortedIndexList.size()); assertEquals(2, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(3, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(3, abortedIndexList.get(1).firstOffset()); takeSnapshot(topicName); @@ -1478,8 +1478,8 @@ public void testAbortedTxEventuallyPurged() throws Exception { abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); assertEquals(2, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(3, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(3, abortedIndexList.get(1).firstOffset()); // force reading the minimum valid offset // the timer is not started by the PH because @@ -1498,7 +1498,7 @@ public void testAbortedTxEventuallyPurged() throws Exception { assertEquals(1, abortedIndexList.size()); // the second TX cannot be purged because the lastOffset is 5, that is the boundary of the // trimmed portion of the topic - assertEquals(3, abortedIndexList.get(0).firstOffset); + assertEquals(3, abortedIndexList.get(0).firstOffset()); producer1.close(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java index 8fe9c77839..9f792019a0 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java @@ -106,6 +106,8 @@ private static TransactionMetadata transactionMetadata(String transactionalId, @BeforeClass @Override protected void setup() throws Exception { + // we need to disable the kafka transaction coordinator to avoid the conflict + this.conf.setKafkaTransactionCoordinatorEnabled(false); this.conf.setKafkaTxnLogTopicNumPartitions(numPartitions); internalSetup(); MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java index af5869529f..e8ac8ee348 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; @@ -109,7 +110,8 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); @@ -143,7 +145,9 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters + .fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); @@ -173,13 +177,15 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started ReadOnlyKeyValueStore store = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); assertEquals(store.approximateNumEntries(), 4L); kafkaStreams.close(); startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started - store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + store = kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); assertEquals(store.approximateNumEntries(), 4L); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java index 83c7c99d30..5e2c8fd609 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java @@ -48,9 +48,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; @@ -58,7 +60,6 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; @@ -115,7 +116,7 @@ protected void extraSetup() throws Exception { groupedStream = stream .groupBy( mapper, - Serialized.with(Serdes.String(), Serdes.String())); + Grouped.with(Serdes.String(), Serdes.String())); reducer = (value1, value2) -> value1 + ":" + value2; initializer = () -> 0; @@ -174,7 +175,7 @@ public void shouldReduceWindowed() throws Exception { final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); groupedStream - .windowedBy(TimeWindows.of(500L)) + .windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .reduce(reducer) .toStream() .to(outputTopic, Produced.with(windowedSerde, Serdes.String())); @@ -279,7 +280,7 @@ public void shouldAggregateWindowed() throws Exception { produceMessages(secondTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); - groupedStream.windowedBy(TimeWindows.of(500L)) + groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .aggregate( initializer, aggregator, @@ -415,8 +416,8 @@ public void shouldGroupByKey() throws Exception { produceMessages(timestamp); produceMessages(timestamp); - stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())) - .windowedBy(TimeWindows.of(500L)) + stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) + .windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .count() .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); @@ -510,8 +511,9 @@ public void shouldCountSessionWindows() throws Exception { final CountDownLatch latch = new CountDownLatch(11); builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(sessionGap), + Duration.ofMillis(maintainMillis))) .count() .toStream() .transform(() -> new Transformer, Long, KeyValue>() { @@ -609,8 +611,9 @@ public void shouldReduceSessionWindows() throws Exception { final CountDownLatch latch = new CountDownLatch(11); final String userSessionsStore = "UserSessionsStore"; builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(sessionGap), + Duration.ofMillis(maintainMillis))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore)) .toStream() .foreach((key, value) -> { @@ -621,7 +624,8 @@ public void shouldReduceSessionWindows() throws Exception { startStreams(); latch.await(30, TimeUnit.SECONDS); final ReadOnlySessionStore sessionStore = - kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore())); // verify correct data received assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start")); @@ -691,6 +695,9 @@ private List> receiveMessages(final Deserializer keyDes consumerProperties.setProperty(StreamsConfig.WINDOW_SIZE_MS_CONFIG, Long.MAX_VALUE + ""); if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) { + consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, + Serdes.serdeFrom(innerClass).getClass().getName()); + consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()); } @@ -741,6 +748,8 @@ private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial final Map configs = new HashMap<>(); Serde serde = Serdes.serdeFrom(innerClass); configs.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, serde.getClass().getName()); + configs.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, serde.getClass().getName()); + serde.close(); // https://issues.apache.org/jira/browse/KAFKA-10366 configs.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, Long.toString(Long.MAX_VALUE)); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java index b27b455ee8..73197e75f1 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; @@ -115,14 +116,16 @@ public void shouldRestoreInMemoryKTableOnRestart() throws Exception { startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started final ReadOnlyKeyValueStore store = - kafkaStreams.store(this.store, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(this.store, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> store.approximateNumEntries() == 4L, 30000L, "waiting for values"); kafkaStreams.close(); startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started final ReadOnlyKeyValueStore recoveredStore = - kafkaStreams.store(this.store, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(this.store, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> { try { return recoveredStore.approximateNumEntries() == 4L; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java index 88a0ad7db7..41c676eb28 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java @@ -15,8 +15,8 @@ import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTime; +import java.time.Duration; import java.util.Properties; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.NonNull; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -83,7 +83,7 @@ protected void setupTestCase() throws Exception { @AfterMethod protected void cleanupTestCase() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(3, TimeUnit.SECONDS); + kafkaStreams.close(Duration.ofSeconds(3)); TestUtils.purgeLocalStreamsState(streamsConfiguration); } } diff --git a/tests/src/test/resources/log4j2.xml b/tests/src/test/resources/log4j2.xml index eac6556f2a..37ebaf7ffe 100644 --- a/tests/src/test/resources/log4j2.xml +++ b/tests/src/test/resources/log4j2.xml @@ -43,6 +43,6 @@ - +