Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
files="(QuorumController).java"/>
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager).java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ class BrokerLifecycleManager(
/**
* The event queue.
*/
private[server] val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
private[server] val eventQueue = new KafkaEventQueue(time,
logContext,
threadNamePrefix.getOrElse(""),
new ShutdownEvent())

/**
* Start the BrokerLifecycleManager.
Expand Down Expand Up @@ -239,7 +242,7 @@ class BrokerLifecycleManager(
* Start shutting down the BrokerLifecycleManager, but do not block.
*/
def beginShutdown(): Unit = {
eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
eventQueue.beginShutdown("beginShutdown");
}

/**
Expand Down Expand Up @@ -470,7 +473,7 @@ class BrokerLifecycleManager(
override def run(): Unit = {
if (!initialRegistrationSucceeded) {
error("Shutting down because we were unable to register with the controller quorum.")
eventQueue.beginShutdown("registrationTimeout", new ShutdownEvent())
eventQueue.beginShutdown("registrationTimeout");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class BrokerMetadataSnapshotter(
/**
* The event queue which runs this listener.
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""), new ShutdownEvent())

override def maybeStartSnapshot(
lastContainedLogTime: Long,
Expand Down Expand Up @@ -126,7 +126,7 @@ class BrokerMetadataSnapshotter(
}

def beginShutdown(): Unit = {
eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
eventQueue.beginShutdown("beginShutdown");
}

class ShutdownEvent() extends EventQueue.Event {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
Expand Down Expand Up @@ -209,7 +210,7 @@ private MetadataLoader(
this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix);
this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix, new ShutdownEvent());
}

private boolean stillNeedToCatchUp(long offset) {
Expand Down Expand Up @@ -537,9 +538,14 @@ public long lastAppliedOffset() {

@Override
public void beginShutdown() {
eventQueue.beginShutdown("beginShutdown", () -> {
eventQueue.beginShutdown("beginShutdown");
}

class ShutdownEvent implements EventQueue.Event {
@Override
public void run() throws Exception {
for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator();
iter.hasNext(); ) {
iter.hasNext(); ) {
closePublisher(iter.next());
iter.remove();
}
Expand All @@ -548,7 +554,7 @@ public void beginShutdown() {
closePublisher(iter.next());
iter.remove();
}
});
}
}

Time time() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public SnapshotFileReader(String snapshotPath, RaftClient.Listener<ApiMessageAnd
this.snapshotPath = snapshotPath;
this.listener = listener;
this.queue = new KafkaEventQueue(Time.SYSTEM,
new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_");
new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_", new ShutdownEvent());
this.caughtUpFuture = new CompletableFuture<>();
}

Expand Down Expand Up @@ -174,22 +174,24 @@ public void beginShutdown(String reason) {
} else {
caughtUpFuture.completeExceptionally(new RuntimeException(reason));
}
queue.beginShutdown(reason, new EventQueue.Event() {
@Override
public void run() throws Exception {
listener.beginShutdown();
if (fileRecords != null) {
fileRecords.close();
fileRecords = null;
}
batchIterator = null;
}
queue.beginShutdown(reason);
}

@Override
public void handleException(Throwable e) {
log.error("shutdown error", e);
class ShutdownEvent implements EventQueue.Event {
@Override
public void run() throws Exception {
listener.beginShutdown();
if (fileRecords != null) {
fileRecords.close();
fileRecords = null;
}
});
batchIterator = null;
}

@Override
public void handleException(Throwable e) {
log.error("shutdown error", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ public LocalLogManager(LogContext logContext,
this.nodeId = nodeId;
this.shared = shared;
this.maxReadOffset = shared.initialMaxReadOffset();
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix, new ShutdownEvent());
shared.registerLogManager(this);
}

Expand Down Expand Up @@ -601,21 +602,26 @@ private static int messageSize(ApiMessageAndVersion messageAndVersion, ObjectSer
}

public void beginShutdown() {
eventQueue.beginShutdown("beginShutdown", () -> {
eventQueue.beginShutdown("beginShutdown");
}

class ShutdownEvent implements EventQueue.Event {
@Override
public void run() throws Exception {
try {
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
resign(leader.epoch());
for (MetaLogListenerData listenerData : listeners.values()) {
listenerData.beginShutdown();
}
shared.unregisterLogManager(this);
shared.unregisterLogManager(LocalLogManager.this);
}
} catch (Exception e) {
log.error("Unexpected exception while sending beginShutdown callbacks", e);
}
shutdown = true;
});
}
}

@Override
Expand Down
36 changes: 3 additions & 33 deletions server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.OptionalLong;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


Expand Down Expand Up @@ -209,44 +208,15 @@ void enqueue(EventInsertionType insertionType,
Function<OptionalLong, OptionalLong> deadlineNsCalculator,
Event event);

/**
* Asynchronously shut down the event queue with no unnecessary delay.
* @see #beginShutdown(String, Event, long, TimeUnit)
*
* @param source The source of the shutdown.
*/
default void beginShutdown(String source) {
beginShutdown(source, new VoidEvent());
}

/**
* Asynchronously shut down the event queue with no unnecessary delay.
*
* @param source The source of the shutdown.
* @param cleanupEvent The mandatory event to invoke after all other events have
* been processed.
* @see #beginShutdown(String, Event, long, TimeUnit)
*/
default void beginShutdown(String source, Event cleanupEvent) {
beginShutdown(source, cleanupEvent, 0, TimeUnit.SECONDS);
}

/**
* Asynchronously shut down the event queue.
*
* No new events will be accepted, and the timeout will be initiated
* for all existing events.
* No new events will be accepted, and the queue thread will exit after running the existing events.
* Deferred events will receive TimeoutExceptions.
*
* @param source The source of the shutdown.
* @param cleanupEvent The mandatory event to invoke after all other events have
* been processed.
* @param timeSpan The amount of time to use for the timeout.
* Once the timeout elapses, any remaining queued
* events will get a
* {@link org.apache.kafka.common.errors.TimeoutException}.
* @param timeUnit The time unit to use for the timeout.
*/
void beginShutdown(String source, Event cleanupEvent, long timeSpan, TimeUnit timeUnit);
void beginShutdown(String source);

/**
* @return The number of pending and running events. If this is 0, there is no running event and
Expand Down
Loading