diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f49c3e8edb65e..862a2927ecc6a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -306,7 +306,7 @@
+ files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
- this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+ this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix, new ShutdownEvent());
}
private boolean stillNeedToCatchUp(long offset) {
@@ -537,9 +538,14 @@ public long lastAppliedOffset() {
@Override
public void beginShutdown() {
- eventQueue.beginShutdown("beginShutdown", () -> {
+ eventQueue.beginShutdown("beginShutdown");
+ }
+
+ class ShutdownEvent implements EventQueue.Event {
+ @Override
+ public void run() throws Exception {
            for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator();
- iter.hasNext(); ) {
+ iter.hasNext(); ) {
closePublisher(iter.next());
iter.remove();
}
@@ -548,7 +554,7 @@ public void beginShutdown() {
closePublisher(iter.next());
iter.remove();
}
- });
+ }
}
Time time() {
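
The loader changes above show the new shape every KafkaEventQueue caller follows in this patch: the cleanup work that used to be passed as a lambda to beginShutdown(String, Event) now lives in a dedicated ShutdownEvent handed to the queue's constructor, and beginShutdown(String) merely requests shutdown. A minimal sketch of the pattern, assuming a hypothetical Component class in place of the real caller:

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.queue.EventQueue;
    import org.apache.kafka.queue.KafkaEventQueue;

    // Hypothetical caller illustrating the constructor-supplied cleanup event.
    public class Component {
        private final KafkaEventQueue eventQueue;

        public Component(Time time, LogContext logContext, String threadNamePrefix) {
            // The cleanup event is fixed at construction time; the queue thread
            // runs it once, after draining the remaining events, just before exiting.
            this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix,
                new ShutdownEvent());
        }

        public void beginShutdown() {
            // No cleanup callback here any more; just request shutdown.
            eventQueue.beginShutdown("beginShutdown");
        }

        class ShutdownEvent implements EventQueue.Event {
            @Override
            public void run() throws Exception {
                // Release resources owned by the queue thread here.
            }
        }
    }

Because the cleanup event runs on the queue thread after all other events, it can safely touch state that only that thread mutates.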
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 1e5e6371d0cb3..81945d1233741 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -63,7 +63,7 @@ public SnapshotFileReader(String snapshotPath, RaftClient.Listener();
}
@@ -174,22 +174,24 @@ public void beginShutdown(String reason) {
} else {
caughtUpFuture.completeExceptionally(new RuntimeException(reason));
}
- queue.beginShutdown(reason, new EventQueue.Event() {
- @Override
- public void run() throws Exception {
- listener.beginShutdown();
- if (fileRecords != null) {
- fileRecords.close();
- fileRecords = null;
- }
- batchIterator = null;
- }
+ queue.beginShutdown(reason);
+ }
- @Override
- public void handleException(Throwable e) {
- log.error("shutdown error", e);
+ class ShutdownEvent implements EventQueue.Event {
+ @Override
+ public void run() throws Exception {
+ listener.beginShutdown();
+ if (fileRecords != null) {
+ fileRecords.close();
+ fileRecords = null;
}
- });
+ batchIterator = null;
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("shutdown error", e);
+ }
}
@Override
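
With the two-argument beginShutdown gone, a caller that used the per-call cleanup event to learn when shutdown had completed can instead have the constructor-supplied event complete a future, the same way the new tests below use cleanupFuture. A small illustrative sketch (the class and variable names are made up):

    import java.util.concurrent.CompletableFuture;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.queue.KafkaEventQueue;

    public class ShutdownAwaitExample {
        public static void main(String[] args) throws Exception {
            CompletableFuture<Void> cleanupDone = new CompletableFuture<>();
            KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(),
                "example-", () -> cleanupDone.complete(null));
            queue.append(() -> System.out.println("normal event"));
            queue.beginShutdown("example");
            cleanupDone.get(); // resolves after the cleanup event has run on the queue thread
            queue.close();     // joins the event handler thread
        }
    }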
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index a90e2687200ef..492251bb02117 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -501,7 +501,8 @@ public LocalLogManager(LogContext logContext,
this.nodeId = nodeId;
this.shared = shared;
this.maxReadOffset = shared.initialMaxReadOffset();
- this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
+ this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
+ threadNamePrefix, new ShutdownEvent());
shared.registerLogManager(this);
}
@@ -601,7 +602,12 @@ private static int messageSize(ApiMessageAndVersion messageAndVersion, ObjectSer
}
public void beginShutdown() {
- eventQueue.beginShutdown("beginShutdown", () -> {
+ eventQueue.beginShutdown("beginShutdown");
+ }
+
+ class ShutdownEvent implements EventQueue.Event {
+ @Override
+ public void run() throws Exception {
try {
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
@@ -609,13 +615,13 @@ public void beginShutdown() {
for (MetaLogListenerData listenerData : listeners.values()) {
listenerData.beginShutdown();
}
- shared.unregisterLogManager(this);
+ shared.unregisterLogManager(LocalLogManager.this);
}
} catch (Exception e) {
log.error("Unexpected exception while sending beginShutdown callbacks", e);
}
shutdown = true;
- });
+ }
}
@Override
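
The one subtlety in this file is the switch from this to LocalLogManager.this: once the shutdown lambda becomes a (non-static) inner class, a bare this refers to the ShutdownEvent instance rather than to the log manager that must be unregistered. An illustrative snippet of the rule, using made-up Outer/Inner names:

    // Inside a non-static inner class, a bare `this` is the inner instance;
    // the enclosing instance must be named explicitly as Outer.this.
    public class Outer {
        void register(Object o) { }

        class Inner implements Runnable {
            @Override
            public void run() {
                register(this);        // passes the Inner instance
                register(Outer.this);  // passes the enclosing Outer instance
            }
        }
    }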
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index d0c752e641da9..8c4022cee2be1 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -21,7 +21,6 @@
import java.util.OptionalLong;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -209,44 +208,15 @@ void enqueue(EventInsertionType insertionType,
                 Function<OptionalLong, OptionalLong> deadlineNsCalculator,
Event event);
- /**
- * Asynchronously shut down the event queue with no unnecessary delay.
- * @see #beginShutdown(String, Event, long, TimeUnit)
- *
- * @param source The source of the shutdown.
- */
- default void beginShutdown(String source) {
- beginShutdown(source, new VoidEvent());
- }
-
- /**
- * Asynchronously shut down the event queue with no unnecessary delay.
- *
- * @param source The source of the shutdown.
- * @param cleanupEvent The mandatory event to invoke after all other events have
- * been processed.
- * @see #beginShutdown(String, Event, long, TimeUnit)
- */
- default void beginShutdown(String source, Event cleanupEvent) {
- beginShutdown(source, cleanupEvent, 0, TimeUnit.SECONDS);
- }
-
/**
* Asynchronously shut down the event queue.
*
- * No new events will be accepted, and the timeout will be initiated
- * for all existing events.
+ * No new events will be accepted, and the queue thread will exit after running the existing events.
+ * Deferred events will receive TimeoutExceptions.
*
* @param source The source of the shutdown.
- * @param cleanupEvent The mandatory event to invoke after all other events have
- * been processed.
- * @param timeSpan The amount of time to use for the timeout.
- * Once the timeout elapses, any remaining queued
- * events will get a
- * {@link org.apache.kafka.common.errors.TimeoutException}.
- * @param timeUnit The time unit to use for the timeout.
*/
- void beginShutdown(String source, Event cleanupEvent, long timeSpan, TimeUnit timeUnit);
+ void beginShutdown(String source);
/**
* @return The number of pending and running events. If this is 0, there is no running event and
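
The simplified interface keeps only beginShutdown(String): callers no longer pass a cleanup event or a timeout, and any events still deferred when the queue drains are failed with a TimeoutException. A runnable illustration of that contract (the example class name is made up):

    import java.util.OptionalLong;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.queue.EventQueue;
    import org.apache.kafka.queue.KafkaEventQueue;

    public class DeferredShutdownExample {
        public static void main(String[] args) throws Exception {
            KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(), "example-");
            queue.scheduleDeferred("later",
                __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + 3_600_000_000_000L), // an hour out
                new EventQueue.Event() {
                    @Override
                    public void run() { }

                    @Override
                    public void handleException(Throwable e) {
                        // Under the new contract this is a TimeoutException delivered at shutdown.
                        System.out.println("deferred event completed with: " + e);
                    }
                });
            queue.beginShutdown("example");
            queue.close();
        }
    }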
diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index b53339edfe337..7d4f46aa77725 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -23,10 +23,10 @@
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
+
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
@@ -115,34 +115,40 @@ boolean isSingleton() {
/**
* Run the event associated with this EventContext.
+ *
+ * @param log The logger to use.
+ * @param exceptionToDeliver If non-null, the exception to deliver to the event.
+ *
+ * @return True if the thread was interrupted; false otherwise.
*/
- void run(Logger log) throws InterruptedException {
- try {
- event.run();
- } catch (InterruptedException e) {
- throw e;
- } catch (Exception e) {
+ boolean run(Logger log, Throwable exceptionToDeliver) {
+ if (exceptionToDeliver == null) {
try {
- event.handleException(e);
- } catch (Throwable t) {
- log.error("Unexpected exception in handleException", t);
+ event.run();
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while running event. Shutting down event queue");
+ return true;
+ } catch (Throwable e) {
+ log.debug("Got exception while running {}. Invoking handleException.", event, e);
+ exceptionToDeliver = e;
}
}
- }
-
- /**
- * Complete the event associated with this EventContext with a timeout exception.
- */
- void completeWithTimeout() {
- completeWithException(new TimeoutException());
+ if (exceptionToDeliver != null) {
+ completeWithException(log, exceptionToDeliver);
+ }
+ return Thread.currentThread().isInterrupted();
}
/**
* Complete the event associated with this EventContext with the specified
* exception.
*/
- void completeWithException(Throwable t) {
- event.handleException(t);
+ void completeWithException(Logger log, Throwable t) {
+ try {
+ event.handleException(t);
+ } catch (Exception e) {
+ log.error("Unexpected exception in handleException", e);
+ }
}
}
@@ -173,10 +179,14 @@ private class EventHandler implements Runnable {
public void run() {
try {
handleEvents();
- cleanupEvent.run();
} catch (Throwable e) {
log.warn("event handler thread exiting with exception", e);
}
+ try {
+ cleanupEvent.run();
+ } catch (Throwable e) {
+ log.warn("cleanup event threw exception", e);
+ }
}
private void remove(EventContext eventContext) {
@@ -192,23 +202,23 @@ private void remove(EventContext eventContext) {
}
private void handleEvents() throws InterruptedException {
- EventContext toTimeout = null;
+ Throwable toDeliver = null;
EventContext toRun = null;
+ boolean wasInterrupted = false;
while (true) {
- if (toTimeout != null) {
- toTimeout.completeWithTimeout();
- } else if (toRun != null) {
- toRun.run(log);
+ if (toRun != null) {
+ wasInterrupted = toRun.run(log, toDeliver);
}
lock.lock();
try {
- if (toTimeout != null) {
- size--;
- toTimeout = null;
- }
if (toRun != null) {
size--;
+ if (wasInterrupted) {
+ interrupted = wasInterrupted;
+ }
+ toDeliver = null;
toRun = null;
+ wasInterrupted = false;
}
long awaitNs = Long.MAX_VALUE;
                Map.Entry<Long, EventContext> entry = deadlineMap.firstEntry();
@@ -224,41 +234,60 @@ private void handleEvents() throws InterruptedException {
// queue. (The value for deferred events is a schedule time
// rather than a timeout.)
remove(eventContext);
+ toDeliver = null;
toRun = eventContext;
} else {
// not a deferred event, so it is a deadline, and it is timed out.
remove(eventContext);
- toTimeout = eventContext;
+ toDeliver = new TimeoutException();
+ toRun = eventContext;
}
continue;
- } else if (closingTimeNs <= now) {
+ } else if (interrupted) {
+ remove(eventContext);
+ toDeliver = new InterruptedException();
+ toRun = eventContext;
+ continue;
+ } else if (shuttingDown) {
remove(eventContext);
- toTimeout = eventContext;
+ toDeliver = new TimeoutException();
+ toRun = eventContext;
continue;
}
awaitNs = timeoutNs - now;
}
if (head.next == head) {
- if ((closingTimeNs != Long.MAX_VALUE) && deadlineMap.isEmpty()) {
+ if (deadlineMap.isEmpty() && (shuttingDown || interrupted)) {
// If there are no more entries to process, and the queue is
// closing, exit the thread.
return;
}
} else {
+ if (interrupted) {
+ toDeliver = new InterruptedException();
+ } else {
+ toDeliver = null;
+ }
toRun = head.next;
remove(toRun);
continue;
}
- if (closingTimeNs != Long.MAX_VALUE) {
- long now = time.nanoseconds();
- if (awaitNs > closingTimeNs - now) {
- awaitNs = closingTimeNs - now;
- }
- }
if (awaitNs == Long.MAX_VALUE) {
- cond.await();
+ try {
+ cond.await();
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for a new event. " +
+ "Shutting down event queue");
+ interrupted = true;
+ }
} else {
- cond.awaitNanos(awaitNs);
+ try {
+ cond.awaitNanos(awaitNs);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for a deferred event. " +
+ "Shutting down event queue");
+ interrupted = true;
+ }
}
} finally {
lock.unlock();
@@ -270,9 +299,12 @@ Exception enqueue(EventContext eventContext,
                      Function<OptionalLong, OptionalLong> deadlineNsCalculator) {
lock.lock();
try {
- if (closingTimeNs != Long.MAX_VALUE) {
+ if (shuttingDown) {
return new RejectedExecutionException();
}
+ if (interrupted) {
+ return new InterruptedException();
+ }
OptionalLong existingDeadlineNs = OptionalLong.empty();
if (eventContext.tag != null) {
EventContext toRemove =
@@ -362,31 +394,69 @@ int size() {
}
}
+ /**
+ * The clock to use.
+ */
private final Time time;
+
+ /**
+ * The event to run when the queue is closing.
+ */
+ private final Event cleanupEvent;
+
+ /**
+ * The lock which protects private data.
+ */
private final ReentrantLock lock;
+
+ /**
+ * The log4j logger to use.
+ */
private final Logger log;
+
+ /**
+ * The runnable that our thread executes.
+ */
private final EventHandler eventHandler;
+
+ /**
+ * The queue thread.
+ */
private final Thread eventHandlerThread;
/**
- * The time in monotonic nanoseconds when the queue is closing, or Long.MAX_VALUE if
- * the queue is not currently closing.
+ * True if the event queue is shutting down. Protected by the lock.
*/
- private long closingTimeNs;
+ private boolean shuttingDown;
- private Event cleanupEvent;
+ /**
+ * True if the event handler thread was interrupted. Protected by the lock.
+ */
+ private boolean interrupted;
+
+ public KafkaEventQueue(
+ Time time,
+ LogContext logContext,
+ String threadNamePrefix
+ ) {
+ this(time, logContext, threadNamePrefix, VoidEvent::new);
+ }
- public KafkaEventQueue(Time time,
- LogContext logContext,
- String threadNamePrefix) {
+ public KafkaEventQueue(
+ Time time,
+ LogContext logContext,
+ String threadNamePrefix,
+ Event cleanupEvent
+ ) {
this.time = time;
+ this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
this.lock = new ReentrantLock();
this.log = logContext.logger(KafkaEventQueue.class);
this.eventHandler = new EventHandler();
this.eventHandlerThread = new KafkaThread(threadNamePrefix + "EventHandler",
this.eventHandler, false);
- this.closingTimeNs = Long.MAX_VALUE;
- this.cleanupEvent = null;
+ this.shuttingDown = false;
+ this.interrupted = false;
this.eventHandlerThread.start();
}
@@ -398,7 +468,7 @@ public void enqueue(EventInsertionType insertionType,
EventContext eventContext = new EventContext(event, insertionType, tag);
Exception e = eventHandler.enqueue(eventContext, deadlineNsCalculator);
if (e != null) {
- eventContext.completeWithException(e);
+ eventContext.completeWithException(log, e);
}
}
@@ -408,24 +478,15 @@ public void cancelDeferred(String tag) {
}
@Override
- public void beginShutdown(String source, Event newCleanupEvent,
- long timeSpan, TimeUnit timeUnit) {
- if (timeSpan < 0) {
- throw new IllegalArgumentException("beginShutdown must be called with a " +
- "non-negative timeout.");
- }
- Objects.requireNonNull(newCleanupEvent);
+ public void beginShutdown(String source) {
lock.lock();
try {
- if (cleanupEvent != null) {
+ if (shuttingDown) {
log.debug("{}: Event queue is already shutting down.", source);
return;
}
log.info("{}: shutting down event queue.", source);
- cleanupEvent = newCleanupEvent;
- long newClosingTimeNs = time.nanoseconds() + timeUnit.toNanos(timeSpan);
- if (closingTimeNs >= newClosingTimeNs)
- closingTimeNs = newClosingTimeNs;
+ shuttingDown = true;
eventHandler.cond.signal();
} finally {
lock.unlock();
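
For readers piecing the EventContext hunks back together: the old run(Logger)/completeWithTimeout pair collapses into one method that either runs the event or delivers a pre-chosen exception, always funnels failures through handleException (guarding against handleException itself throwing), and reports whether the queue thread should now treat itself as interrupted. A condensed, hedged restatement of that logic as a standalone helper (EventQueue.Event and the SLF4J Logger are real; the EventRunner wrapper is illustrative):

    import org.apache.kafka.queue.EventQueue;
    import org.slf4j.Logger;

    final class EventRunner {
        // Returns true if the queue thread should consider itself interrupted afterwards.
        static boolean run(Logger log, EventQueue.Event event, Throwable exceptionToDeliver) {
            if (exceptionToDeliver == null) {
                try {
                    event.run();
                } catch (InterruptedException e) {
                    log.warn("Interrupted while running event. Shutting down event queue");
                    return true;
                } catch (Throwable e) {
                    exceptionToDeliver = e;
                }
            }
            if (exceptionToDeliver != null) {
                try {
                    event.handleException(exceptionToDeliver);
                } catch (Exception e) {
                    log.error("Unexpected exception in handleException", e);
                }
            }
            return Thread.currentThread().isInterrupted();
        }
    }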
diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 09910200a1f87..d210df3b7a701 100644
--- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -35,6 +35,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import static java.util.concurrent.TimeUnit.HOURS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -165,7 +166,7 @@ public void testScheduleDeferred() throws Exception {
queue.close();
}
- private final static long ONE_HOUR_NS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.HOURS);
+ private final static long ONE_HOUR_NS = TimeUnit.NANOSECONDS.convert(1, HOURS);
@Test
public void testScheduleDeferredWithTagReplacement() throws Exception {
@@ -214,7 +215,7 @@ public void testShutdownBeforeDeferred() throws Exception {
final AtomicInteger count = new AtomicInteger(0);
        CompletableFuture<Integer> future = new CompletableFuture<>();
queue.scheduleDeferred("myDeferred",
- __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1)),
+ __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + HOURS.toNanos(1)),
new FutureEvent<>(future, () -> count.getAndAdd(1)));
queue.beginShutdown("testShutdownBeforeDeferred");
assertThrows(ExecutionException.class, () -> future.get());
@@ -257,7 +258,7 @@ public void testSize() throws Exception {
future.complete(null);
TestUtils.waitForCondition(() -> queue.isEmpty(), "Failed to see the queue become empty.");
queue.scheduleDeferred("later",
- __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + TimeUnit.HOURS.toNanos(1)),
+ __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + HOURS.toNanos(1)),
() -> { });
assertFalse(queue.isEmpty());
queue.scheduleDeferred("soon",
@@ -270,4 +271,145 @@ public void testSize() throws Exception {
queue.close();
assertTrue(queue.isEmpty());
}
+
+ /**
+ * Test that we continue handling events after Event#handleException itself throws an exception.
+ */
+ @Test
+ public void testHandleExceptionThrowingAnException() throws Exception {
+ KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(),
+ "testHandleExceptionThrowingAnException");
+        CompletableFuture<Void> initialFuture = new CompletableFuture<>();
+ queue.append(() -> initialFuture.get());
+ AtomicInteger counter = new AtomicInteger(0);
+ queue.append(new EventQueue.Event() {
+ @Override
+ public void run() throws Exception {
+ counter.incrementAndGet();
+ throw new IllegalStateException("First exception");
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ if (e instanceof IllegalStateException) {
+ counter.incrementAndGet();
+ throw new RuntimeException("Second exception");
+ }
+ }
+ });
+ queue.append(() -> counter.incrementAndGet());
+ assertEquals(3, queue.size());
+ initialFuture.complete(null);
+ TestUtils.waitForCondition(() -> counter.get() == 3,
+ "Failed to see all events execute as planned.");
+ queue.close();
+ }
+
+ private static class InterruptableEvent implements EventQueue.Event {
+        private final CompletableFuture<Void> runFuture;
+        private final CompletableFuture<Thread> queueThread;
+ private final AtomicInteger numCallsToRun;
+ private final AtomicInteger numInterruptedExceptionsSeen;
+
+ InterruptableEvent(
+            CompletableFuture<Thread> queueThread,
+ AtomicInteger numCallsToRun,
+ AtomicInteger numInterruptedExceptionsSeen
+ ) {
+ this.runFuture = new CompletableFuture<>();
+ this.queueThread = queueThread;
+ this.numCallsToRun = numCallsToRun;
+ this.numInterruptedExceptionsSeen = numInterruptedExceptionsSeen;
+ }
+
+ @Override
+ public void run() throws Exception {
+ numCallsToRun.incrementAndGet();
+ queueThread.complete(Thread.currentThread());
+ runFuture.get();
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ if (e instanceof InterruptedException) {
+ numInterruptedExceptionsSeen.incrementAndGet();
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Test
+ public void testInterruptedExceptionHandling() throws Exception {
+ KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(),
+ "testInterruptedExceptionHandling");
+        CompletableFuture<Thread> queueThread = new CompletableFuture<>();
+ AtomicInteger numCallsToRun = new AtomicInteger(0);
+ AtomicInteger numInterruptedExceptionsSeen = new AtomicInteger(0);
+ queue.append(new InterruptableEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
+ queue.append(new InterruptableEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
+ queue.append(new InterruptableEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
+ queue.append(new InterruptableEvent(queueThread, numCallsToRun, numInterruptedExceptionsSeen));
+ queueThread.get().interrupt();
+ TestUtils.retryOnExceptionWithTimeout(30000,
+ () -> assertEquals(1, numCallsToRun.get()));
+ TestUtils.retryOnExceptionWithTimeout(30000,
+ () -> assertEquals(3, numInterruptedExceptionsSeen.get()));
+ queue.close();
+ }
+
+ static class ExceptionTrapperEvent implements EventQueue.Event {
+        final CompletableFuture<Throwable> exception = new CompletableFuture<>();
+
+ @Override
+ public void run() throws Exception {
+ exception.complete(null);
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ exception.complete(e);
+ }
+ }
+
+ @Test
+ public void testInterruptedWithEmptyQueue() throws Exception {
+        CompletableFuture<Void> cleanupFuture = new CompletableFuture<>();
+ KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(),
+ "testInterruptedWithEmptyQueue", () -> cleanupFuture.complete(null));
+        CompletableFuture<Thread> queueThread = new CompletableFuture<>();
+ queue.append(() -> queueThread.complete(Thread.currentThread()));
+ TestUtils.retryOnExceptionWithTimeout(30000, () -> assertEquals(0, queue.size()));
+ queueThread.get().interrupt();
+ cleanupFuture.get();
+ ExceptionTrapperEvent ieTrapper = new ExceptionTrapperEvent();
+ queue.append(ieTrapper);
+ assertEquals(InterruptedException.class, ieTrapper.exception.get().getClass());
+ queue.close();
+ ExceptionTrapperEvent reTrapper = new ExceptionTrapperEvent();
+ queue.append(reTrapper);
+ assertEquals(RejectedExecutionException.class, reTrapper.exception.get().getClass());
+ }
+
+ @Test
+ public void testInterruptedWithDeferredEvents() throws Exception {
+        CompletableFuture<Void> cleanupFuture = new CompletableFuture<>();
+ KafkaEventQueue queue = new KafkaEventQueue(Time.SYSTEM, new LogContext(),
+ "testInterruptedWithDeferredEvents", () -> cleanupFuture.complete(null));
+        CompletableFuture<Thread> queueThread = new CompletableFuture<>();
+ queue.append(() -> queueThread.complete(Thread.currentThread()));
+ ExceptionTrapperEvent ieTrapper1 = new ExceptionTrapperEvent();
+ ExceptionTrapperEvent ieTrapper2 = new ExceptionTrapperEvent();
+ queue.scheduleDeferred("ie2",
+ __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + HOURS.toNanos(2)),
+ ieTrapper2);
+ queue.scheduleDeferred("ie1",
+ __ -> OptionalLong.of(Time.SYSTEM.nanoseconds() + HOURS.toNanos(1)),
+ ieTrapper1);
+ TestUtils.retryOnExceptionWithTimeout(30000, () -> assertEquals(2, queue.size()));
+ queueThread.get().interrupt();
+ cleanupFuture.get();
+ assertEquals(InterruptedException.class, ieTrapper1.exception.get().getClass());
+ assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
+ queue.close();
+ }
}
\ No newline at end of file
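
A side note on InterruptableEvent above: its handleException re-asserts the interrupt flag, which is the standard idiom whenever code consumes an interrupt-derived exception without rethrowing it. A generic sketch of the idiom, independent of the Kafka classes:

    // If you swallow an InterruptedException, restore the thread's interrupt
    // status so code further up the stack can still observe the interrupt.
    class InterruptAwareTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }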
diff --git a/server-common/src/test/resources/test/log4j.properties b/server-common/src/test/resources/test/log4j.properties
new file mode 100644
index 0000000000000..be36f90299a77
--- /dev/null
+++ b/server-common/src/test/resources/test/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=INFO