Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,14 @@ public interface UncheckedCloseable extends AutoCloseable {

/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
* <b>Be cautious when passing method references as an argument.</b> For example:
* <p>
* {@code closeQuietly(task::stop, "source task");}
* <p>
* Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method
* reference from a null object will result in a {@link NullPointerException}. In the example code above,
* it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to
* use a method reference from it.
*/
public static void closeQuietly(AutoCloseable closeable, String name) {
if (closeable != null) {
Expand All @@ -1009,6 +1017,17 @@ public static void closeQuietly(AutoCloseable closeable, String name) {
}
}

/**
* Closes {@code closeable} and if an exception is thrown, it is registered to the firstException parameter.
* <b>Be cautious when passing method references as an argument.</b> For example:
* <p>
* {@code closeQuietly(task::stop, "source task");}
* <p>
* Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method
* reference from a null object will result in a {@link NullPointerException}. In the example code above,
* it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to
* use a method reference from it.
*/
public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException) {
if (closeable != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -233,7 +234,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
this.admin = admin;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
this.offsetStore = offsetStore;
this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
this.closeExecutor = closeExecutor;
this.sourceTaskContext = sourceTaskContext;

Expand Down