diff --git a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs
index 7d5a74de..85380ff3 100644
--- a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs
@@ -61,7 +61,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL
             this.host = host;
         }
 
-        public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
+        public void HandleError(string context, string message, Exception? exception, bool terminatePartition, bool isWarning)
         {
             bool isFatal = exception != null && Utils.IsFatal(exception);
 
@@ -94,7 +94,7 @@ public void TerminateNormally()
             }
         }
 
-        void TraceError(bool isWarning, string context, string message, Exception exception, bool terminatePartition)
+        void TraceError(bool isWarning, string context, string message, Exception? exception, bool terminatePartition)
        {
            var logLevel = isWarning ? LogLevel.Warning : LogLevel.Error;
            if (this.logLevelLimit <= logLevel)
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
index 65e607da..e9abcc14 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
@@ -484,7 +484,11 @@ long MinimalLogSize
         public override async Task<long> RunCompactionAsync(long target)
         {
             string id = DateTime.UtcNow.ToString("O"); // for tracing purposes
+
+            this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is requesting to enter semaphore with {maxCompactionThreads.CurrentCount} threads available");
             await maxCompactionThreads.WaitAsync();
+            this.blobManager.TraceHelper.FasterProgress($"Compaction {id} entered semaphore");
+
             try
             {
                 long beginAddressBeforeCompaction = this.Log.BeginAddress;
@@ -499,20 +503,47 @@ public override async Task<long> RunCompactionAsync(long target)
                     target - this.Log.BeginAddress,
                     this.GetElapsedCompactionMilliseconds());
 
+                var tokenSource = new CancellationTokenSource();
+                var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token);
                 var tcs = new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously);
                 var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}");
                 thread.Start();
+
+                var winner = await Task.WhenAny(tcs.Task, timeoutTask);
+
+                if (winner == timeoutTask)
+                {
+                    // compaction timed out. Terminate partition
+                    var exceptionMessage = $"Compaction {id} timed out";
+                    this.partition.ErrorHandler.HandleError(nameof(RunCompactionAsync), exceptionMessage, e: null, terminatePartition: true, reportAsWarning: true);
+
+                    // we need to resolve the task to ensure the 'finally' block is executed, which frees up another thread to start compacting
+                    tcs.TrySetException(new OperationCanceledException(exceptionMessage));
+                }
+                else
+                {
+                    // cancel the timeout task since compaction completed
+                    tokenSource.Cancel();
+                }
+
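+                // dispose the timeout's cancellation token source only after the Task.Delay task has completed (elapsed or canceled)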
+                await timeoutTask.ContinueWith(_ => tokenSource.Dispose());
+
+                // return result of compaction task
                 return await tcs.Task;
 
                 void RunCompaction()
                 {
                     try
                     {
+                        this.blobManager.TraceHelper.FasterProgress($"Compaction {id} started");
                         var session = this.CreateASession($"compaction-{id}", true);
+
+                        this.blobManager.TraceHelper.FasterProgress($"Compaction {id} obtained a FASTER session");
                         using (this.TrackTemporarySession(session))
                         {
+                            this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is invoking FASTER's compaction routine");
                             long compactedUntil = session.Compact(target, CompactionType.Scan);
 
                             this.TraceHelper.FasterCompactionProgress(
@@ -525,17 +556,17 @@ void RunCompaction()
                                 this.Log.BeginAddress - beginAddressBeforeCompaction,
                                 this.GetElapsedCompactionMilliseconds());
 
-                            tcs.SetResult(compactedUntil);
+                            tcs.TrySetResult(compactedUntil);
                         }
                     }
                     catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
                     {
-                        tcs.SetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
+                        tcs.TrySetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
                     }
                     catch (Exception e)
                    {
-                        tcs.SetException(e);
+                        tcs.TrySetException(e);
                    }
                }
            }
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs
index b83f34d8..63d0bb86 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs
@@ -128,6 +128,15 @@ public void FasterProgress(string details)
             }
         }
 
+        public void FasterProgress(Func<string> constructString)
+        {
+            if (this.logLevelLimit <= LogLevel.Debug)
+            {
+                var details = constructString();
+                this.FasterProgress(details);
+            }
+        }
+
         public void FasterStorageProgress(string details)
         {
             if (this.logLevelLimit <= LogLevel.Trace)
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
index d608d4ab..116c6245 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
@@ -21,6 +21,7 @@ class StoreWorker : BatchWorker<PartitionEvent>
         readonly EffectTracker effectTracker;
 
         bool isShuttingDown;
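+        // time at which FASTER first refused to start a checkpoint; reset to null as soon as a checkpoint starts again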
+        DateTime? timeOfFirstRefusedCheckpoint;
 
         public string InputQueueFingerprint { get; private set; }
         public (long,int) InputQueuePosition { get; private set; }
@@ -30,15 +31,15 @@ class StoreWorker : BatchWorker<PartitionEvent>
 
         // periodic index and store checkpointing
         CheckpointTrigger pendingCheckpointTrigger;
-        Task pendingIndexCheckpoint;
-        Task<(long, (long,int))> pendingStoreCheckpoint;
+        Task? pendingIndexCheckpoint;
+        Task<(long, (long,int))>? pendingStoreCheckpoint;
         (long,int) lastCheckpointedInputQueuePosition;
         long lastCheckpointedCommitLogPosition;
         long numberEventsSinceLastCheckpoint;
         DateTime timeOfNextIdleCheckpoint;
 
         // periodic compaction
-        Task pendingCompaction;
+        Task? pendingCompaction;
 
         // periodic load publishing
         PartitionLoadInfo loadInfo;
@@ -281,6 +282,50 @@ internal enum CheckpointTrigger
             Idle
         }
 
+        void LogCheckpointStats()
+        {
+            long inputQueuePositionLag = this.GetInputQueuePositionLag();
+
+            // since this is a pure function, we declare it as local static for improved performance
+            static string ReportNullableTaskStatus(Task? t)
+            {
+                if (t == null)
+                {
+                    return "is null";
+                }
+                else
+                {
+                    return t.IsCompleted ? "is completed" : "is not completed";
+                }
+            }
+
+            // since the statistics log is only emitted if the trace level is at least "Debug",
+            // we defer the construction of the string to relieve GC pressure
+            string ConstructLogString()
+            {
+                var log = $"Checkpoint statistics: " +
+                    $"LastCheckpointedCommitLogPosition={this.lastCheckpointedCommitLogPosition}, " +
+                    $"MaxNumberBytesBetweenCheckpoints={this.partition.Settings.MaxNumberBytesBetweenCheckpoints}, " +
+                    $"CommitLogPosition={this.CommitLogPosition}, " +
+                    $"NumberEventsSinceLastCheckpoint={this.numberEventsSinceLastCheckpoint}, " +
+                    $"MaxNumberEventsBetweenCheckpoints={this.partition.Settings.MaxNumberEventsBetweenCheckpoints}, " +
+                    $"InputQueuePositionLag={inputQueuePositionLag}, " +
+                    $"TimeOfNextIdleCheckpoint={this.timeOfNextIdleCheckpoint}, " +
+                    $"TimeOfFirstRefusedCheckpoint={this.timeOfFirstRefusedCheckpoint}, " +
+                    $"PendingCompaction status={ReportNullableTaskStatus(this.pendingCompaction)}, " +
+                    $"PendingIndexCheckpoint status={ReportNullableTaskStatus(this.pendingIndexCheckpoint)}, " +
+                    $"PendingStoreCheckpoint status={ReportNullableTaskStatus(this.pendingStoreCheckpoint)}";
+                return log;
+            }
+            this.traceHelper.FasterProgress(ConstructLogString);
+        }
+
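+        // lag between the current input queue position and the furthest position that has already been checkpointed or committed to the log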
+        long GetInputQueuePositionLag()
+        {
+            return this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
+        }
+
         bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
         {
             // in a test setting, let the test decide when to checkpoint or compact
@@ -292,8 +337,7 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
             trigger = CheckpointTrigger.None;
             compactUntil = null;
 
-            long inputQueuePositionLag =
-                this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
+            long inputQueuePositionLag = this.GetInputQueuePositionLag();
 
             if (this.lastCheckpointedCommitLogPosition + this.partition.Settings.MaxNumberBytesBetweenCheckpoints <= this.CommitLogPosition)
             {
@@ -349,7 +393,114 @@ void ScheduleNextIdleCheckpointTime()
             var actual = (((earliest - offset - 1) / period) + 1) * period + offset;
             this.timeOfNextIdleCheckpoint = new DateTime(actual, DateTimeKind.Utc);
         }
-
+
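+        // invokes a routine that tries to start a checkpoint; if FASTER keeps refusing to start one for more than a minute, the partition is terminated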
+        void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string messageOnError)
+        {
+            var checkpointStarted = checkpointRoutine.Invoke();
+            if (checkpointStarted)
+            {
+                this.timeOfFirstRefusedCheckpoint = null;
+            }
+            else
+            {
+                // track the start of FASTER's refusal to start a checkpoint
+                var currentTime = DateTime.UtcNow;
+                this.timeOfFirstRefusedCheckpoint ??= currentTime;
+
+                // if the refusal to checkpoint started over a minute ago, terminate the partition
+                TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value;
+                if (duration > TimeSpan.FromMinutes(1))
+                {
+                    messageOnError += $". FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}. Terminating partition.";
+                    this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, e: null, terminatePartition: true, reportAsWarning: false);
+                }
+            }
+        }
+
+        async ValueTask RunCheckpointingStateMachine()
+        {
+            // handle progression of the checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none
+            if (this.pendingStoreCheckpoint != null)
+            {
+                if (this.pendingStoreCheckpoint.IsCompleted == true)
+                {
+                    this.traceHelper.FasterProgress("Checkpointing state machine: pendingStoreCheckpoint has completed.");
+                    (this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
+                        = await this.pendingStoreCheckpoint; // observe exceptions here
+
+                    // force collection of memory used during checkpointing
+                    GC.Collect();
+
+                    this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state");
+                    // we have reached the end of the state machine transitions
+                    this.pendingStoreCheckpoint = null;
+                    this.pendingCheckpointTrigger = CheckpointTrigger.None;
+                    this.ScheduleNextIdleCheckpointTime();
+                    this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
+                }
+            }
+            else if (this.pendingIndexCheckpoint != null)
+            {
+                if (this.pendingIndexCheckpoint.IsCompleted == true)
+                {
+                    this.traceHelper.FasterProgress("Checkpointing state machine: pendingIndexCheckpoint has completed");
+                    await this.pendingIndexCheckpoint; // observe exceptions here
+
+                    // the store checkpoint is next
+                    this.StartCheckpointOrFailOnTimeout(
+                        checkpointRoutine: () =>
+                        {
+                            var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
+                            if (token.HasValue)
+                            {
+                                this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint started");
+                                this.pendingIndexCheckpoint = null;
+                                this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
+                                this.numberEventsSinceLastCheckpoint = 0;
+                            }
+                            var checkpointStarted = token.HasValue;
+                            return checkpointStarted;
+                        },
+                        messageOnError: "Could not start store checkpoint before timeout");
+                }
+            }
+            else if (this.pendingCompaction != null)
+            {
+                if (this.pendingCompaction.IsCompleted == true)
+                {
+                    this.traceHelper.FasterProgress("Checkpointing state machine: pendingCompaction has completed");
+                    await this.pendingCompaction; // observe exceptions here
+
+                    // force collection of memory used during compaction
+                    GC.Collect();
+
+                    // the index checkpoint is next
+                    this.StartCheckpointOrFailOnTimeout(
+                        checkpointRoutine: () =>
+                        {
+                            var token = this.store.StartIndexCheckpoint();
+                            if (token.HasValue)
+                            {
+                                this.traceHelper.FasterProgress("Checkpointing state machine: index checkpoint started");
+                                this.pendingCompaction = null;
+                                this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
+                            }
+                            var checkpointStarted = token.HasValue;
+                            return checkpointStarted;
+                        },
+                        messageOnError: "Could not start index checkpoint before timeout");
+                }
+            }
+            else if (this.CheckpointDue(out var trigger, out long? compactUntil))
+            {
+                this.traceHelper.FasterProgress($"Checkpointing state machine: checkpoint is due. Trigger='{trigger}'. compactUntil='{compactUntil}'");
+                this.pendingCheckpointTrigger = trigger;
+                this.pendingCompaction = this.RunCompactionAsync(compactUntil);
+            }
+        }
+
         protected override async Task Process(IList<PartitionEvent> batch)
         {
             try
@@ -420,68 +571,15 @@ protected override async Task Process(IList<PartitionEvent> batch)
                     this.store.AdjustCacheSize();
 
                     // handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
-                    if (this.pendingStoreCheckpoint != null)
-                    {
-                        if (this.pendingStoreCheckpoint.IsCompleted == true)
-                        {
-                            (this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
-                                = await this.pendingStoreCheckpoint; // observe exceptions here
-
-                            // force collection of memory used during checkpointing
-                            GC.Collect();
-
-                            // we have reached the end of the state machine transitions
-                            this.pendingStoreCheckpoint = null;
-                            this.pendingCheckpointTrigger = CheckpointTrigger.None;
-                            this.ScheduleNextIdleCheckpointTime();
-                            this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
-                        }
-                    }
-                    else if (this.pendingIndexCheckpoint != null)
-                    {
-                        if (this.pendingIndexCheckpoint.IsCompleted == true)
-                        {
-                            await this.pendingIndexCheckpoint; // observe exceptions here
-
-                            // the store checkpoint is next
-                            var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
-                            if (token.HasValue)
-                            {
-                                this.pendingIndexCheckpoint = null;
-                                this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
-                                this.numberEventsSinceLastCheckpoint = 0;
-                            }
-                        }
-                    }
-                    else if (this.pendingCompaction != null)
-                    {
-                        if (this.pendingCompaction.IsCompleted == true)
-                        {
-                            await this.pendingCompaction; // observe exceptions here
-
-                            // force collection of memory used during compaction
-                            GC.Collect();
-
-                            // the index checkpoint is next
-                            var token = this.store.StartIndexCheckpoint();
-                            if (token.HasValue)
-                            {
-                                this.pendingCompaction = null;
-                                this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
-                            }
-                        }
-                    }
-                    else if (this.CheckpointDue(out var trigger, out long? compactUntil))
-                    {
-                        this.pendingCheckpointTrigger = trigger;
-                        this.pendingCompaction = this.RunCompactionAsync(compactUntil);
-                    }
+                    await this.RunCheckpointingStateMachine();
 
                     // periodically publish the partition load information and the send/receive positions
+                    // also report checkpointing stats
                     if (this.lastPublished + PublishInterval < DateTime.UtcNow)
                     {
                         this.lastPublished = DateTime.UtcNow;
                         await this.PublishLoadAndPositions();
+                        this.LogCheckpointStats();
                     }
 
                     if (this.partition.NumberPartitions() > 1 && this.partition.Settings.ActivityScheduler == ActivitySchedulerOptions.Locavore)
@@ -558,7 +656,6 @@ protected override async Task Process(IList<PartitionEvent> batch)
                 if (target.HasValue)
                 {
                     target = await this.store.RunCompactionAsync(target.Value);
-                    this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler);
                 }