-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Terminate partition when FASTER refuses to checkpoint for over a minute #301
Conversation
This is not to be merged as-is, it is for a private package. If this logs prove to be useful there, we can look to format them in a way that is amenable for the prod-ready package. |
async Task RunCheckpointingStateMachine() | ||
{ | ||
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none) | ||
if (this.pendingStoreCheckpoint != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved the checkpoint state machine logic from Process
onto this own method. But I realize that may make this PR harder to review. If it helps, I can undo this change and delay it to a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine. It means i can't compare line by line but I think I can spot the differences since I looked at this code a lot in the last few days.
void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string messageOnError) | ||
{ | ||
var checkpointStarted = checkpointRoutine.Invoke(); | ||
if (checkpointStarted) | ||
{ | ||
this.timeOfFirstRefusedCheckpoint = null; | ||
} | ||
else | ||
{ | ||
// track start of FASTER refusal to start checkpoint | ||
var currentTime = DateTime.UtcNow; | ||
this.timeOfFirstRefusedCheckpoint ??= currentTime; | ||
|
||
// if the refusal to checkpoint started over a minute ago, terminate partition | ||
TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value; | ||
if (duration > TimeSpan.FromMinutes(1)) | ||
{ | ||
messageOnError += $" FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}"; | ||
Exception err = new Exception(messageOnError); | ||
this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, err, terminatePartition: true, reportAsWarning: false); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. seems right.
if (duration > TimeSpan.FromMinutes(1)) | ||
{ | ||
messageOnError += $" FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}"; | ||
Exception err = new Exception(messageOnError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure if we had more fine grained Exception classes for this kind of error, so I just used a generic Exception
constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to pass an exception, it is optional (only meant to be used if you caught one). Just pass null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know. I'll do that and also update the HandleError
signature to flag that the Exception can be null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First round of comments with minor suggestions.
} | ||
} | ||
|
||
async Task RunCheckpointingStateMachine() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make this a ValueTask to save an allocation on the common path (where the method completes synchronously)
{ | ||
if (this.pendingIndexCheckpoint.IsCompleted == true) | ||
{ | ||
this.traceHelper.FasterProgress("Checkpointing state machine: trying to start store checkpoint"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggested wording: pendingIndexCheckpoint has completed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applied in next commit
{ | ||
if (this.pendingStoreCheckpoint.IsCompleted == true) | ||
{ | ||
this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint no longer pending"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggested wording: pendingStoreCheckpoint has completed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I recognize the phrasing here and in other similar logs was weird. I just wanted to avoid implying that every time this log was emitted, that the pendingStoreCheckpoint completed again, which isn't necessarily the case.
But now I think it's too minor of a concern. I'll take your suggestion.
} | ||
else | ||
{ | ||
this.traceHelper.FasterProgress("Checkpointing state machine: index checkpoint still pending"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above; may make more sense to print this info as part of the LogCheckpointStats
} | ||
else | ||
{ | ||
this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint still pending"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will again spam the logs. It may make more sense to print this info as part of the LogCheckpointStats
async Task RunCheckpointingStateMachine() | ||
{ | ||
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none) | ||
if (this.pendingStoreCheckpoint != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine. It means i can't compare line by line but I think I can spot the differences since I looked at this code a lot in the last few days.
if (duration > TimeSpan.FromMinutes(1)) | ||
{ | ||
messageOnError += $" FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}"; | ||
Exception err = new Exception(messageOnError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to pass an exception, it is optional (only meant to be used if you caught one). Just pass null.
TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value; | ||
if (duration > TimeSpan.FromMinutes(1)) | ||
{ | ||
messageOnError += $" FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: string is starting with an extra space
I dont' think you need to add quotes around a time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually on purpose. Since we're adding new text to the error, I need to separate it with a space. I'm adding a dot (".") in there to make this clearer.
void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string messageOnError) | ||
{ | ||
var checkpointStarted = checkpointRoutine.Invoke(); | ||
if (checkpointStarted) | ||
{ | ||
this.timeOfFirstRefusedCheckpoint = null; | ||
} | ||
else | ||
{ | ||
// track start of FASTER refusal to start checkpoint | ||
var currentTime = DateTime.UtcNow; | ||
this.timeOfFirstRefusedCheckpoint ??= currentTime; | ||
|
||
// if the refusal to checkpoint started over a minute ago, terminate partition | ||
TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value; | ||
if (duration > TimeSpan.FromMinutes(1)) | ||
{ | ||
messageOnError += $" FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}"; | ||
Exception err = new Exception(messageOnError); | ||
this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, err, terminatePartition: true, reportAsWarning: false); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. seems right.
@sebastianburckhardt: I'm unable to respond to your comments to state that I've applied your feedback, so here's me stating just that. I think all your feedback has been addressed. |
If this PR isn't meant to be merged, can we switch it to being a draft PR? We do this enough that I wonder if a new label might also be helpful. |
For visibility: we discussed this offline and clarified that this PR is meant to be merged. I also updated the description to make this clearer. |
@@ -499,20 +503,39 @@ public override async Task<long> RunCompactionAsync(long target) | |||
target - this.Log.BeginAddress, | |||
this.GetElapsedCompactionMilliseconds()); | |||
|
|||
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: 10 minutes was determined to be a safe timeout window using various kusto queries to determine the expected duration of compaction. But it could be too conservative
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conservative is probably o.k. as a starting point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly pretty good but there are a few things that need fixing I think.
@@ -499,20 +503,39 @@ public override async Task<long> RunCompactionAsync(long target) | |||
target - this.Log.BeginAddress, | |||
this.GetElapsedCompactionMilliseconds()); | |||
|
|||
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conservative is probably o.k. as a starting point.
this.partition.ErrorHandler.HandleError(nameof(RunCompactionAsync), exceptionMessage, e: null, terminatePartition: true, reportAsWarning: true); | ||
|
||
// we need resolve the task to ensure the 'finally' block is executed which frees up another thread to start compating | ||
tcs.SetException(new OperationCanceledException(exceptionMessage)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should use TrySetException
here since the timeout will still happen after the compaction completed successfully and SetException
then throws an exception.
Similarly, you need to change the tcs.SetResult
(1x) and tcs.SetException
(2x) further below to tcs.TrySetResult
and tcs.TrySetException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied in latest commit: df76d16
void LogCheckpointStats() | ||
{ | ||
long inputQueuePositionLag = this.GetInputQueuePositionLag(); | ||
this.CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CheckpointDue
has several side effects, so I think it is not safe to insert an extra call to it from this location. I would just remove it. Your are already printing a LOT of tracing even without the 'trigger' information, and one can still look at the other traces to see the triggers that take place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in my latest commit: df76d16
this.pendingCheckpointTrigger = trigger; | ||
|
||
var isCompactionSkipped = !compactUntil.HasValue; | ||
this.pendingCompaction = isCompactionSkipped ? Task.FromResult((long?)null) : this.RunCompactionAsync(compactUntil.Value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by skipping the call to RunCompactionAsync
you are also eliminating the Notify()
(see comment below). However, I think that one is still needed. The point of Notify
is to tell the storeworker that there is more work to do, i.e. that it should run another iteration. That is necessary here because the next run of the storeworker is going to advance the state machine immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my latest commit, I move the Notify
method out here so it is still called in both places. Does that seem like a fair refactoring?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite, unfortunately. When doing the compaction, the Notify needs to be called after the compaction completes. With your change, it gets called after the compaction is started, which is too soon.
(the point of Notify is to tell the storeworker that it is possible to move the state machine forward, which is true only if (a) we skipped compaction, or (b) compaction is complete)
var isCompactionSkipped = !compactUntil.HasValue; | ||
this.pendingCompaction = isCompactionSkipped ? Task.FromResult((long?)null) : this.RunCompactionAsync(compactUntil.Value); | ||
|
||
if (isCompactionSkipped) | ||
{ | ||
this.traceHelper.FasterProgress($"RunCompactionAsync was skipped because a compaction range was not provided by trigger = {trigger}."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this log message is fully redundant with the one 8 lines earlier.
} | ||
|
||
target = await this.store.RunCompactionAsync(target); | ||
this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler); | ||
this.Notify(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.Notify should still be called even if compaction is skipped (see earlier comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address in earlier comment ,good catch!
this.pendingCheckpointTrigger = trigger; | ||
|
||
var isCompactionSkipped = !compactUntil.HasValue; | ||
this.pendingCompaction = isCompactionSkipped ? Task.FromResult((long?)null) : this.RunCompactionAsync(compactUntil.Value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite, unfortunately. When doing the compaction, the Notify needs to be called after the compaction completes. With your change, it gets called after the compaction is started, which is too soon.
(the point of Notify is to tell the storeworker that it is possible to move the state machine forward, which is true only if (a) we skipped compaction, or (b) compaction is complete)
Co-authored-by: Sebastian Burckhardt <sburckha@microsoft.com>
…icrosoft/durabletask-netherite into dajusto/add-checkpointing-logs
@sebastianburckhardt: I see now why you wrote |
…rite into dajusto/add-checkpointing-logs
…icrosoft/durabletask-netherite into dajusto/add-checkpointing-logs
@@ -504,7 +504,7 @@ public override async Task<long> RunCompactionAsync(long target) | |||
this.GetElapsedCompactionMilliseconds()); | |||
|
|||
var tokenSource = new CancellationTokenSource(); | |||
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token); //.ContinueWith(_ => tokenSource.Dispose()); | |||
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token).ContinueWith(_ => tokenSource.Dispose()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is the right way to dispose the timer task.
The problem is that you want to dispose it after the timeout completes AND after line 526. Otherwise line 526 may throw an exception.
That is why I suggested to put the ContinueWith on line 529.
* Update GH automation (#303) * initial commit (#290) * Revert "initial commit (#290)" (#314) This reverts commit 3a2d193. * Bump Azure.Identity from 1.7.0 to 1.10.2 in /samples/TokenCredentialDTFx (#323) Bumps [Azure.Identity](https://github.com/Azure/azure-sdk-for-net) from 1.7.0 to 1.10.2. - [Release notes](https://github.com/Azure/azure-sdk-for-net/releases) - [Commits](Azure/azure-sdk-for-net@Azure.Identity_1.7.0...Azure.Identity_1.10.2) --- updated-dependencies: - dependency-name: Azure.Identity dependency-type: direct:production ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Terminate partition when FASTER refuses to checkpoint for over a minute (#301) --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Varshitha Bachu <vabachu@microsoft.com> Co-authored-by: Sebastian Burckhardt <sburckha@microsoft.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Update CreationRequestReceived.cs * remove the faster-alternate data store and fix broken deserialization. (#308) * fix and instrument commitlog serialization and deserialization. (#305) * fix discard of packets so the next packet to expect is updated correctly (#302) * New EventHubs performance tests (#178) * rebase, and remove changes to scale monitor * add hello cities test that prints the history of a nested orchestration * implement a watchdog that terminates CompletePending quickly if it hangs (#318) * add unique id to scale monitor constructor (#316) * fix bugs from PR #302 (#315) * fix cache size reporting (#321) * added a comment * update durable task package references to 2.15.1 and 2.12.0 (#317) * update DT and DF package references * update to latest * sync dev w/ main (#324) * Update GH automation (#303) * initial commit (#290) * Revert "initial commit (#290)" (#314) This reverts commit 3a2d193. * Bump Azure.Identity from 1.7.0 to 1.10.2 in /samples/TokenCredentialDTFx (#323) Bumps [Azure.Identity](https://github.com/Azure/azure-sdk-for-net) from 1.7.0 to 1.10.2. - [Release notes](https://github.com/Azure/azure-sdk-for-net/releases) - [Commits](Azure/azure-sdk-for-net@Azure.Identity_1.7.0...Azure.Identity_1.10.2) --- updated-dependencies: - dependency-name: Azure.Identity dependency-type: direct:production ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Terminate partition when FASTER refuses to checkpoint for over a minute (#301) --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Varshitha Bachu <vabachu@microsoft.com> Co-authored-by: Sebastian Burckhardt <sburckha@microsoft.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: Sebastian Burckhardt <sburckha@microsoft.com> Co-authored-by: David Justo <david.justo.1996@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Related: Azure/azure-functions-durable-extension#2534
This is a WIP, rough draft of adding logs to the checkpointing state machine so that we can better identify any bugs on this codepath.Update:
This PR contains a fix that terminates a partition when FASTER refuses to checkpoint for over a minute. In addition to this fix, we're adding more logs to the entire checkpoint state machine for easier debuggability moving forward. This PR is meant to be merged.