Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Add authorization to produce and consume #671

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class KafkaTopicManager {

// cache for topics: <topicName, persistentTopic>, for removing producer
@Getter
private static final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>>
private static final ConcurrentHashMap<String, CompletableFuture<Optional<PersistentTopic>>>
topics = new ConcurrentHashMap<>();
// cache for references in PersistentTopic: <topicName, producer>
@Getter
Expand Down Expand Up @@ -123,12 +123,12 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
t -> {
final CompletableFuture<KafkaTopicConsumerManager> tcmFuture = new CompletableFuture<>();
getTopic(t).whenComplete((persistentTopic, throwable) -> {
if (persistentTopic != null && throwable == null) {
if (persistentTopic.isPresent() && throwable == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.",
requestHandler.ctx.channel(), topicName, persistentTopic);
}
tcmFuture.complete(new KafkaTopicConsumerManager(requestHandler, persistentTopic));
tcmFuture.complete(new KafkaTopicConsumerManager(requestHandler, persistentTopic.get()));
} else {
if (throwable != null) {
log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}",
Expand Down Expand Up @@ -201,22 +201,22 @@ private CompletableFuture<InetSocketAddress> lookupBroker(final String topic) {
}

// A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance
public CompletableFuture<PersistentTopic> getTopic(String topicName) {
public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
if (closed.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Return null for getTopic({}) since channel is closing",
requestHandler.ctx.channel(), topicName);
}
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<PersistentTopic> topicCompletableFuture = new CompletableFuture<>();
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture = new CompletableFuture<>();
brokerService.getTopicIfExists(topicName).whenComplete((t2, throwable) -> {
if (throwable != null) {
// The ServiceUnitNotReadyException is retriable so we should print a warning log instead of error log
if (throwable instanceof BrokerServiceException.ServiceUnitNotReadyException) {
log.warn("[{}] Failed to getTopic {}: {}",
requestHandler.ctx.channel(), topicName, throwable.getMessage());
topicCompletableFuture.complete(null);
topicCompletableFuture.complete(Optional.empty());
} else {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
Expand All @@ -228,11 +228,11 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
topicCompletableFuture.complete(Optional.of(persistentTopic));
} else {
log.error("[{}]Get empty topic for name {}", requestHandler.ctx.channel(), topicName);
removeTopicManagerCache(topicName);
topicCompletableFuture.complete(null);
topicCompletableFuture.complete(Optional.empty());
}
});
// cache for removing producer
Expand Down Expand Up @@ -291,7 +291,7 @@ public static Producer getReferenceProducer(String topicName) {

private static void removePersistentTopicAndReferenceProducer(final String topicName) {
// 1. Remove PersistentTopic and Producer from caches, these calls are thread safe
final CompletableFuture<PersistentTopic> topicFuture = topics.remove(topicName);
final CompletableFuture<Optional<PersistentTopic>> topicFuture = topics.remove(topicName);
final Producer producer = references.remove(topicName);

if (topicFuture == null) {
Expand All @@ -303,10 +303,10 @@ private static void removePersistentTopicAndReferenceProducer(final String topic
try {
// It's safe to wait until the future is completed because it's completed when
// `BrokerService#getTopicIfExists` completed and it won't block too long.
final PersistentTopic persistentTopic = topicFuture.get();
if (producer != null && persistentTopic != null) {
final Optional<PersistentTopic> persistentTopic = topicFuture.get();
if (producer != null && persistentTopic.isPresent()) {
try {
persistentTopic.removeProducer(producer);
persistentTopic.get().removeProducer(producer);
} catch (IllegalArgumentException ignored) {
log.error("[{}] The producer's topic ({}) doesn't match the current PersistentTopic",
topicName, (producer.getTopic() == null) ? "null" : producer.getTopic().getName());
Expand Down
Loading