Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[FEATURE] Add authorization to produce and consume (#671)
Browse files Browse the repository at this point in the history
## Motivation
#236 Add authorization to produce and consumer

## Modifications
* Add authorization to `handleProduceRequest`
* Add authorization to `handleOffsetFetchRequest`
* Add authorization to `handleListOffsetRequest`
* Add authorization to `MessageFetchContext#handleFetch`
* Add new test units for produce or consume permissions
* Add new test units for topic level permissions
  • Loading branch information
Demogorgon314 authored and BewareMyPower committed Aug 26, 2021
1 parent 1096877 commit 5601c66
Show file tree
Hide file tree
Showing 9 changed files with 828 additions and 297 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class KafkaTopicManager {

// cache for topics: <topicName, persistentTopic>, for removing producer
@Getter
private static final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>>
private static final ConcurrentHashMap<String, CompletableFuture<Optional<PersistentTopic>>>
topics = new ConcurrentHashMap<>();
// cache for references in PersistentTopic: <topicName, producer>
@Getter
Expand Down Expand Up @@ -123,12 +123,12 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
t -> {
final CompletableFuture<KafkaTopicConsumerManager> tcmFuture = new CompletableFuture<>();
getTopic(t).whenComplete((persistentTopic, throwable) -> {
if (persistentTopic != null && throwable == null) {
if (persistentTopic.isPresent() && throwable == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.",
requestHandler.ctx.channel(), topicName, persistentTopic);
}
tcmFuture.complete(new KafkaTopicConsumerManager(requestHandler, persistentTopic));
tcmFuture.complete(new KafkaTopicConsumerManager(requestHandler, persistentTopic.get()));
} else {
if (throwable != null) {
log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}",
Expand Down Expand Up @@ -201,22 +201,22 @@ private CompletableFuture<InetSocketAddress> lookupBroker(final String topic) {
}

// A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance
public CompletableFuture<PersistentTopic> getTopic(String topicName) {
public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
if (closed.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Return null for getTopic({}) since channel is closing",
requestHandler.ctx.channel(), topicName);
}
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<PersistentTopic> topicCompletableFuture = new CompletableFuture<>();
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture = new CompletableFuture<>();
brokerService.getTopicIfExists(topicName).whenComplete((t2, throwable) -> {
if (throwable != null) {
// The ServiceUnitNotReadyException is retriable so we should print a warning log instead of error log
if (throwable instanceof BrokerServiceException.ServiceUnitNotReadyException) {
log.warn("[{}] Failed to getTopic {}: {}",
requestHandler.ctx.channel(), topicName, throwable.getMessage());
topicCompletableFuture.complete(null);
topicCompletableFuture.complete(Optional.empty());
} else {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
Expand All @@ -228,11 +228,11 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
topicCompletableFuture.complete(Optional.of(persistentTopic));
} else {
log.error("[{}]Get empty topic for name {}", requestHandler.ctx.channel(), topicName);
removeTopicManagerCache(topicName);
topicCompletableFuture.complete(null);
topicCompletableFuture.complete(Optional.empty());
}
});
// cache for removing producer
Expand Down Expand Up @@ -291,7 +291,7 @@ public static Producer getReferenceProducer(String topicName) {

private static void removePersistentTopicAndReferenceProducer(final String topicName) {
// 1. Remove PersistentTopic and Producer from caches, these calls are thread safe
final CompletableFuture<PersistentTopic> topicFuture = topics.remove(topicName);
final CompletableFuture<Optional<PersistentTopic>> topicFuture = topics.remove(topicName);
final Producer producer = references.remove(topicName);

if (topicFuture == null) {
Expand All @@ -303,10 +303,10 @@ private static void removePersistentTopicAndReferenceProducer(final String topic
try {
// It's safe to wait until the future is completed because it's completed when
// `BrokerService#getTopicIfExists` completed and it won't block too long.
final PersistentTopic persistentTopic = topicFuture.get();
if (producer != null && persistentTopic != null) {
final Optional<PersistentTopic> persistentTopic = topicFuture.get();
if (producer != null && persistentTopic.isPresent()) {
try {
persistentTopic.removeProducer(producer);
persistentTopic.get().removeProducer(producer);
} catch (IllegalArgumentException ignored) {
log.error("[{}] The producer's topic ({}) doesn't match the current PersistentTopic",
topicName, (producer.getTopic() == null) ? "null" : producer.getTopic().getName());
Expand Down
Loading

0 comments on commit 5601c66

Please sign in to comment.