KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown #11351
junrao merged 12 commits into apache:trunk
Conversation
  if (logManager != null)
    CoreUtils.swallow(logManager.shutdown(), this)
- // be sure to shutdown scheduler after log manager
+ // be sure to shutdown scheduler before log manager
(1) Could we add some comment to explain why we want to shut down the log manager later?
(2) If we are shutting down, it makes sense to stop the scheduler first since there is no guarantee any asynchronously scheduled task will complete. If this is the case, should we shut down the scheduler before any other component (e.g. ReplicaManager also uses the scheduler)?
(3) Once the scheduler is shut down, scheduling new tasks causes an IllegalStateException. That's probably the exception that #10538 tries to fix. To avoid polluting the log, perhaps we could change KafkaScheduler so that it avoids throwing an exception once it's shut down.
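A minimal sketch of suggestion (3), assuming a simplified scheduler with an isStarted flag; the class and method names here are illustrative, not the actual patch:

import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

// Sketch: check the running state in schedule() and log instead of throwing
// IllegalStateException. The real KafkaScheduler has more state and uses its
// own logging; println stands in for info() here.
class SketchScheduler(threads: Int) {
  @volatile private var executor: ScheduledThreadPoolExecutor = _

  def startup(): Unit = this synchronized {
    executor = new ScheduledThreadPoolExecutor(threads)
  }

  def isStarted: Boolean = executor != null

  def schedule(name: String, fun: () => Unit, delayMs: Long): Option[ScheduledFuture[_]] =
    this synchronized {
      if (!isStarted) {
        // Expected during broker shutdown, so an info-level message is enough.
        println(s"Scheduler is not running at the time task '$name' is scheduled. The task is ignored.")
        None
      } else {
        Some(executor.schedule(new Runnable { def run(): Unit = fun() }, delayMs, TimeUnit.MILLISECONDS))
      }
    }
}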
Are all scheduled tasks of the async variety? It would be nice if we stopped issuing new async tasks when shutdown starts, but let the existing ones complete.
It seems that none of the scheduled calls depends on the completion of the returned future. When shutting down KafkaScheduler, we call ScheduledThreadPoolExecutor.shutdown(), which doesn't wait for all existing tasks to complete. This seems ok since for important tasks (e.g. flushing the log), we make explicit calls during shutdown.
Thanks for the code review. I moved kafkaScheduler.shutdown upfront, changed the exception to an info-level log, and added comments.
For the exception-to-log change: I looked at the code that calls startup and shutdown. It appears we always call startup right after creating the scheduler object and call shutdown when calling the parent's shutdown or close. I think the callers make sure they won't call scheduler.schedule on a scheduler that has been shut down, unless we are shutting down the broker. So it should be okay to change the IllegalStateException to an info-level log.
@junrao shutdown:
"Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down."
Maybe you were thinking of shutdownNow?
IIUC, the code (1) lets running tasks complete, (2) waits for tasks scheduled with executor.schedule(runnable, delay, unit) to finish after the delay, and (3) cancels future runs of tasks scheduled by executor.scheduleAtFixedRate(runnable, delay, period, unit).
kafka/core/src/main/scala/kafka/utils/KafkaScheduler.scala
Lines 121 to 124 in b61ec00
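For readers without the permalink handy, the referenced shutdown path is roughly the following; this is a paraphrase, not a verbatim quote of those lines:

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

// Approximate shape of KafkaScheduler.shutdown(): stop accepting new tasks,
// then wait for tasks that are already running or queued with a delay.
object ShutdownSketch {
  def shutdownSketch(executor: ScheduledThreadPoolExecutor): Unit = {
    if (executor != null) {
      executor.shutdown()                          // no new tasks; pending periodic runs are cancelled
      executor.awaitTermination(1, TimeUnit.DAYS)  // 1-day timeout, per the discussion below
    }
  }
}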
@ijuma and @ccding : The description of ScheduledThreadPoolExecutor.shutdown() says
This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.
So, we probably want to call awaitTermination() with a timeout like 10secs to make sure all existing tasks complete before shutting down other components.
We probably can't use shutdownNow() since it interrupts the task and could cause IOException when blocking operations (e.g., force) are applied on a file channel. This will then lead to unclean shutdown.
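To make the shutdownNow() concern concrete: FileChannel is an interruptible channel, so interrupting a thread that is doing channel I/O (as shutdownNow() would) closes the channel and surfaces an IOException, typically a ClosedByInterruptException. A standalone illustration, not Kafka code:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}

// Demo: a worker repeatedly writes and forces a file; interrupting it mid-flight
// makes the channel I/O fail with an IOException, which in the log layer would
// be treated as a log-dir failure and lead to an unclean shutdown.
object InterruptDemo extends App {
  val path = Files.createTempFile("interrupt-demo", ".tmp")
  val worker = new Thread(() => {
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)
    try {
      while (true) {
        channel.write(ByteBuffer.wrap(Array.fill(1024 * 1024)(0: Byte)), 0L)
        channel.force(true) // an interrupt around here closes the channel
      }
    } catch {
      case e: java.io.IOException => println(s"flush failed after interrupt: $e")
    } finally {
      if (channel.isOpen) channel.close()
    }
  })
  worker.start()
  Thread.sleep(100)
  worker.interrupt() // roughly what shutdownNow() does to a running task
  worker.join()
}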
Yeah, it does not wait for previously submitted tasks to complete execution, but it also does not kill the tasks. KafkaScheduler does call awaitTermination with a 1-day timeout. Do you think we want a shorter timeout?
@ccding : Missed that part. awaitTermination with a 1-day timeout is fine.
FYI: there is a side effect of not throwing: kafka/core/src/main/scala/kafka/log/LocalLog.scala, lines 895 to 909 in 5a6f19b.
  this synchronized {
-   ensureRunning()
+   if (!isStarted) {
+     info("Kafka scheduler is not running at the time '%s' is scheduled.".format(name))
Perhaps we could add that we are ignoring the named task.
-   ensureRunning()
+   if (!isStarted) {
+     info("Kafka scheduler is not running at the time '%s' is scheduled.".format(name))
+     return null
In UnifiedLog, we have code that uses the returned future.
val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
With this change, perhaps we could return an Option and let the caller deal with it accordingly?
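A rough sketch of what the caller side could look like with an Option-returning schedule(); the trait mirrors the new signature quoted later in this thread, while the task body and the helper object are placeholders:

import java.util.concurrent.{ScheduledFuture, TimeUnit}

// Mirrors the Option-returning schedule() signature proposed in this PR.
trait Scheduler {
  def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1,
               unit: TimeUnit = TimeUnit.MILLISECONDS): Option[ScheduledFuture[_]]
}

object ProducerExpirationSketch {
  // If None is returned, the scheduler is already shut down, which only happens
  // while the broker itself is shutting down, so the task can simply be skipped.
  def start(scheduler: Scheduler, producerIdExpirationCheckIntervalMs: Long): Option[ScheduledFuture[_]] =
    scheduler.schedule(
      name = "PeriodicProducerExpirationCheck",
      fun = () => { /* remove expired producer ids (placeholder body) */ },
      delay = producerIdExpirationCheckIntervalMs,
      period = producerIdExpirationCheckIntervalMs,
      unit = TimeUnit.MILLISECONDS)
}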
  * For example, if we didn't shutdown the scheduler first, when LogManager was closing
  * partitions one by one, the scheduler might concurrently delete old segments due to
  * retention. However, the old segments could have been closed by the LogManager, which would
  * cause an exception and subsequently mark logdir as offline. As a result, the broker would
  * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
  * broker would have to take hours to recover the log during restart and are subject to
  * potential data loss.
I don't think unclean shutdown will cause data loss if acks = all is used. So, we can just remove that statement.
@ccding : For the side effect that you mentioned, a segment may be renamed to .delete but won't be physically deleted. This seems fine since on broker restart, we have the logic to delete all .delete files.
Addressed all comments above. PTAL
   * @return A Future object to manage the task scheduled.
   */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : Option[ScheduledFuture[_]]
Could we change the comment on return value accordingly?
We decided not to throw an exception to avoid polluting the log during broker shutdown. If we don't throw, we must return something, and Option looks better than null.
We can return a Future that does nothing too, right?
The question is, do we need to let the caller know that the schedule call has failed? I am not sure if we will have this use case in the future, though the current codebase doesn't have such a case.
Please let me know if you still think we should return a Future that does nothing. I can update the PR.
Yeah, it seems that we are not using this functionality at the moment, so I'm not sure the additional complexity helps. Also, we can't guarantee that the submission will not go through just before the scheduler is closed, so it gives a misleading impression that we can count on this behavior.
  }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)

  if (producerExpireCheckOption.isEmpty)
    throw new IllegalStateException("Failed to schedule PeriodicProducerExpirationCheck with KafkaScheduler.")
If we get here, it's because we are shutting down the broker. So, it doesn't seem it's an illegal state. Perhaps we could just let it go?
I pushed the two commits in two separate pushes. It appears Jenkins is currently running on the first commit, and the second one is pending. Unfortunately, I don't have permission to stop it from running the first one.
@ccding Thanks for the PR! It seems this PR also fixes KAFKA-13070. Can we keep just one of the jiras and refer to it in the PR description (KAFKA-13070 was the pre-existing jira)?
-   ensureRunning()
+   if (!isStarted) {
+     info("Kafka scheduler is not running at the time task '%s' is scheduled. The task is ignored.".format(name))
+     return None
With a little bit of refactoring, it looks possible to avoid the return statement here.
Thanks, fixed it. I somehow picked up the habit from other languages of returning early in error cases.
  override def get(): Unit = {}
  override def get(timeout: Long, unit: TimeUnit): Unit = {}
  override def getDelay(unit: TimeUnit): Long = 0
  override def compareTo(o: Delayed): Int = 0
Should Delayed be ScheduledFuture? Also, instead of always returning 0, it seems that it's better to return 0 if the other instance is NoOpScheduledFutureTask and return -1 or 1 otherwise?
NoOpScheduledFutureTask extends ScheduledFuture, ScheduledFuture extends Delayed, and Delayed extends Comparable<Delayed>, so we should use Delayed here. It also doesn't compile if I change it to ScheduledFuture.
Fixed the return value.
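For reference, a no-op future along the lines discussed above could look like the following; the cancel/isCancelled/isDone choices are assumptions, and the compareTo shown is just one way to stay consistent with a zero delay, not necessarily the change that landed:

import java.util.concurrent.{Delayed, ScheduledFuture, TimeUnit}

// Sketch of a no-op ScheduledFuture that can be handed back when nothing was
// actually scheduled. compareTo orders by getDelay so the result is consistent
// with other Delayed instances instead of always being 0.
class NoOpScheduledFutureTask extends ScheduledFuture[Unit] {
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled: Boolean = true
  override def isDone: Boolean = true
  override def get(): Unit = {}
  override def get(timeout: Long, unit: TimeUnit): Unit = {}
  override def getDelay(unit: TimeUnit): Long = 0
  override def compareTo(o: Delayed): Int = {
    val diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS)
    if (diff < 0) -1 else if (diff > 0) 1 else 0
  }
}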
…an shutdown (#11351) This also fixes KAFKA-13070. We have seen a problem caused by not shutting down the scheduler before shutting down LogManager. When LogManager was closing partitions one by one, the scheduler was called to delete old segments due to retention. However, the old segments could have been closed by the LogManager, which caused an exception and subsequently marked logdir as offline. As a result, the broker didn't flush the remaining partitions and didn't write the clean shutdown marker. Ultimately the broker took hours to recover the log during restart. This PR essentially reverts #10538. Reviewers: Ismael Juma <ismael@juma.me.uk>, Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
Cherry-picked to the 3.0 branch too.
This also fixes KAFKA-13070.
We have seen a problem caused by not shutting down the scheduler before shutting down LogManager.
When LogManager was closing partitions one by one, the scheduler was called to delete old segments due to retention. However, the old segments could have been closed by the LogManager, which caused an exception and subsequently marked logdir as offline. As a result, the broker didn't flush the remaining partitions and didn't write the clean shutdown marker. Ultimately the broker took hours to recover the log during restart.
This PR essentially reverts #10538
I believe the exception #10538 saw is at
kafka/core/src/main/scala/kafka/log/LocalLog.scala
Lines 895 to 903 in 5a6f19b
cc @rondagostino @ijuma @cmccabe @junrao @dhruvilshah3 as authors/reviewers of the PRs mentioned above to make sure this change looks okay.