Skip to content
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,19 @@ class BrokerServer(
}
metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))

/**
* We must shut down the scheduler early because otherwise, the scheduler could touch other
* resources that might already have been shut down and cause exceptions.
* For example, if we didn't shut down the scheduler first, when LogManager was closing
* partitions one by one, the scheduler might concurrently delete old segments due to
* retention. However, the old segments could have been closed by the LogManager, which would
* cause an IOException and subsequently mark the log dir as offline. As a result, the broker would
* not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
* broker could take hours to recover the log during restart.
*/
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)

if (transactionCoordinator != null)
CoreUtils.swallow(transactionCoordinator.shutdown(), this)
if (groupCoordinator != null)
Expand All @@ -501,9 +514,6 @@ class BrokerServer(

if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)
// be sure to shutdown scheduler after log manager
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)

if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
Expand Down
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,19 @@ class KafkaServer(
if (controlPlaneRequestHandlerPool != null)
CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)

/**
* We must shut down the scheduler early because otherwise, the scheduler could touch other
* resources that might already have been shut down and cause exceptions.
* For example, if we didn't shut down the scheduler first, when LogManager was closing
* partitions one by one, the scheduler might concurrently delete old segments due to
* retention. However, the old segments could have been closed by the LogManager, which would
* cause an IOException and subsequently mark the log dir as offline. As a result, the broker would
* not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
* broker could take hours to recover the log during restart.
*/
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)

if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
if (controlPlaneRequestProcessor != null)
Expand Down Expand Up @@ -737,9 +750,6 @@ class KafkaServer(

if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)
// be sure to shutdown scheduler after log manager
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)

if (kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown(), this)
Expand Down
50 changes: 33 additions & 17 deletions core/src/main/scala/kafka/utils/KafkaScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.util.concurrent._
import atomic._
import org.apache.kafka.common.utils.KafkaThread

import java.util.concurrent.TimeUnit.NANOSECONDS

/**
* A scheduler for running jobs
*
Expand Down Expand Up @@ -107,21 +109,25 @@ class KafkaScheduler(val threads: Int,
debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
this synchronized {
ensureRunning()
val runnable: Runnable = () => {
try {
trace("Beginning execution of scheduled task '%s'.".format(name))
fun()
} catch {
case t: Throwable => error(s"Uncaught exception in scheduled task '$name'", t)
} finally {
trace("Completed execution of scheduled task '%s'.".format(name))
if (isStarted) {
val runnable: Runnable = () => {
try {
trace("Beginning execution of scheduled task '%s'.".format(name))
fun()
} catch {
case t: Throwable => error(s"Uncaught exception in scheduled task '$name'", t)
} finally {
trace("Completed execution of scheduled task '%s'.".format(name))
}
}
if (period >= 0)
executor.scheduleAtFixedRate(runnable, delay, period, unit)
else
executor.schedule(runnable, delay, unit)
} else {
info("Kafka scheduler is not running at the time task '%s' is scheduled. The task is ignored.".format(name))
new NoOpScheduledFutureTask
}
if (period >= 0)
executor.scheduleAtFixedRate(runnable, delay, period, unit)
else
executor.schedule(runnable, delay, unit)
}
}

Expand All @@ -141,9 +147,19 @@ class KafkaScheduler(val threads: Int,
executor != null
}
}

/** Guard clause: fails fast with an [[IllegalStateException]] when a task is
 * scheduled on a scheduler that has not been started (or was shut down). */
private def ensureRunning(): Unit = {
  if (isStarted) () else throw new IllegalStateException("Kafka scheduler is not running.")
}

/**
 * A placeholder [[ScheduledFuture]] returned when a task is submitted after the scheduler
 * has stopped running. It behaves as an already-cancelled, already-completed task with zero
 * remaining delay, so callers holding the returned future can treat it uniformly.
 */
private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled: Boolean = true
  override def isDone: Boolean = true
  override def get(): Unit = {}
  override def get(timeout: Long, unit: TimeUnit): Unit = {}
  override def getDelay(unit: TimeUnit): Long = 0
  override def compareTo(o: Delayed): Int = {
    // Use Long.compare rather than subtraction: the difference of two longs can
    // overflow (e.g. when o.getDelay returns Long.MinValue, 0 - Long.MinValue
    // wraps to a negative value), yielding the wrong sign. This matches the
    // Delayed.compareTo contract for any peer delay value.
    java.lang.Long.compare(getDelay(NANOSECONDS), o.getDelay(NANOSECONDS))
  }
}