diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index cd7198f063..eb91fa2881 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -67,7 +67,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -783,18 +782,19 @@ private void completeSendOperationForThrottling(long msgSize) { } } - private void publishMessages(final PersistentTopic persistentTopic, + private void publishMessages(final Optional persistentTopicOpt, final ByteBuf byteBuf, final int numMessages, final MemoryRecords records, final TopicPartition topicPartition, final Consumer offsetConsumer, final Consumer errorsConsumer) { - if (persistentTopic == null) { + if (!persistentTopicOpt.isPresent()) { // It will trigger a retry send of Kafka client errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION); return; } + PersistentTopic persistentTopic = persistentTopicOpt.get(); if (persistentTopic.isSystemTopic()) { log.error("Not support producing message to system topic: {}", persistentTopic); errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION); @@ -881,63 +881,30 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, errors -> addPartitionResponse.accept(topicPartition, new PartitionResponse(errors)); final Consumer exceptionConsumer = e -> addPartitionResponse.accept(topicPartition, new PartitionResponse(Errors.forException(e))); - final String fullPartitionName = KopTopic.toString(topicPartition); - // check KOP inner topic - if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) { - log.error("[{}] Request {}: not support produce message to inner topic. topic: {}", - ctx.channel(), produceHar.getHeader(), topicPartition); - errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION); - return; - } - - try { - final long beforeRecordsProcess = MathUtils.nowInNano(); - final MemoryRecords validRecords = - validateRecords(produceHar.getHeader().apiVersion(), topicPartition, records); - final int numMessages = EntryFormatter.parseNumMessages(validRecords); - final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages); - requestStats.getProduceEncodeStats().registerSuccessfulEvent( - MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS); - startSendOperationForThrottling(byteBuf.readableBytes()); - - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ", - ctx.channel(), produceHar.getHeader(), - topicPartition.topic(), topicPartition.partition(), numPartitions); - } + authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Write topic authorize failed, topic - {}. 
{}", + fullPartitionName, ex.getMessage()); + errorsConsumer.accept(Errors.TOPIC_AUTHORIZATION_FAILED); + return; + } + if (!isAuthorized) { + errorsConsumer.accept(Errors.TOPIC_AUTHORIZATION_FAILED); + return; + } - final CompletableFuture topicFuture = topicManager.getTopic(fullPartitionName); - if (topicFuture.isCompletedExceptionally()) { - topicFuture.exceptionally(e -> { - exceptionConsumer.accept(e); - return null; + handlePartitionRecords(produceHar, + topicPartition, + records, + numPartitions, + fullPartitionName, + offsetConsumer, + errorsConsumer, + exceptionConsumer); }); - return; - } - if (topicFuture.isDone() && topicFuture.getNow(null) == null) { - errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION); - return; - } - - final Consumer persistentTopicConsumer = persistentTopic -> { - publishMessages(persistentTopic, byteBuf, numMessages, validRecords, topicPartition, - offsetConsumer, errorsConsumer); - }; - - if (topicFuture.isDone()) { - persistentTopicConsumer.accept(topicFuture.getNow(null)); - } else { - // topic is not available now - pendingTopicFuturesMap - .computeIfAbsent(topicPartition, ignored -> new PendingTopicFutures(requestStats)) - .addListener(topicFuture, persistentTopicConsumer, exceptionConsumer); - } - } catch (Exception e) { - log.error("[{}] Failed to handle produce request for {}", ctx.channel(), topicPartition, e); - exceptionConsumer.accept(e); - } }); // delay produce if (timeoutMs <= 0) { @@ -951,6 +918,73 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, } } + private void handlePartitionRecords(final KafkaHeaderAndRequest produceHar, + final TopicPartition topicPartition, + final MemoryRecords records, + final int numPartitions, + final String fullPartitionName, + final Consumer offsetConsumer, + final Consumer errorsConsumer, + final Consumer exceptionConsumer) { + // check KOP inner topic + if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) { + log.error("[{}] Request {}: not support produce message to inner topic. 
topic: {}", + ctx.channel(), produceHar.getHeader(), topicPartition); + errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION); + return; + } + + try { + final long beforeRecordsProcess = MathUtils.nowInNano(); + final MemoryRecords validRecords = + validateRecords(produceHar.getHeader().apiVersion(), topicPartition, records); + final int numMessages = EntryFormatter.parseNumMessages(validRecords); + final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages); + requestStats.getProduceEncodeStats().registerSuccessfulEvent( + MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS); + startSendOperationForThrottling(byteBuf.readableBytes()); + + if (log.isDebugEnabled()) { + log.debug("[{}] Request {}: Produce messages for topic {} partition {}, " + + "request size: {} ", ctx.channel(), produceHar.getHeader(), + topicPartition.topic(), topicPartition.partition(), numPartitions); + } + + final CompletableFuture> topicFuture = + topicManager.getTopic(fullPartitionName); + if (topicFuture.isCompletedExceptionally()) { + topicFuture.exceptionally(e -> { + exceptionConsumer.accept(e); + return Optional.empty(); + }); + return; + } + if (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent()) { + errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION); + return; + } + + final Consumer> persistentTopicConsumer = persistentTopicOpt -> { + publishMessages(persistentTopicOpt, byteBuf, numMessages, validRecords, topicPartition, + offsetConsumer, errorsConsumer); + }; + + if (topicFuture.isDone()) { + persistentTopicConsumer.accept(topicFuture.getNow(Optional.empty())); + } else { + // topic is not available now + pendingTopicFuturesMap + .computeIfAbsent(topicPartition, ignored -> + new PendingTopicFutures(requestStats)) + .addListener(topicFuture, persistentTopicConsumer, exceptionConsumer); + } + } catch (Exception e) { + log.error("[{}] Failed to handle produce request for {}", + ctx.channel(), topicPartition, e); + exceptionConsumer.accept(e); + } + } + protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator, CompletableFuture resultFuture) { checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); @@ -1021,72 +1055,106 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, checkState(groupCoordinator != null, "Group Coordinator not started"); - Map unknownPartitions = new HashMap<>(); + CompletableFuture> authorizeFuture = new CompletableFuture<>(); // replace Map replacingIndex = new HashMap<>(); - KeyValue> keyValue = - groupCoordinator.handleFetchOffsets( - request.groupId(), - request.partitions() == null ? 
Optional.ofNullable(request.partitions()) : Optional.of( - request.partitions() - .stream() - .map(tp -> { - try { - TopicPartition newTopicPartition = new TopicPartition( - new KopTopic(tp.topic()).getFullName(), tp.partition()); - replacingIndex.put(newTopicPartition, tp); - return newTopicPartition; - } catch (KoPTopicException e) { - log.warn("Invalid topic name: {}", tp.topic(), e); - unknownPartitions.put(tp, OffsetFetchResponse.UNKNOWN_PARTITION); - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toList())) - ); - if (log.isDebugEnabled()) { - log.debug("OFFSET_FETCH Unknown partitions: {}", unknownPartitions); - } + List authorizedPartitions = new ArrayList<>(); + Map unauthorizedPartitionData = + Maps.newConcurrentMap(); + Map unknownPartitionData = + Maps.newConcurrentMap(); - if (log.isTraceEnabled()) { - StringBuffer traceInfo = new StringBuffer(); - replacingIndex.forEach((inner, outer) -> - traceInfo.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer))); - log.trace("OFFSET_FETCH TopicPartition relations: \n{}", traceInfo.toString()); + if (request.partitions() == null || request.partitions().isEmpty()) { + authorizeFuture.complete(null); + } else { + AtomicInteger partitionCount = new AtomicInteger(request.partitions().size()); + + Runnable completeOneAuthorization = () -> { + if (partitionCount.decrementAndGet() == 0) { + authorizeFuture.complete(authorizedPartitions); + } + }; + request.partitions().forEach(tp -> { + try { + String fullName = new KopTopic(tp.topic()).getFullName(); + authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Describe topic authorize failed, topic - {}. {}", + fullName, ex.getMessage()); + unauthorizedPartitionData.put(tp, OffsetFetchResponse.UNAUTHORIZED_PARTITION); + completeOneAuthorization.run(); + return; + } + if (!isAuthorized) { + unauthorizedPartitionData.put(tp, OffsetFetchResponse.UNAUTHORIZED_PARTITION); + completeOneAuthorization.run(); + return; + } + TopicPartition newTopicPartition = new TopicPartition( + fullName, tp.partition()); + replacingIndex.put(newTopicPartition, tp); + authorizedPartitions.add(newTopicPartition); + completeOneAuthorization.run(); + }); + } catch (KoPTopicException e) { + log.warn("Invalid topic name: {}", tp.topic(), e); + unknownPartitionData.put(tp, OffsetFetchResponse.UNKNOWN_PARTITION); + } + }); } - // recover to original topic name - replaceTopicPartition(keyValue.getValue(), replacingIndex); - keyValue.getValue().putAll(unknownPartitions); + authorizeFuture.whenComplete((partitionList, ex) -> { + KeyValue> keyValue = + groupCoordinator.handleFetchOffsets( + request.groupId(), + Optional.ofNullable(partitionList) + ); + if (log.isDebugEnabled()) { + log.debug("OFFSET_FETCH Unknown partitions: {}, Unauthorized partitions: {}.", + unknownPartitionData, unauthorizedPartitionData); + } + + if (log.isTraceEnabled()) { + StringBuffer traceInfo = new StringBuffer(); + replacingIndex.forEach((inner, outer) -> + traceInfo.append(String.format("\tinnerName:%s, outerName:%s%n", inner, outer))); + log.trace("OFFSET_FETCH TopicPartition relations: \n{}", traceInfo); + } + + // recover to original topic name + replaceTopicPartition(keyValue.getValue(), replacingIndex); + keyValue.getValue().putAll(unauthorizedPartitionData); + keyValue.getValue().putAll(unknownPartitionData); - resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue())); + 
resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue())); + }); } private CompletableFuture fetchOffsetForTimestamp(String topicName, Long timestamp, boolean legacyMode) { CompletableFuture partitionData = new CompletableFuture<>(); - topicManager.getTopic(topicName).whenComplete((perTopic, t) -> { + topicManager.getTopic(topicName).whenComplete((perTopicOpt, t) -> { if (t != null) { log.error("Failed while get persistentTopic topic: {} ts: {}. ", - perTopic == null ? "null" : perTopic.getName(), timestamp, t); + !perTopicOpt.isPresent() ? "null" : perTopicOpt.get().getName(), timestamp, t); partitionData.complete(new ListOffsetResponse.PartitionData( Errors.forException(t), ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); return; } - if (perTopic == null) { + if (!perTopicOpt.isPresent()) { partitionData.complete(new ListOffsetResponse.PartitionData( Errors.UNKNOWN_TOPIC_OR_PARTITION, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); return; } - + PersistentTopic perTopic = perTopicOpt.get(); ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) perTopic.getManagedLedger(); PositionImpl lac = (PositionImpl) managedLedger.getLastConfirmedEntry(); if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) { @@ -1212,25 +1280,43 @@ private void fetchOffsetForTimestampSuccess(CompletableFuture resultFuture) { ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); + Map> responseData = + Maps.newConcurrentMap(); - Map> responseData = Maps.newHashMap(); - - request.partitionTimestamps().entrySet().stream().forEach(tms -> { - TopicPartition topic = tms.getKey(); - Long times = tms.getValue(); - CompletableFuture partitionData; - - partitionData = fetchOffsetForTimestamp(KopTopic.toString(topic), times, false); + request.partitionTimestamps().forEach((topic, times) -> { + String fullPartitionName = KopTopic.toString(topic); + authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullPartitionName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Describe topic authorize failed, topic - {}. 
{}", + fullPartitionName, ex.getMessage()); + responseData.put(topic, CompletableFuture.completedFuture(new ListOffsetResponse + .PartitionData( + Errors.TOPIC_AUTHORIZATION_FAILED, + Collections.emptyList() + ))); + return; + } + if (!isAuthorized) { + responseData.put(topic, CompletableFuture.completedFuture(new ListOffsetResponse + .PartitionData( + Errors.TOPIC_AUTHORIZATION_FAILED, + Collections.emptyList() + ))); + return; + } + responseData.put(topic, + fetchOffsetForTimestamp(fullPartitionName, times, false)); + } + ); - responseData.put(topic, partitionData); }); CompletableFuture - .allOf(responseData.values().stream().toArray(CompletableFuture[]::new)) + .allOf(responseData.values().toArray(new CompletableFuture[0])) .whenComplete((ignore, ex) -> { ListOffsetResponse response = - new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join())); - + new ListOffsetResponse(CoreUtils.mapValue(responseData, CompletableFuture::join)); resultFuture.complete(response); }); } @@ -1241,39 +1327,60 @@ private void handleListOffsetRequestV0(KafkaHeaderAndRequest listOffset, CompletableFuture resultFuture) { ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest(); - Map> responseData = Maps.newHashMap(); + Map> responseData = + Maps.newConcurrentMap(); // in v0, the iterator is offsetData, // in v1, the iterator is partitionTimestamps, if (log.isDebugEnabled()) { log.debug("received a v0 listOffset: {}", request.toString(true)); } - request.offsetData().entrySet().stream().forEach(tms -> { - TopicPartition topic = tms.getKey(); + request.offsetData().forEach((topic, value) -> { String fullPartitionName = KopTopic.toString(topic); - Long times = tms.getValue().timestamp; - CompletableFuture partitionData; - - // num_num_offsets > 1 is not handled for now, returning an error - if (tms.getValue().maxNumOffsets > 1) { - log.warn("request is asking for multiples offsets for {}, not supported for now", fullPartitionName); - partitionData = new CompletableFuture<>(); - partitionData.complete(new ListOffsetResponse - .PartitionData( - Errors.UNKNOWN_SERVER_ERROR, - Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET))); - } - partitionData = fetchOffsetForTimestamp(fullPartitionName, times, true); - responseData.put(topic, partitionData); + authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullPartitionName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Describe topic authorize failed, topic - {}. 
{}", + fullPartitionName, ex.getMessage()); + responseData.put(topic, CompletableFuture.completedFuture(new ListOffsetResponse + .PartitionData( + Errors.TOPIC_AUTHORIZATION_FAILED, + Collections.emptyList()))); + return; + } + if (!isAuthorized) { + responseData.put(topic, CompletableFuture.completedFuture(new ListOffsetResponse + .PartitionData( + Errors.TOPIC_AUTHORIZATION_FAILED, + Collections.emptyList()))); + return; + } + Long times = value.timestamp; + + CompletableFuture partitionData; + // num_num_offsets > 1 is not handled for now, returning an error + if (value.maxNumOffsets > 1) { + log.warn("request is asking for multiples offsets for {}, not supported for now", + fullPartitionName); + partitionData = new CompletableFuture<>(); + partitionData.complete(new ListOffsetResponse + .PartitionData( + Errors.UNKNOWN_SERVER_ERROR, + Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET))); + } + + partitionData = fetchOffsetForTimestamp(fullPartitionName, times, true); + responseData.put(topic, partitionData); + }); + }); CompletableFuture - .allOf(responseData.values().stream().toArray(CompletableFuture[]::new)) + .allOf(responseData.values().toArray(new CompletableFuture[0])) .whenComplete((ignore, ex) -> { ListOffsetResponse response = - new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join())); - + new ListOffsetResponse(CoreUtils.mapValue(responseData, CompletableFuture::join)); resultFuture.complete(response); }); } @@ -1922,15 +2029,16 @@ private CompletableFuture writeTxnMarker(TopicPartition topicPartition, String fullPartitionName = KopTopic.toString(topicPartition); TopicName topicName = TopicName.get(fullPartitionName); topicManager.getTopic(topicName.toString()) - .whenComplete((persistentTopic, throwable) -> { + .whenComplete((persistentTopicOpt, throwable) -> { if (throwable != null) { offsetFuture.completeExceptionally(throwable); return; } - if (persistentTopic == null) { + if (!persistentTopicOpt.isPresent()) { offsetFuture.complete(null); return; } + PersistentTopic persistentTopic = persistentTopicOpt.get(); persistentTopic.publishMessage(generateTxnMarker(transactionResult, producerId, producerEpoch), MessagePublishContext.get(offsetFuture, persistentTopic, 1, SystemTime.SYSTEM.milliseconds())); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index a16eb7ec7e..c2bd35b923 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -52,7 +52,7 @@ public class KafkaTopicManager { // cache for topics: , for removing producer @Getter - private static final ConcurrentHashMap> + private static final ConcurrentHashMap>> topics = new ConcurrentHashMap<>(); // cache for references in PersistentTopic: @Getter @@ -123,12 +123,12 @@ public CompletableFuture getTopicConsumerManager(Stri t -> { final CompletableFuture tcmFuture = new CompletableFuture<>(); getTopic(t).whenComplete((persistentTopic, throwable) -> { - if (persistentTopic != null && throwable == null) { + if (persistentTopic.isPresent() && throwable == null) { if (log.isDebugEnabled()) { log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.", requestHandler.ctx.channel(), topicName, persistentTopic); } - tcmFuture.complete(new KafkaTopicConsumerManager(requestHandler, persistentTopic)); + 
tcmFuture.complete(new KafkaTopicConsumerManager(requestHandler, persistentTopic.get())); } else { if (throwable != null) { log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}", @@ -201,22 +201,22 @@ private CompletableFuture lookupBroker(final String topic) { } // A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance - public CompletableFuture getTopic(String topicName) { + public CompletableFuture> getTopic(String topicName) { if (closed.get()) { if (log.isDebugEnabled()) { log.debug("[{}] Return null for getTopic({}) since channel is closing", requestHandler.ctx.channel(), topicName); } - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(Optional.empty()); } - CompletableFuture topicCompletableFuture = new CompletableFuture<>(); + CompletableFuture> topicCompletableFuture = new CompletableFuture<>(); brokerService.getTopicIfExists(topicName).whenComplete((t2, throwable) -> { if (throwable != null) { // The ServiceUnitNotReadyException is retriable so we should print a warning log instead of error log if (throwable instanceof BrokerServiceException.ServiceUnitNotReadyException) { log.warn("[{}] Failed to getTopic {}: {}", requestHandler.ctx.channel(), topicName, throwable.getMessage()); - topicCompletableFuture.complete(null); + topicCompletableFuture.complete(Optional.empty()); } else { log.error("[{}] Failed to getTopic {}. exception:", requestHandler.ctx.channel(), topicName, throwable); @@ -228,11 +228,11 @@ public CompletableFuture getTopic(String topicName) { } if (t2.isPresent()) { PersistentTopic persistentTopic = (PersistentTopic) t2.get(); - topicCompletableFuture.complete(persistentTopic); + topicCompletableFuture.complete(Optional.of(persistentTopic)); } else { log.error("[{}]Get empty topic for name {}", requestHandler.ctx.channel(), topicName); removeTopicManagerCache(topicName); - topicCompletableFuture.complete(null); + topicCompletableFuture.complete(Optional.empty()); } }); // cache for removing producer @@ -291,7 +291,7 @@ public static Producer getReferenceProducer(String topicName) { private static void removePersistentTopicAndReferenceProducer(final String topicName) { // 1. Remove PersistentTopic and Producer from caches, these calls are thread safe - final CompletableFuture topicFuture = topics.remove(topicName); + final CompletableFuture> topicFuture = topics.remove(topicName); final Producer producer = references.remove(topicName); if (topicFuture == null) { @@ -303,10 +303,10 @@ private static void removePersistentTopicAndReferenceProducer(final String topic try { // It's safe to wait until the future is completed because it's completed when // `BrokerService#getTopicIfExists` completed and it won't block too long. - final PersistentTopic persistentTopic = topicFuture.get(); - if (producer != null && persistentTopic != null) { + final Optional persistentTopic = topicFuture.get(); + if (producer != null && persistentTopic.isPresent()) { try { - persistentTopic.removeProducer(producer); + persistentTopic.get().removeProducer(producer); } catch (IllegalArgumentException ignored) { log.error("[{}] The producer's topic ({}) doesn't match the current PersistentTopic", topicName, (producer.getTopic() == null) ? 
"null" : producer.getTopic().getName()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index 767683f58a..3561ec65f8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -28,6 +28,8 @@ import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException; import io.streamnative.pulsar.handlers.kop.format.DecodeResult; import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; +import io.streamnative.pulsar.handlers.kop.security.auth.Resource; +import io.streamnative.pulsar.handlers.kop.security.auth.ResourceType; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils; @@ -54,6 +56,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; @@ -241,160 +244,219 @@ public void handleFetch() { fetchRequest.fetchData().forEach((topicPartition, partitionData) -> { final long startPrepareMetadataNanos = MathUtils.nowInNano(); - final long offset = partitionData.fetchOffset; final String fullTopicName = KopTopic.toString(topicPartition); - // the future that is returned by getTopicConsumerManager is always completed normally - topicManager.getTopicConsumerManager(fullTopicName).thenAccept(tcm -> { - if (tcm == null) { - statsLogger.getPrepareMetadataStats().registerFailedEvent( - MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); - // remove null future cache - KafkaTopicManager.removeKafkaTopicConsumerManager(fullTopicName); - addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION); - return; - } - // handle offset out-of-range exception - ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger(); - long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger); - // TODO: Offset out-of-range checks are still incomplete - // We only check the case of `offset > logEndOffset` and `offset < LogStartOffset` is currently - // not handled. Because we found that the operation of obtaining `LogStartOffset` requires reading - // from disk, and such a time-consuming operation is likely to harm the performance of FETCH request. - // More discussions please refer to https://github.com/streamnative/kop/pull/531 - if (offset > logEndOffset) { - log.error("Received request for offset {} for partition {}, " - + "but we only have entries less than {}.", - offset, topicPartition, logEndOffset); - addErrorPartitionResponse(topicPartition, Errors.OFFSET_OUT_OF_RANGE); - return; - } + // Do authorization + requestHandler.authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Read topic authorize failed, topic - {}. 
{}", + fullTopicName, ex.getMessage()); + addErrorPartitionResponse(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED); + return; + } + if (!isAuthorized) { + addErrorPartitionResponse(topicPartition, Errors.TOPIC_AUTHORIZATION_FAILED); + return; + } + handlePartitionData(topicPartition, + partitionData, + fullTopicName, + startPrepareMetadataNanos, + readCommitted); + }); + }); + } - if (log.isDebugEnabled()) { - log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} .", - topicPartition, offset); - } + private void handlePartitionData(final TopicPartition topicPartition, + final FetchRequest.PartitionData partitionData, + final String fullTopicName, + final long startPrepareMetadataNanos, + final boolean readCommitted) { + final long offset = partitionData.fetchOffset; + // the future that is returned by getTopicConsumerManager is always completed normally + topicManager.getTopicConsumerManager(fullTopicName).thenAccept(tcm -> { + if (tcm == null) { + statsLogger.getPrepareMetadataStats().registerFailedEvent( + MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); + // remove null future cache + KafkaTopicManager.removeKafkaTopicConsumerManager(fullTopicName); + addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION); + return; + } + + // handle offset out-of-range exception + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger(); + long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger); + // TODO: Offset out-of-range checks are still incomplete + // We only check the case of `offset > logEndOffset` and `offset < LogStartOffset` + // is currently not handled. + // Because we found that the operation of obtaining `LogStartOffset` + // requires reading from disk, + // and such a time-consuming operation is likely to harm the performance of FETCH request. + // More discussions please refer to https://github.com/streamnative/kop/pull/531 + if (offset > logEndOffset) { + log.error("Received request for offset {} for partition {}, " + + "but we only have entries less than {}.", + offset, topicPartition, logEndOffset); + addErrorPartitionResponse(topicPartition, Errors.OFFSET_OUT_OF_RANGE); + return; + } + + if (log.isDebugEnabled()) { + log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} .", + topicPartition, offset); + } + + final CompletableFuture> cursorFuture = + tcm.removeCursorFuture(offset); + if (cursorFuture == null) { + // tcm is closed, just return a NONE error because the channel may be still active + log.warn("[{}] KafkaTopicConsumerManager is closed, remove TCM of {}", + requestHandler.ctx, fullTopicName); + KafkaTopicManager.removeKafkaTopicConsumerManager(fullTopicName); + addErrorPartitionResponse(topicPartition, Errors.NONE); + return; + } - final CompletableFuture> cursorFuture = tcm.removeCursorFuture(offset); - if (cursorFuture == null) { - // tcm is closed, just return a NONE error because the channel may be still active - log.warn("[{}] KafkaTopicConsumerManager is closed, remove TCM of {}", - requestHandler.ctx, fullTopicName); - KafkaTopicManager.removeKafkaTopicConsumerManager(fullTopicName); - addErrorPartitionResponse(topicPartition, Errors.NONE); + // cursorFuture is never completed exceptionally because ManagedLedgerImpl#asyncFindPosition + // is never completed exceptionally. + cursorFuture.thenAccept(cursorLongPair -> { + if (cursorLongPair == null) { + log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. 
" + + "Fetch for topic return error.", + offset, topicPartition); + addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION); return; } - // cursorFuture is never completed exceptionally because ManagedLedgerImpl#asyncFindPosition is never - // completed exceptionally. - cursorFuture.thenAccept(cursorLongPair -> { - if (cursorLongPair == null) { - log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. " - + "Fetch for topic return error.", - offset, topicPartition); - addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION); + final ManagedCursor cursor = cursorLongPair.getLeft(); + final AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight()); + + statsLogger.getPrepareMetadataStats().registerSuccessfulEvent( + MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); + readEntries(cursor, topicPartition, cursorOffset).whenComplete((entries, throwable) -> { + if (throwable != null) { + tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), + "cursor.readEntry fail. deleteCursor"); + addErrorPartitionResponse(topicPartition, Errors.forException(throwable)); + return; + } + if (entries == null) { + addErrorPartitionResponse(topicPartition, + Errors.forException(new ApiException("Cursor is null"))); return; } - final ManagedCursor cursor = cursorLongPair.getLeft(); - final AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight()); - final long highWatermark = MessageIdUtils.getHighWatermark( - cursorLongPair.getLeft().getManagedLedger()); - statsLogger.getPrepareMetadataStats().registerSuccessfulEvent( - MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); - readEntries(cursor, topicPartition, cursorOffset).whenComplete((entries, throwable) -> { - if (throwable != null) { - tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), "cursor.readEntry fail. deleteCursor"); - addErrorPartitionResponse(topicPartition, Errors.forException(throwable)); - return; - } - if (entries == null) { - addErrorPartitionResponse(topicPartition, - Errors.forException(new ApiException("Cursor is null"))); - return; - } + handleEntries( + entries, + topicPartition, + partitionData, + fullTopicName, + tcm, + cursor, + cursorOffset, + readCommitted); + }); + }); + }); + } - // Add new offset back to TCM after entries are read successfully - tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); - - final long lso = (readCommitted - ? 
tc.getLastStableOffset(TopicName.get(fullTopicName), highWatermark) - : highWatermark); - List committedEntries = entries; - if (readCommitted) { - committedEntries = new ArrayList<>(); - for (Entry entry : entries) { - try { - if (lso >= MessageIdUtils.peekBaseOffsetFromEntry(entry)) { - committedEntries.add(entry); - } else { - break; - } - } catch (KoPMessageMetadataNotFoundException e) { - log.error("[{}:{}] Failed to peek base offset from entry.", - entry.getLedgerId(), entry.getEntryId()); - } - } - if (log.isDebugEnabled()) { - log.debug("Request {}: read {} entries but only {} entries are committed", - header, entries.size(), committedEntries.size()); - } - } else { - if (log.isDebugEnabled()) { - log.debug("Request {}: read {} entries", header, entries.size()); - } - } - if (committedEntries.isEmpty()) { - addErrorPartitionResponse(topicPartition, Errors.NONE); - return; - } + private void handleEntries(final List entries, + final TopicPartition topicPartition, + final FetchRequest.PartitionData partitionData, + final String fullTopicName, + final KafkaTopicConsumerManager tcm, + final ManagedCursor cursor, + final AtomicLong cursorOffset, + final boolean readCommitted) { + final long highWatermark = MessageIdUtils.getHighWatermark(cursor.getManagedLedger()); + // Add new offset back to TCM after entries are read successfully + tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); + + final long lso = (readCommitted + ? tc.getLastStableOffset(TopicName.get(fullTopicName), highWatermark) : highWatermark); + List committedEntries = entries; + if (readCommitted) { + committedEntries = getCommittedEntries(entries, lso); + if (log.isDebugEnabled()) { + log.debug("Request {}: read {} entries but only {} entries are committed", + header, entries.size(), committedEntries.size()); + } + } else { + if (log.isDebugEnabled()) { + log.debug("Request {}: read {} entries", header, entries.size()); + } + } + if (committedEntries.isEmpty()) { + addErrorPartitionResponse(topicPartition, Errors.NONE); + return; + } - // use compatible magic value by apiVersion - short apiVersion = header.apiVersion(); - byte magic = RecordBatch.CURRENT_MAGIC_VALUE; - if (apiVersion <= 1) { - magic = RecordBatch.MAGIC_VALUE_V0; - } else if (apiVersion <= 3) { - magic = RecordBatch.MAGIC_VALUE_V1; - } + // use compatible magic value by apiVersion + short apiVersion = header.apiVersion(); + byte magic = RecordBatch.CURRENT_MAGIC_VALUE; + if (apiVersion <= 1) { + magic = RecordBatch.MAGIC_VALUE_V0; + } else if (apiVersion <= 3) { + magic = RecordBatch.MAGIC_VALUE_V1; + } - // get group and consumer - final String groupName = requestHandler - .getCurrentConnectedGroup().computeIfAbsent(clientHost, ignored -> { - String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost, - header.clientId()); - String groupId = ZooKeeperUtils.getData( - requestHandler.getPulsarService().getZkClient(), - requestHandler.getGroupIdStoredPath(), - zkSubPath); - log.info("get group name from zk for current connection:{} groupId:{}", - clientHost, groupId); - return groupId; - }); - final long startDecodingEntriesNanos = MathUtils.nowInNano(); - final DecodeResult decodeResult = requestHandler.getEntryFormatter().decode(entries, magic); - requestHandler.requestStats.getFetchDecodeStats().registerSuccessfulEvent( - MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS); - decodeResults.add(decodeResult); - - // collect consumer metrics - updateConsumerStats(topicPartition, decodeResult.getRecords(), 
entries.size(), groupName); - - final List abortedTransactions = - (readCommitted ? tc.getAbortedIndexList(partitionData.fetchOffset) : null); - responseData.put(topicPartition, new PartitionData<>( - Errors.NONE, - highWatermark, - lso, - highWatermark, // TODO: should it be changed to the logStartOffset? - abortedTransactions, - decodeResult.getRecords())); - tryComplete(); - }); + // get group and consumer + final String groupName = requestHandler + .getCurrentConnectedGroup().computeIfAbsent(clientHost, ignored -> { + String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost, + header.clientId()); + String groupId = ZooKeeperUtils.getData( + requestHandler.getPulsarService().getZkClient(), + requestHandler.getGroupIdStoredPath(), + zkSubPath); + log.info("get group name from zk for current connection:{} groupId:{}", + clientHost, groupId); + return groupId; }); - }); - }); + final long startDecodingEntriesNanos = MathUtils.nowInNano(); + final DecodeResult decodeResult = requestHandler + .getEntryFormatter().decode(entries, magic); + requestHandler.requestStats.getFetchDecodeStats().registerSuccessfulEvent( + MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS); + decodeResults.add(decodeResult); + + // collect consumer metrics + updateConsumerStats(topicPartition, + decodeResult.getRecords(), + entries.size(), + groupName); + + final List abortedTransactions = + (readCommitted ? tc.getAbortedIndexList(partitionData.fetchOffset) : null); + responseData.put(topicPartition, new PartitionData<>( + Errors.NONE, + highWatermark, + lso, + highWatermark, // TODO: should it be changed to the logStartOffset? + abortedTransactions, + decodeResult.getRecords())); + tryComplete(); + } + + private List getCommittedEntries(List entries, long lso) { + List committedEntries; + committedEntries = new ArrayList<>(); + for (Entry entry : entries) { + try { + if (lso >= MessageIdUtils.peekBaseOffsetFromEntry(entry)) { + committedEntries.add(entry); + } else { + break; + } + } catch (KoPMessageMetadataNotFoundException e) { + log.error("[{}:{}] Failed to peek base offset from entry.", + entry.getLedgerId(), entry.getEntryId()); + } + } + return committedEntries; } private CompletableFuture> readEntries(final ManagedCursor cursor, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java index 253a031f6c..c514fa0d24 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.kop; import com.google.common.annotations.VisibleForTesting; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -52,8 +53,8 @@ private void registerQueueLatency(boolean success) { } } - public void addListener(CompletableFuture topicFuture, - @NonNull Consumer persistentTopicConsumer, + public void addListener(CompletableFuture> topicFuture, + @NonNull Consumer> persistentTopicConsumer, @NonNull Consumer exceptionConsumer) { if (count.compareAndSet(0, 1)) { // The first pending future comes @@ -73,7 +74,7 @@ public void addListener(CompletableFuture topicFuture, currentTopicFuture = currentTopicFuture.thenApply(topicThrowablePair -> { if (topicThrowablePair.getThrowable() == null) { 
registerQueueLatency(true); - persistentTopicConsumer.accept(topicThrowablePair.getPersistentTopic()); + persistentTopicConsumer.accept(topicThrowablePair.getPersistentTopicOpt()); } else { registerQueueLatency(false); exceptionConsumer.accept(topicThrowablePair.getThrowable()); @@ -104,20 +105,20 @@ public int size() { class TopicThrowablePair { @Getter - private final PersistentTopic persistentTopic; + private final Optional persistentTopicOpt; @Getter private final Throwable throwable; - public static TopicThrowablePair withTopic(final PersistentTopic persistentTopic) { - return new TopicThrowablePair(persistentTopic, null); + public static TopicThrowablePair withTopic(final Optional persistentTopicOpt) { + return new TopicThrowablePair(persistentTopicOpt, null); } public static TopicThrowablePair withThrowable(final Throwable throwable) { - return new TopicThrowablePair(null, throwable); + return new TopicThrowablePair(Optional.empty(), throwable); } - private TopicThrowablePair(final PersistentTopic persistentTopic, final Throwable throwable) { - this.persistentTopic = persistentTopic; + private TopicThrowablePair(final Optional persistentTopicOpt, final Throwable throwable) { + this.persistentTopicOpt = persistentTopicOpt; this.throwable = throwable; } }; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index 39ddd86469..bef80fea20 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -206,12 +206,12 @@ public CompletableFuture canLookupAsync(KafkaPrincipal principal, Resou String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); CompletableFuture canLookupFuture = new CompletableFuture<>(); - authorize(principal, AuthAction.produce, resource).whenComplete((hasProducePermission, ex) -> { + authorize(principal, AuthAction.consume, resource).whenComplete((hasProducePermission, ex) -> { if (ex != null) { if (log.isDebugEnabled()) { log.debug( "Resource [{}] Principal [{}] exception occurred while trying to " - + "check Produce permissions. {}", + + "check Consume permissions. {}", resource, principal, ex.getMessage()); } hasProducePermission = false; @@ -220,12 +220,12 @@ public CompletableFuture canLookupAsync(KafkaPrincipal principal, Resou canLookupFuture.complete(true); return; } - authorize(principal, AuthAction.consume, resource).whenComplete((hasConsumerPermission, e) -> { + authorize(principal, AuthAction.produce, resource).whenComplete((hasConsumerPermission, e) -> { if (e != null) { if (log.isDebugEnabled()) { log.debug( "Resource [{}] Principal [{}] exception occurred while trying to " - + "check Consume permissions. {}", + + "check Produce permissions. 
{}", resource, principal, e.getMessage()); } canLookupFuture.completeExceptionally(e); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java index 6c32cfab4d..7131599cff 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java @@ -15,6 +15,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; @@ -54,9 +55,9 @@ private static List range(int start, int end) { @Test(timeOut = 10000) void testNormalComplete() throws ExecutionException, InterruptedException { final PendingTopicFutures pendingTopicFutures = new PendingTopicFutures(null); - final CompletableFuture topicFuture = CompletableFuture.supplyAsync(() -> { + final CompletableFuture> topicFuture = CompletableFuture.supplyAsync(() -> { sleep(800); - return null; + return Optional.empty(); }); final List completedIndexes = new ArrayList<>(); final List changesOfPendingCount = new ArrayList<>(); @@ -86,7 +87,7 @@ void testNormalComplete() throws ExecutionException, InterruptedException { @Test(timeOut = 10000) void testExceptionalComplete() throws ExecutionException, InterruptedException { final PendingTopicFutures pendingTopicFutures = new PendingTopicFutures(null); - final CompletableFuture topicFuture = CompletableFuture.supplyAsync(() -> { + final CompletableFuture> topicFuture = CompletableFuture.supplyAsync(() -> { sleep(800); throw new RuntimeException("error"); }); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java index f7d43935a0..fa437188d0 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import javax.crypto.SecretKey; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -38,6 +39,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; @@ -290,5 +292,112 @@ void testListTopic() throws Exception { admin.topics().deletePartitionedTopic(fullNewTopicName); } + @Test(timeOut = 20000) + void testProduceFailed() throws PulsarAdminException, ExecutionException, InterruptedException { + String newTenant = "newProduceFailed"; + String testTopic = "persistent://" + newTenant + "/" + NAMESPACE + "/topic1"; + try { + admin.tenants().createTenant(newTenant, + TenantInfo.builder() + .adminRoles(Collections.singleton(ADMIN_USER)) + .allowedClusters(Collections.singleton(configClusterName)) + .build()); + admin.namespaces().createNamespace(newTenant + "/" + NAMESPACE); + 
admin.namespaces().grantPermissionOnNamespace(newTenant + "/" + NAMESPACE, SIMPLE_USER, + Sets.newHashSet(AuthAction.consume)); + admin.topics().createPartitionedTopic(testTopic, 1); + + // Admin must have produce permissions + @Cleanup + KProducer adminProducer = new KProducer(testTopic, false, "localhost", getKafkaBrokerPort(), + newTenant + "/" + NAMESPACE, "token:" + adminToken); + int totalMsgs = 10; + String messageStrPrefix = testTopic + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + adminProducer.getProducer().send(new ProducerRecord<>(testTopic, i, messageStr)).get(); + } + + // Ensure can consume message. + @Cleanup + KConsumer kConsumer = new KConsumer(testTopic, "localhost", getKafkaBrokerPort(), false, + newTenant + "/" + NAMESPACE, "token:" + adminToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(testTopic)); + + int i = 0; + while (i < totalMsgs) { + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + Integer key = record.key(); + assertEquals(messageStrPrefix + key.toString(), record.value()); + i++; + } + } + assertEquals(i, totalMsgs); + + // no more records + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); + assertTrue(records.isEmpty()); + + // User can't produce, because don't have produce action. + @Cleanup + KProducer kProducer = new KProducer(testTopic, false, "localhost", getKafkaBrokerPort(), + newTenant + "/" + NAMESPACE, "token:" + userToken); + try { + kProducer.getProducer().send(new ProducerRecord<>(testTopic, 0, "")).get(); + fail("expected TopicAuthorizationException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TopicAuthorizationException); + } + } finally { + // Cleanup + admin.topics().deletePartitionedTopic(testTopic); + admin.namespaces().deleteNamespace(newTenant + "/" + NAMESPACE); + admin.tenants().deleteTenant(newTenant); + } + } + + @Test(timeOut = 20000) + void testConsumeFailed() throws PulsarAdminException, ExecutionException, InterruptedException { + String newTenant = "testConsumeFailed"; + String testTopic = "persistent://" + newTenant + "/" + NAMESPACE + "/topic1"; + try { + // Create new tenant, namespace and topic + admin.tenants().createTenant(newTenant, + TenantInfo.builder() + .adminRoles(Collections.singleton(ADMIN_USER)) + .allowedClusters(Collections.singleton(configClusterName)) + .build()); + admin.namespaces().createNamespace(newTenant + "/" + NAMESPACE); + admin.namespaces().grantPermissionOnNamespace(newTenant + "/" + NAMESPACE, SIMPLE_USER, + Sets.newHashSet(AuthAction.produce)); + admin.topics().createPartitionedTopic(testTopic, 1); + + // SIMPLE_USER can produce + @Cleanup + KProducer adminProducer = new KProducer(testTopic, false, "localhost", getKafkaBrokerPort(), + newTenant + "/" + NAMESPACE, "token:" + userToken); + adminProducer.getProducer().send(new ProducerRecord<>(testTopic, 0, "message")).get(); + + + // Consume should be failed. 
+ @Cleanup + KConsumer kConsumer = new KConsumer(testTopic, "localhost", getKafkaBrokerPort(), false, + newTenant + "/" + NAMESPACE, "token:" + userToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(testTopic)); + try { + kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + fail("expected TopicAuthorizationException"); + } catch (TopicAuthorizationException ignore) { + log.info("Has TopicAuthorizationException."); + } + } finally { + // Cleanup + admin.topics().deletePartitionedTopic(testTopic); + admin.namespaces().deleteNamespace(newTenant + "/" + NAMESPACE); + admin.tenants().deleteTenant(newTenant); + } + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java index 574f9fc27d..60c77b5291 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java @@ -21,8 +21,11 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.jsonwebtoken.SignatureAlgorithm; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; @@ -30,25 +33,46 @@ import io.streamnative.pulsar.handlers.kop.security.auth.Resource; import io.streamnative.pulsar.handlers.kop.security.auth.ResourceType; import io.streamnative.pulsar.handlers.kop.stats.NullStatsLogger; +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import javax.crypto.SecretKey; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import 
org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.broker.protocol.ProtocolHandler; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; @@ -70,9 +94,12 @@ public class KafkaRequestHandlerWithAuthorizationTest extends KopProtocolHandler private static final String SHORT_TOPIC = "topic1"; private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC; private static final int DEFAULT_PARTITION_NUM = 2; + private SocketAddress serviceAddress; private static final String ADMIN_USER = "admin_user"; + private String adminToken; + private KafkaRequestHandler handler; private AdminManager adminManager; @@ -89,7 +116,7 @@ protected void setup() throws Exception { authConf.setProperties(properties); provider.initialize(authConf); - String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); + adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); super.resetConfig(); conf.setDefaultNumPartitions(DEFAULT_PARTITION_NUM); @@ -143,6 +170,8 @@ protected void setup() throws Exception { Channel mockChannel = mock(Channel.class); doReturn(mockChannel).when(mockCtx).channel(); handler.ctx = mockCtx; + + serviceAddress = new InetSocketAddress(pulsar.getBindAddress(), kafkaBrokerPort); } @Override @@ -241,5 +270,197 @@ public void testMetadataListTopic() throws Exception { }); } + @Test(timeOut = 20000) + public void testHandleProduceRequest() throws ExecutionException, InterruptedException { + KafkaRequestHandler spyHandler = spy(handler); + final RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, (short) 1, "client", 0); + final ProduceRequest request = createProduceRequest(TOPIC); + final CompletableFuture responseFuture = new CompletableFuture<>(); + + spyHandler.handleProduceRequest(new KafkaCommandDecoder.KafkaHeaderAndRequest( + header, + request, + PulsarByteBufAllocator.DEFAULT.heapBuffer(), + null), responseFuture); + AbstractResponse response = responseFuture.get(); + assertEquals((int) response.errorCounts().get(Errors.TOPIC_AUTHORIZATION_FAILED), 1); + } + + @Test(timeOut = 20000) + public void testHandleListOffsetRequestAuthorizationSuccess() throws Exception { + KafkaRequestHandler spyHandler = spy(handler); + String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + + "testHandleListOffsetRequestAuthorizationSuccess"; + + // Mock all authorize call + doReturn(CompletableFuture.completedFuture(true)) + .when(spyHandler) + .authorize(eq(AclOperation.DESCRIBE), + eq(Resource.of(ResourceType.TOPIC, TopicName.get(topicName).getPartition(0).toString())) + ); + + // Create partitioned topic. 
+ admin.topics().createPartitionedTopic(topicName, 1); + TopicPartition tp = new TopicPartition(topicName, 0); + + @Cleanup + KProducer kProducer = new KProducer(topicName, + false, + "localhost", + getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, + "token:" + adminToken + ); + int totalMsgs = 10; + String messageStrPrefix = topicName + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer() + .send(new ProducerRecord<>(topicName, i, messageStr)) + .get(); + log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr); + } + + // Test for ListOffset request verify Earliest get earliest + Map targetTimes = Maps.newHashMap(); + targetTimes.put(tp, ListOffsetRequest.EARLIEST_TIMESTAMP); + + ListOffsetRequest.Builder builder = ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(targetTimes); + + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + CompletableFuture responseFuture = new CompletableFuture<>(); + spyHandler.handleListOffsetRequest(request, responseFuture); + + AbstractResponse response = responseFuture.get(); + ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response; + assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE); + assertEquals(listOffsetResponse.responseData().get(tp).offset.intValue(), 0); + assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(0)); + } + + @Test(timeOut = 20000) + public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { + KafkaRequestHandler spyHandler = spy(handler); + String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + + "testHandleListOffsetRequestAuthorizationFailed"; + + // create partitioned topic. + admin.topics().createPartitionedTopic(topicName, 1); + TopicPartition tp = new TopicPartition(topicName, 0); + + ListOffsetRequest.Builder builder = ListOffsetRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(new HashMap(){{ + put(tp, ListOffsetRequest.EARLIEST_TIMESTAMP); + }}); + + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + CompletableFuture responseFuture = new CompletableFuture<>(); + spyHandler.handleListOffsetRequest(request, responseFuture); + + AbstractResponse response = responseFuture.get(); + ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response; + assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.TOPIC_AUTHORIZATION_FAILED); + } + + + @Test(timeOut = 20000) + public void testHandleOffsetFetchRequestAuthorizationSuccess() + throws PulsarAdminException, ExecutionException, InterruptedException { + KafkaRequestHandler spyHandler = spy(handler); + String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + + "testHandleOffsetFetchRequestAuthorizationSuccess"; + String groupId = "DemoKafkaOnPulsarConsumer"; + + // create partitioned topic. 
+        admin.topics().createPartitionedTopic(topicName, 1);
+        TopicPartition tp = new TopicPartition(new KopTopic(topicName).getFullName(), 0);
+        doReturn(CompletableFuture.completedFuture(true))
+                .when(spyHandler)
+                .authorize(eq(AclOperation.DESCRIBE),
+                        eq(Resource.of(ResourceType.TOPIC, new KopTopic(tp.topic()).getFullName()))
+                );
+        OffsetFetchRequest.Builder builder =
+                new OffsetFetchRequest.Builder(groupId, Collections.singletonList(tp));
+
+        KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder);
+        CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
+
+        spyHandler.handleOffsetFetchRequest(request, responseFuture);
+
+        AbstractResponse response = responseFuture.get();
+
+        assertTrue(response instanceof OffsetFetchResponse);
+        OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response;
+        assertEquals(offsetFetchResponse.responseData().size(), 1);
+        assertEquals(offsetFetchResponse.error(), Errors.NONE);
+        offsetFetchResponse.responseData().forEach((topicPartition, partitionData) -> {
+            assertEquals(partitionData.error, Errors.NONE);
+        });
+    }
+
+    @Test(timeOut = 20000)
+    public void testHandleOffsetFetchRequestAuthorizationFailed()
+            throws PulsarAdminException, ExecutionException, InterruptedException {
+        KafkaRequestHandler spyHandler = spy(handler);
+        String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/"
+                + "testHandleOffsetFetchRequestAuthorizationFailed";
+        String groupId = "DemoKafkaOnPulsarConsumer";
+
+        // create partitioned topic.
+        admin.topics().createPartitionedTopic(topicName, 1);
+        TopicPartition tp = new TopicPartition(new KopTopic(topicName).getFullName(), 0);
+        OffsetFetchRequest.Builder builder =
+                new OffsetFetchRequest.Builder(groupId, Collections.singletonList(tp));
+
+        KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder);
+        CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
+
+        spyHandler.handleOffsetFetchRequest(request, responseFuture);
+
+        AbstractResponse response = responseFuture.get();
+
+        assertTrue(response instanceof OffsetFetchResponse);
+        OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response;
+        assertEquals(offsetFetchResponse.responseData().size(), 1);
+        assertEquals(offsetFetchResponse.error(), Errors.NONE);
+        offsetFetchResponse.responseData().forEach((topicPartition, partitionData) -> {
+            assertEquals(partitionData.error, Errors.TOPIC_AUTHORIZATION_FAILED);
+        });
+    }
+
+    KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder) {
+        AbstractRequest request = builder.build();
+        builder.apiKey();
+
+        ByteBuffer serializedRequest = request
+                .serialize(new RequestHeader(
+                        builder.apiKey(),
+                        request.version(),
+                        "fake_client_id",
+                        0)
+                );
+
+        ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest);
+
+        RequestHeader header = RequestHeader.parse(serializedRequest);
+
+        ApiKeys apiKey = header.apiKey();
+        short apiVersion = header.apiVersion();
+        Struct struct = apiKey.parseRequest(apiVersion, serializedRequest);
+        AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
+        return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress);
+    }
+
+    private ProduceRequest createProduceRequest(String topic) {
+        Map<TopicPartition, MemoryRecords> partitionRecords = new HashMap<>();
+        TopicPartition topicPartition = new TopicPartition(topic, 0);
+        partitionRecords.put(topicPartition,
+                MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes())));
+        return ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, partitionRecords).build();
+    }
 }
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java
index 8ec2f7d394..459d8bfca7 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -53,6 +54,7 @@ public class SimpleAclAuthorizerTest extends KopProtocolHandlerTestBase {
     private static final String ANOTHER_USER = "death_eater_user";
     private static final String ADMIN_USER = "admin_user";
     private static final String TENANT_ADMIN_USER = "tenant_admin_user";
+    private static final String TOPIC_LEVEL_PERMISSIONS_USER = "topic_level_permission_user";
 
     private static final String TENANT = "SimpleAcl";
     private static final String NAMESPACE = "ns1";
@@ -237,4 +239,31 @@ public void testAuthorizeTenantAdmin() throws ExecutionException, InterruptedExc
 
         assertTrue(isAuthorized);
     }
+
+    @Test
+    public void testTopicLevelPermissions() throws PulsarAdminException, ExecutionException, InterruptedException {
+        String topic = "persistent://" + TENANT + "/" + NAMESPACE + "/topic_level_permission_test_topic";
+        admin.topics().createPartitionedTopic(topic, 1);
+        admin.topics().grantPermission(topic, TOPIC_LEVEL_PERMISSIONS_USER, Sets.newHashSet(AuthAction.produce));
+
+        Boolean isAuthorized = simpleAclAuthorizer.canLookupAsync(
+                new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER),
+                Resource.of(ResourceType.TOPIC, topic)).get();
+        assertTrue(isAuthorized);
+
+        isAuthorized = simpleAclAuthorizer.canProduceAsync(
+                new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER),
+                Resource.of(ResourceType.TOPIC, topic)).get();
+        assertTrue(isAuthorized);
+
+        isAuthorized = simpleAclAuthorizer.canConsumeAsync(
+                new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER),
+                Resource.of(ResourceType.TOPIC, topic)).get();
+        assertFalse(isAuthorized);
+
+        isAuthorized = simpleAclAuthorizer.canProduceAsync(
+                new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER),
+                Resource.of(ResourceType.TOPIC, TOPIC)).get();
+        assertFalse(isAuthorized);
+    }
 }
\ No newline at end of file