diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/CacheDebugger.cs b/src/DurableTask.Netherite/StorageLayer/Faster/CacheDebugger.cs index 6f39d6fc..b76e1179 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/CacheDebugger.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/CacheDebugger.cs @@ -49,6 +49,7 @@ public enum CacheEvent PostCopyUpdate, SingleWriterUpsert, SingleWriterCopyToTail, + SingleWriterCopyToTailFromOutput, SingleWriterCopyToReadCache, SingleWriterCompaction, PostSingleWriterUpsert, @@ -424,7 +425,7 @@ internal void ValidateObjectVersion(FasterKV.Value val, TrackedObjectKey key) if (val.Version != versionOfObject) { var info = this.GetObjectInfo(key); - this.Fail($"incorrect version: model=v{val.Version} actual=v{versionOfObject} obj={val.Val} cacheEvents={info.PrintCacheEvents()}"); + this.Fail($"incorrect version: field=v{val.Version} object=v{versionOfObject} obj={val.Val} cacheEvents={info.PrintCacheEvents()}"); } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs b/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs index c441df7a..87918e19 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs @@ -32,50 +32,75 @@ public CheckpointInjector(TestHooks testHooks) this.testHooks = testHooks; } - internal bool CheckpointDue(LogAccessor log, out StoreWorker.CheckpointTrigger trigger, out long? compactUntil) + internal bool CheckpointDue(LogAccessor log, out StoreWorker.CheckpointTrigger trigger, out long? compactUntil, FasterTraceHelper traceHelper) { if (this.handler != null) { try { + traceHelper.FasterProgress("CheckpointInjector: running handler"); + (trigger, compactUntil) = this.handler(log); - this.handler = null; + this.handler = null; // do not run the same handler again + + traceHelper.FasterProgress($"CheckpointInjector: trigger={trigger} compactUntil={compactUntil}"); if (trigger == StoreWorker.CheckpointTrigger.None) { - this.SequenceComplete(log); + this.SequenceComplete(log, traceHelper); } - } - catch(Exception e) + + return (trigger != StoreWorker.CheckpointTrigger.None); + } + catch (Exception e) { - this.continuation.SetException(e); - this.continuation = null; - throw; + traceHelper.FasterProgress($"CheckpointInjector: handler faulted: {e}"); + + if (this.continuation.TrySetException(e)) + { + traceHelper.FasterProgress("CheckpointInjector: handler continuation released with exception"); + } + else + { + traceHelper.FasterProgress("CheckpointInjector: handler continuation already progressed"); + } } } + + trigger = StoreWorker.CheckpointTrigger.None; + compactUntil = null; + return false; + } + + internal void SequenceComplete(LogAccessor log, FasterTraceHelper traceHelper) + { + traceHelper.FasterProgress("CheckpointInjector: sequence complete"); + + if (this.continuation.TrySetResult(log)) + { + traceHelper.FasterProgress("CheckpointInjector: handler continuation released"); + } else { - trigger = StoreWorker.CheckpointTrigger.None; - compactUntil = null; + traceHelper.FasterProgress("CheckpointInjector: handler continuation already progressed"); } - - return (trigger != StoreWorker.CheckpointTrigger.None); - } - - internal void SequenceComplete(LogAccessor log) - { - this.continuation?.SetResult(log); - this.continuation = null; } - internal void CompactionComplete(IPartitionErrorHandler errorHandler) + internal void CompactionComplete(IPartitionErrorHandler errorHandler, FasterTraceHelper traceHelper) { if (this.InjectFaultAfterCompaction) { errorHandler.HandleError("CheckpointInjector", "inject failure after compaction", null, true, false); - this.InjectFaultAfterCompaction = false; - this.continuation?.SetResult(null); - this.continuation = null; + this.InjectFaultAfterCompaction = false; // do not do this again unless requested again + + if (this.continuation.TrySetResult(null)) + { + traceHelper.FasterProgress("CheckpointInjector: handler continuation released"); + } + else + { + traceHelper.FasterProgress("CheckpointInjector: handler continuation already progressed"); + } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 1ae1b218..a7b323c0 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -1828,6 +1828,7 @@ public override void Serialize(ref Value obj) public struct Output { public object Val; + public int Version; public TrackedObject Read(FasterKV store, string eventId) { @@ -2024,6 +2025,7 @@ bool IFunctions.SingleReader(ref Key dst.Val = trackedObject; } + dst.Version = src.Version; this.stats.Read++; return true; } @@ -2054,12 +2056,15 @@ bool IFunctions.ConcurrentReader(ref } dst.Val = trackedObject; + dst.Version = value.Version; this.stats.Read++; return true; } bool IFunctions.SingleWriter(ref Key key, ref EffectTracker input, ref Value src, ref Value dst, ref Output output, ref UpsertInfo info, WriteReason reason) { + bool takeValueFromOutput = false; + switch (reason) { case WriteReason.Upsert: @@ -2076,7 +2081,18 @@ bool IFunctions.SingleWriter(ref Key break; case WriteReason.CopyToTail: - this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCopyToTail, src.Version, default, info.Address); + // we have empirically observed that src does sometimes not contain the correct value (is null) + // we are not sure if this is a bug in FASTER or intended behavior + // as a workaround for those situations, we are passing the source value in the output parameter, which seems to work o.k. + takeValueFromOutput = (output.Val != null); + if (takeValueFromOutput) + { + this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCopyToTailFromOutput, output.Version, default, info.Address); + } + else + { + this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCopyToTail, src.Version, default, info.Address); + } break; case WriteReason.Compaction: @@ -2088,8 +2104,16 @@ bool IFunctions.SingleWriter(ref Key this.cacheDebugger?.Fail("Invalid WriteReason in SingleWriter", key); break; } - dst.Val = output.Val ?? src.Val; - dst.Version = src.Version; + if (takeValueFromOutput) + { + dst.Val = output.Val; + dst.Version = output.Version; + } + else + { + dst.Val = src.Val; + dst.Version = src.Version; + } this.cacheDebugger?.ValidateObjectVersion(dst, key.Val); return true; } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs index 99bb2f71..4b0238b9 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs @@ -345,7 +345,7 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil) // in a test setting, let the test decide when to checkpoint or compact if (this.partition.Settings.TestHooks?.CheckpointInjector != null) { - return this.partition.Settings.TestHooks.CheckpointInjector.CheckpointDue((this.store as FasterKV).Log, out trigger, out compactUntil); + return this.partition.Settings.TestHooks.CheckpointInjector.CheckpointDue((this.store as FasterKV).Log, out trigger, out compactUntil, this.traceHelper); } trigger = CheckpointTrigger.None; @@ -446,11 +446,13 @@ async ValueTask RunCheckpointingStateMachine() GC.Collect(); this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state"); + + this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log, this.traceHelper); + // we have reached the end of the state machine transitions this.pendingStoreCheckpoint = null; this.pendingCheckpointTrigger = CheckpointTrigger.None; this.ScheduleNextIdleCheckpointTime(); - this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log); } } else if (this.pendingIndexCheckpoint != null) @@ -672,7 +674,7 @@ protected override async Task Process(IList batch) if (target.HasValue) { target = await this.store.RunCompactionAsync(target.Value); - this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler); + this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler, this.traceHelper); } this.Notify(); diff --git a/test/DurableTask.Netherite.Tests/ConcurrentTestsFaster.cs b/test/DurableTask.Netherite.Tests/ConcurrentTestsFaster.cs index 7ed1c752..d5322fdf 100644 --- a/test/DurableTask.Netherite.Tests/ConcurrentTestsFaster.cs +++ b/test/DurableTask.Netherite.Tests/ConcurrentTestsFaster.cs @@ -109,7 +109,7 @@ async Task RunAsync() var tests = new List<(string, Task)>(); - foreach ((string name, Task task) in scenarios.StartAllScenarios(includeTimers: true, includeLarge: true)) + foreach ((string name, Task task) in scenarios.StartAllScenarios(includeTimers: false, includeLarge: true)) { Trace.WriteLine($"TestProgress: Adding {name}"); tests.Add((name, task));