From 629b2e92700dc2a316ed52aa086b12529278f6e6 Mon Sep 17 00:00:00 2001 From: Tim te Beek Date: Tue, 3 Aug 2021 21:57:00 +0200 Subject: [PATCH 1/3] Require key for produce onto compacted topic Fixes #133 --- src/main/java/org/akhq/models/Topic.java | 6 +++++- src/main/java/org/akhq/repositories/RecordRepository.java | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/akhq/models/Topic.java b/src/main/java/org/akhq/models/Topic.java index 18ae06007..c4cab5af7 100644 --- a/src/main/java/org/akhq/models/Topic.java +++ b/src/main/java/org/akhq/models/Topic.java @@ -126,7 +126,11 @@ public Boolean canDeleteRecords(String clusterId, ConfigRepository configReposit return false; } - List configs = configRepository.findByTopic(clusterId, this.getName()); + return isCompacted(clusterId, this.getName(), configRepository); + } + + public static boolean isCompacted(String clusterId, String topic, ConfigRepository configRepository) throws ExecutionException, InterruptedException { + List configs = configRepository.findByTopic(clusterId, topic); return configs != null && configs .stream() diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index e627313ad..6bbac5847 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -45,6 +45,9 @@ public class RecordRepository extends AbstractRepository { @Inject private KafkaModule kafkaModule; + @Inject + private ConfigRepository configRepository; + @Inject private TopicRepository topicRepository; @@ -536,6 +539,8 @@ public RecordMetadata produce( } else { keyAsBytes = key.get().getBytes(); } + } else if (Topic.isCompacted(clusterId, topic, configRepository)) { + throw new IllegalArgumentException("Key missing for produce onto compacted topic"); } if (value != null && valueSchemaId.isPresent()) { From 0d6cf93f7b1ad15e2556cdb9ad788e6992c47137 Mon Sep 17 00:00:00 2001 From: Tim te Beek Date: Fri, 6 Aug 2021 23:13:54 +0200 Subject: [PATCH 2/3] Catch exception when determining if topic is compacted --- src/main/java/org/akhq/models/Topic.java | 12 +++++------- .../java/org/akhq/repositories/RecordRepository.java | 10 ++++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/akhq/models/Topic.java b/src/main/java/org/akhq/models/Topic.java index c4cab5af7..e967ed491 100644 --- a/src/main/java/org/akhq/models/Topic.java +++ b/src/main/java/org/akhq/models/Topic.java @@ -126,15 +126,13 @@ public Boolean canDeleteRecords(String clusterId, ConfigRepository configReposit return false; } - return isCompacted(clusterId, this.getName(), configRepository); + return isCompacted(configRepository.findByTopic(clusterId, this.getName())); } - public static boolean isCompacted(String clusterId, String topic, ConfigRepository configRepository) throws ExecutionException, InterruptedException { - List configs = configRepository.findByTopic(clusterId, topic); - + public static boolean isCompacted(List configs) { return configs != null && configs - .stream() - .filter(config -> config.getName().equals(TopicConfig.CLEANUP_POLICY_CONFIG)) - .anyMatch(config -> config.getValue().contains(TopicConfig.CLEANUP_POLICY_COMPACT)); + .stream() + .filter(config -> config.getName().equals(TopicConfig.CLEANUP_POLICY_CONFIG)) + .anyMatch(config -> config.getValue().contains(TopicConfig.CLEANUP_POLICY_COMPACT)); } } diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 6bbac5847..e22a98af0 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -539,8 +539,14 @@ public RecordMetadata produce( } else { keyAsBytes = key.get().getBytes(); } - } else if (Topic.isCompacted(clusterId, topic, configRepository)) { - throw new IllegalArgumentException("Key missing for produce onto compacted topic"); + } else { + try { + if (Topic.isCompacted(configRepository.findByTopic(clusterId, value))) { + throw new IllegalArgumentException("Key missing for produce onto compacted topic"); + } + } catch (ExecutionException ex) { + log.debug("Failed to determine if {} topic {} is compacted", clusterId, topic, ex); + } } if (value != null && valueSchemaId.isPresent()) { From e013dfac64c1ab9f968070e58d5b88cc2559a430 Mon Sep 17 00:00:00 2001 From: Tim te Beek Date: Fri, 6 Aug 2021 23:14:27 +0200 Subject: [PATCH 3/3] Restore indentation --- src/main/java/org/akhq/models/Topic.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/akhq/models/Topic.java b/src/main/java/org/akhq/models/Topic.java index e967ed491..8e7d22db5 100644 --- a/src/main/java/org/akhq/models/Topic.java +++ b/src/main/java/org/akhq/models/Topic.java @@ -131,8 +131,8 @@ public Boolean canDeleteRecords(String clusterId, ConfigRepository configReposit public static boolean isCompacted(List configs) { return configs != null && configs - .stream() - .filter(config -> config.getName().equals(TopicConfig.CLEANUP_POLICY_CONFIG)) - .anyMatch(config -> config.getValue().contains(TopicConfig.CLEANUP_POLICY_COMPACT)); + .stream() + .filter(config -> config.getName().equals(TopicConfig.CLEANUP_POLICY_CONFIG)) + .anyMatch(config -> config.getValue().contains(TopicConfig.CLEANUP_POLICY_COMPACT)); } }