diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index af71e3ecd33c5..7d84167cf24fc 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -998,6 +998,14 @@ public interface UncheckedCloseable extends AutoCloseable { /** * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. + * Be cautious when passing method references as an argument. For example: + *

+ * {@code closeQuietly(task::stop, "source task");} + *

+ * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method + * reference from a null object will result in a {@link NullPointerException}. In the example code above, + * it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to + * use a method reference from it. */ public static void closeQuietly(AutoCloseable closeable, String name) { if (closeable != null) { @@ -1009,6 +1017,17 @@ public static void closeQuietly(AutoCloseable closeable, String name) { } } + /** + * Closes {@code closeable} and if an exception is thrown, it is registered to the firstException parameter. + * Be cautious when passing method references as an argument. For example: + *

+ * {@code closeQuietly(task::stop, "source task");} + *

+ * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method + * reference from a null object will result in a {@link NullPointerException}. In the example code above, + * it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to + * use a method reference from it. + */ public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference firstException) { if (closeable != null) { try { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 837891071a49a..72817a02f0f66 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -58,6 +58,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -233,7 +234,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id, this.admin = admin; this.offsetReader = offsetReader; this.offsetWriter = offsetWriter; - this.offsetStore = offsetStore; + this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks"); this.closeExecutor = closeExecutor; this.sourceTaskContext = sourceTaskContext;