From 3972e765ead8056506eb5dd4196174cc0fe21c7a Mon Sep 17 00:00:00 2001 From: Sagar Rao Date: Tue, 21 Jun 2022 17:23:39 +0530 Subject: [PATCH 1/5] KAFKA-14012: Adding null checks for cases when closeQuietly was being passed a lambda object --- .../kafka/connect/runtime/AbstractWorkerSourceTask.java | 6 ++++-- .../kafka/connect/storage/KafkaConfigBackingStore.java | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 837891071a49a..8819547d63776 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -298,7 +298,7 @@ public void removeMetrics() { @Override protected void close() { - if (started) { + if (started && task != null) { Utils.closeQuietly(task::stop, "source task"); } @@ -310,7 +310,9 @@ protected void close() { Utils.closeQuietly(transformationChain, "transformation chain"); Utils.closeQuietly(retryWithToleranceOperator, "retry operator"); Utils.closeQuietly(offsetReader, "offset reader"); - Utils.closeQuietly(offsetStore::stop, "offset backing store"); + if (offsetReader != null) { + Utils.closeQuietly(offsetStore::stop, "offset backing store"); + } } private void closeProducer(Duration duration) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 76c626964e6d2..73f9bd36f7be1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -357,7 +357,10 @@ public void stop() { relinquishWritePrivileges(); Utils.closeQuietly(ownTopicAdmin, "admin for config topic"); - Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); + + if (configLog != null) { + Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); + } log.info("Closed KafkaConfigBackingStore"); } From cee3c3144c69dbc5c2eed55c05a6c1636cdad971 Mon Sep 17 00:00:00 2001 From: Sagar Rao Date: Tue, 21 Jun 2022 18:14:00 +0530 Subject: [PATCH 2/5] Removing null check for configLog and task and added not null for offsetStore --- .../kafka/connect/runtime/AbstractWorkerSourceTask.java | 8 ++++---- .../kafka/connect/storage/KafkaConfigBackingStore.java | 5 +---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 8819547d63776..f2cea47a30d89 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -58,6 +58,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -241,6 +242,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id, this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.topicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups); + Objects.requireNonNull(this.offsetStore); } @Override @@ -298,7 +300,7 @@ public void removeMetrics() { @Override protected void close() { - if (started && task != null) { + if (started) { Utils.closeQuietly(task::stop, "source task"); } @@ -310,9 +312,7 @@ protected void close() { Utils.closeQuietly(transformationChain, "transformation chain"); Utils.closeQuietly(retryWithToleranceOperator, "retry operator"); Utils.closeQuietly(offsetReader, "offset reader"); - if (offsetReader != null) { - Utils.closeQuietly(offsetStore::stop, "offset backing store"); - } + Utils.closeQuietly(offsetStore::stop, "offset backing store"); } private void closeProducer(Duration duration) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 73f9bd36f7be1..76c626964e6d2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -357,10 +357,7 @@ public void stop() { relinquishWritePrivileges(); Utils.closeQuietly(ownTopicAdmin, "admin for config topic"); - - if (configLog != null) { - Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); - } + Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); log.info("Closed KafkaConfigBackingStore"); } From e7c394adb14d2cb1dee3fe2636a379c062378e82 Mon Sep 17 00:00:00 2001 From: Sagar Rao Date: Wed, 22 Jun 2022 22:08:34 +0530 Subject: [PATCH 3/5] Review comments addressed --- .../apache/kafka/connect/runtime/AbstractWorkerSourceTask.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index f2cea47a30d89..72817a02f0f66 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -234,7 +234,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id, this.admin = admin; this.offsetReader = offsetReader; this.offsetWriter = offsetWriter; - this.offsetStore = offsetStore; + this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks"); this.closeExecutor = closeExecutor; this.sourceTaskContext = sourceTaskContext; @@ -242,7 +242,6 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id, this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.topicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups); - Objects.requireNonNull(this.offsetStore); } @Override From 6c244fbcdeb5565498419ae4825ebd65867d586e Mon Sep 17 00:00:00 2001 From: Sagar Rao Date: Sun, 26 Jun 2022 20:39:04 +0530 Subject: [PATCH 4/5] Adding comments to javadocs of closeQuietly and closeAllQuietly --- .../java/org/apache/kafka/common/utils/Utils.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index af71e3ecd33c5..b45abd0e4a933 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -997,7 +997,9 @@ public interface UncheckedCloseable extends AutoCloseable { } /** - * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. + * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. Note that, if + * a method reference of null object is passed as closeable to this method, then that leads to NPE and + * the close() doesn't happen as expected. */ public static void closeQuietly(AutoCloseable closeable, String name) { if (closeable != null) { @@ -1009,6 +1011,11 @@ public static void closeQuietly(AutoCloseable closeable, String name) { } } + /* + * Closes {@code closeable} and if an exception is thrown, it is registered to the firstException parameter. + * Note that, if a method reference of null object is passed as closeable to this method, then that leads to NPE and + * * the close() doesn't happen as expected. + */ public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference firstException) { if (closeable != null) { try { @@ -1021,7 +1028,8 @@ public static void closeQuietly(AutoCloseable closeable, String name, AtomicRefe } /** - * close all closable objects even if one of them throws exception. + * close all closable objects even if one of them throws exception. Have care when passing method references as a + * closeable as a method reference on a null object would throw an NPE and the close won't happen as expected. * @param firstException keeps the first exception * @param name message of closing those objects * @param closeables closable objects From fd8eca7569a82c3b0a886e3ddae210efbed4c91a Mon Sep 17 00:00:00 2001 From: Sagar Rao Date: Sat, 2 Jul 2022 17:17:42 +0530 Subject: [PATCH 5/5] Updating javadocs based on suggestions from review comments --- .../org/apache/kafka/common/utils/Utils.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index b45abd0e4a933..7d84167cf24fc 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -997,9 +997,15 @@ public interface UncheckedCloseable extends AutoCloseable { } /** - * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. Note that, if - * a method reference of null object is passed as closeable to this method, then that leads to NPE and - * the close() doesn't happen as expected. + * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. + * Be cautious when passing method references as an argument. For example: + *

+ * {@code closeQuietly(task::stop, "source task");} + *

+ * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method + * reference from a null object will result in a {@link NullPointerException}. In the example code above, + * it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to + * use a method reference from it. */ public static void closeQuietly(AutoCloseable closeable, String name) { if (closeable != null) { @@ -1011,10 +1017,16 @@ public static void closeQuietly(AutoCloseable closeable, String name) { } } - /* + /** * Closes {@code closeable} and if an exception is thrown, it is registered to the firstException parameter. - * Note that, if a method reference of null object is passed as closeable to this method, then that leads to NPE and - * * the close() doesn't happen as expected. + * Be cautious when passing method references as an argument. For example: + *

+ * {@code closeQuietly(task::stop, "source task");} + *

+ * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method + * reference from a null object will result in a {@link NullPointerException}. In the example code above, + * it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to + * use a method reference from it. */ public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference firstException) { if (closeable != null) { @@ -1028,8 +1040,7 @@ public static void closeQuietly(AutoCloseable closeable, String name, AtomicRefe } /** - * close all closable objects even if one of them throws exception. Have care when passing method references as a - * closeable as a method reference on a null object would throw an NPE and the close won't happen as expected. + * close all closable objects even if one of them throws exception. * @param firstException keeps the first exception * @param name message of closing those objects * @param closeables closable objects