Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug hangs in CI #423

Merged
merged 5 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public enum CacheEvent
PostCopyUpdate,
SingleWriterUpsert,
SingleWriterCopyToTail,
SingleWriterCopyToTailFromOutput,
SingleWriterCopyToReadCache,
SingleWriterCompaction,
PostSingleWriterUpsert,
Expand Down Expand Up @@ -424,7 +425,7 @@ internal void ValidateObjectVersion(FasterKV.Value val, TrackedObjectKey key)
if (val.Version != versionOfObject)
{
var info = this.GetObjectInfo(key);
this.Fail($"incorrect version: model=v{val.Version} actual=v{versionOfObject} obj={val.Val} cacheEvents={info.PrintCacheEvents()}");
this.Fail($"incorrect version: field=v{val.Version} object=v{versionOfObject} obj={val.Val} cacheEvents={info.PrintCacheEvents()}");
}
}

Expand Down
69 changes: 47 additions & 22 deletions src/DurableTask.Netherite/StorageLayer/Faster/CheckpointInjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,50 +32,75 @@ public CheckpointInjector(TestHooks testHooks)
this.testHooks = testHooks;
}

internal bool CheckpointDue(LogAccessor<FasterKV.Key, FasterKV.Value> log, out StoreWorker.CheckpointTrigger trigger, out long? compactUntil)
internal bool CheckpointDue(LogAccessor<FasterKV.Key, FasterKV.Value> log, out StoreWorker.CheckpointTrigger trigger, out long? compactUntil, FasterTraceHelper traceHelper)
{
if (this.handler != null)
{
try
{
traceHelper.FasterProgress("CheckpointInjector: running handler");

(trigger, compactUntil) = this.handler(log);
this.handler = null;
this.handler = null; // do not run the same handler again

traceHelper.FasterProgress($"CheckpointInjector: trigger={trigger} compactUntil={compactUntil}");

if (trigger == StoreWorker.CheckpointTrigger.None)
{
this.SequenceComplete(log);
this.SequenceComplete(log, traceHelper);
}
}
catch(Exception e)

return (trigger != StoreWorker.CheckpointTrigger.None);
}
catch (Exception e)
{
this.continuation.SetException(e);
this.continuation = null;
throw;
traceHelper.FasterProgress($"CheckpointInjector: handler faulted: {e}");

if (this.continuation.TrySetException(e))
{
traceHelper.FasterProgress("CheckpointInjector: handler continuation released with exception");
}
else
{
traceHelper.FasterProgress("CheckpointInjector: handler continuation already progressed");
}
}
}

trigger = StoreWorker.CheckpointTrigger.None;
compactUntil = null;
return false;
}

internal void SequenceComplete(LogAccessor<FasterKV.Key, FasterKV.Value> log, FasterTraceHelper traceHelper)
{
traceHelper.FasterProgress("CheckpointInjector: sequence complete");

if (this.continuation.TrySetResult(log))
{
traceHelper.FasterProgress("CheckpointInjector: handler continuation released");
}
else
{
trigger = StoreWorker.CheckpointTrigger.None;
compactUntil = null;
traceHelper.FasterProgress("CheckpointInjector: handler continuation already progressed");
}

return (trigger != StoreWorker.CheckpointTrigger.None);
}

internal void SequenceComplete(LogAccessor<FasterKV.Key, FasterKV.Value> log)
{
this.continuation?.SetResult(log);
this.continuation = null;
}

internal void CompactionComplete(IPartitionErrorHandler errorHandler)
internal void CompactionComplete(IPartitionErrorHandler errorHandler, FasterTraceHelper traceHelper)
{
if (this.InjectFaultAfterCompaction)
{
errorHandler.HandleError("CheckpointInjector", "inject failure after compaction", null, true, false);
this.InjectFaultAfterCompaction = false;
this.continuation?.SetResult(null);
this.continuation = null;
this.InjectFaultAfterCompaction = false; // do not do this again unless requested again

if (this.continuation.TrySetResult(null))
{
traceHelper.FasterProgress("CheckpointInjector: handler continuation released");
}
else
{
traceHelper.FasterProgress("CheckpointInjector: handler continuation already progressed");
}
}
}

Expand Down
30 changes: 27 additions & 3 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,7 @@ public override void Serialize(ref Value obj)
public struct Output
{
public object Val;
public int Version;

public TrackedObject Read(FasterKV store, string eventId)
{
Expand Down Expand Up @@ -2024,6 +2025,7 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.SingleReader(ref Key
dst.Val = trackedObject;
}

dst.Version = src.Version;
this.stats.Read++;
return true;
}
Expand Down Expand Up @@ -2054,12 +2056,15 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.ConcurrentReader(ref
}

dst.Val = trackedObject;
dst.Version = value.Version;
this.stats.Read++;
return true;
}

bool IFunctions<Key, Value, EffectTracker, Output, object>.SingleWriter(ref Key key, ref EffectTracker input, ref Value src, ref Value dst, ref Output output, ref UpsertInfo info, WriteReason reason)
{
bool takeValueFromOutput = false;

switch (reason)
{
case WriteReason.Upsert:
Expand All @@ -2076,7 +2081,18 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.SingleWriter(ref Key
break;

case WriteReason.CopyToTail:
this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCopyToTail, src.Version, default, info.Address);
// we have empirically observed that src does sometimes not contain the correct value (is null)
// we are not sure if this is a bug in FASTER or intended behavior
// as a workaround for those situations, we are passing the source value in the output parameter, which seems to work o.k.
takeValueFromOutput = (output.Val != null);
if (takeValueFromOutput)
{
this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCopyToTailFromOutput, output.Version, default, info.Address);
}
else
{
this.cacheDebugger?.Record(key.Val, CacheDebugger.CacheEvent.SingleWriterCopyToTail, src.Version, default, info.Address);
}
break;

case WriteReason.Compaction:
Expand All @@ -2088,8 +2104,16 @@ bool IFunctions<Key, Value, EffectTracker, Output, object>.SingleWriter(ref Key
this.cacheDebugger?.Fail("Invalid WriteReason in SingleWriter", key);
break;
}
dst.Val = output.Val ?? src.Val;
dst.Version = src.Version;
if (takeValueFromOutput)
{
dst.Val = output.Val;
dst.Version = output.Version;
}
else
{
dst.Val = src.Val;
dst.Version = src.Version;
}
this.cacheDebugger?.ValidateObjectVersion(dst, key.Val);
return true;
}
Expand Down
8 changes: 5 additions & 3 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
// in a test setting, let the test decide when to checkpoint or compact
if (this.partition.Settings.TestHooks?.CheckpointInjector != null)
{
return this.partition.Settings.TestHooks.CheckpointInjector.CheckpointDue((this.store as FasterKV).Log, out trigger, out compactUntil);
return this.partition.Settings.TestHooks.CheckpointInjector.CheckpointDue((this.store as FasterKV).Log, out trigger, out compactUntil, this.traceHelper);
}

trigger = CheckpointTrigger.None;
Expand Down Expand Up @@ -446,11 +446,13 @@ async ValueTask RunCheckpointingStateMachine()
GC.Collect();

this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state");

this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log, this.traceHelper);

// we have reached the end of the state machine transitions
this.pendingStoreCheckpoint = null;
this.pendingCheckpointTrigger = CheckpointTrigger.None;
this.ScheduleNextIdleCheckpointTime();
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
}
}
else if (this.pendingIndexCheckpoint != null)
Expand Down Expand Up @@ -672,7 +674,7 @@ protected override async Task Process(IList<PartitionEvent> batch)
if (target.HasValue)
{
target = await this.store.RunCompactionAsync(target.Value);
this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler);
this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler, this.traceHelper);
}

this.Notify();
Expand Down
2 changes: 1 addition & 1 deletion test/DurableTask.Netherite.Tests/ConcurrentTestsFaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async Task RunAsync()

var tests = new List<(string, Task)>();

foreach ((string name, Task task) in scenarios.StartAllScenarios(includeTimers: true, includeLarge: true))
foreach ((string name, Task task) in scenarios.StartAllScenarios(includeTimers: false, includeLarge: true))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw timer tests failing spuriously (because of large time delays leading to unexpected results) so I did not think it was worth keeping them in there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, you're seeing that timers make creating a testing spec difficult (as in it's difficult to assert what exactly will happen when). Ok, that makes sense

{
Trace.WriteLine($"TestProgress: Adding {name}");
tests.Add((name, task));
Expand Down
Loading