Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Clarify and enforce BumpEpoch protection assumptions #892

Merged
merged 4 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 72 additions & 25 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public bool ThisInstanceProtected()
}

/// <summary>
/// Enter the thread into the protected code region
/// Enter the thread into the protected code region.
/// </summary>
/// <returns>Current epoch</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -218,42 +218,59 @@ public void Resume()
Acquire();
ProtectAndDrain();
}

/// <summary>
/// Increment global current epoch
/// Increment current epoch.
/// </summary>
/// <returns></returns>
long BumpCurrentEpoch()
/// <returns>new epoch of the system</returns>
public long BumpCurrentEpoch()
{
Debug.Assert(this.ThisInstanceProtected(), "BumpCurrentEpoch must be called on a protected thread");
long nextEpoch = Interlocked.Increment(ref CurrentEpoch);

var nextEpoch = Interlocked.Increment(ref CurrentEpoch);

if (drainCount > 0)
Drain(nextEpoch);

{
// If called from an unprotected thread, acquire protection just long enough to drain (Resume also drains),
// then release it so the thread is restored to its pre-call protection status
if (!ThisInstanceProtected())
{
Resume();
Release();
}
else
{
ProtectAndDrain();
}
}

return nextEpoch;
}

/// <summary>
/// Increment current epoch and associate trigger action
/// with the prior epoch
/// Increment current epoch and associate trigger action with the prior epoch. The trigger action will execute
/// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it)
/// </summary>
/// <param name="onDrain">Trigger action</param>
/// <returns></returns>
public void BumpCurrentEpoch(Action onDrain)
/// <returns>new epoch of the system</returns>
public long BumpCurrentEpoch(Action onDrain)
{
long PriorEpoch = BumpCurrentEpoch() - 1;

int i = 0;
while (true)
Debug.Assert(onDrain != null);

var nextEpoch = Interlocked.Increment(ref CurrentEpoch);
var priorEpoch = nextEpoch - 1;
// track whether we acquired protection when calling from unprotected thread, so we restore the thread to
// its pre-call protection status after we are done
var acquiredProtection = false;

for (int i = 0;;)
{
if (drainList[i].epoch == long.MaxValue)
{
// This was an empty slot. If it still is, assign this action/epoch to the slot.
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == long.MaxValue)
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) ==
long.MaxValue)
{
drainList[i].action = onDrain;
drainList[i].epoch = PriorEpoch;
drainList[i].epoch = priorEpoch;
Interlocked.Increment(ref drainCount);
break;
}
Expand All @@ -264,12 +281,20 @@ public void BumpCurrentEpoch(Action onDrain)

if (triggerEpoch <= SafeToReclaimEpoch)
{
// Protection is required whenever we may execute a trigger action
if (!acquiredProtection && !ThisInstanceProtected())
{
acquiredProtection = true;
Resume();
}

// This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot.
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == triggerEpoch)
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) ==
triggerEpoch)
{
var triggerAction = drainList[i].action;
drainList[i].action = onDrain;
drainList[i].epoch = PriorEpoch;
drainList[i].epoch = priorEpoch;
triggerAction();
break;
}
Expand All @@ -279,14 +304,36 @@ public void BumpCurrentEpoch(Action onDrain)
if (++i == kDrainListSize)
{
// We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots.
ProtectAndDrain();
if (!acquiredProtection && !ThisInstanceProtected())
{
acquiredProtection = true;
Resume();
}
else
{
ProtectAndDrain();
}
i = 0;
Thread.Yield();
}
}

// Now ProtectAndDrain, which may execute the action we just added.
ProtectAndDrain();

if (!acquiredProtection && !ThisInstanceProtected())
{
acquiredProtection = true;
Resume();
}
else
{
// Now ProtectAndDrain, which may execute the action we just added.
ProtectAndDrain();
}

if (acquiredProtection)
Release();

return nextEpoch;
}

/// <summary>
Expand Down
70 changes: 32 additions & 38 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public sealed class FasterLog : IDisposable
readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
readonly List<FasterLogRecoveryInfo> coveredCommits = new();
long commitNum, commitCoveredAddress;
private bool logClosing = false;

readonly LogCommitPolicy commitPolicy;

Expand Down Expand Up @@ -74,6 +75,7 @@ public sealed class FasterLog : IDisposable
/// </summary>
public long SafeTailAddress;


/// <summary>
/// Dictionary of recovered iterators and their committed until addresses
/// </summary>
Expand Down Expand Up @@ -231,6 +233,7 @@ public void Reset()
CommittedUntilAddress = beginAddress;
CommittedBeginAddress = beginAddress;
SafeTailAddress = beginAddress;
logClosing = false;

commitNum = 0;
this.beginAddress = beginAddress;
Expand Down Expand Up @@ -268,6 +271,8 @@ public void Initialize(long beginAddress, long committedUntilAddress, long lastC

commitNum = lastCommitNum;
this.beginAddress = beginAddress;
logClosing = false;


if (lastCommitNum > 0) logCommitManager.OnRecovery(lastCommitNum);
}
Expand Down Expand Up @@ -322,24 +327,15 @@ public void Dispose()
/// <param name="spinWait"> whether to spin until log completion becomes committed </param>
public void CompleteLog(bool spinWait = false)
{
// Ensure progress even if there is no thread in epoch table. Also, BumpCurrentEpoch must be done on a protected thread.
bool isProtected = epoch.ThisInstanceProtected();
if (!isProtected)
epoch.Resume();
try
{
// Ensure all currently started entries will enqueue before we declare log closed
epoch.BumpCurrentEpoch(() =>
{
CommitInternal(out _, out _, false, Array.Empty<byte>(), long.MaxValue, null);
});
}
finally
// Use this to signal to future enqueue calls that they should stop as we are closing the log
logClosing = true;
// use a bump to ensure that any concurrent enqueues that have marched past the check will finish before
// we close the log
epoch.BumpCurrentEpoch(() =>
{
if (!isProtected)
epoch.Suspend();
}

CommitInternal(out _, out _, false, Array.Empty<byte>(), long.MaxValue, null);
});

if (spinWait)
WaitForCommit(TailAddress, long.MaxValue);
}
Expand Down Expand Up @@ -480,6 +476,7 @@ public long Enqueue<T>(IEnumerable<T> entries) where T : ILogEnqueueEntry
#endregion

#region TryEnqueue

/// <summary>
/// Try to enqueue entry to log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
Expand All @@ -497,7 +494,7 @@ public unsafe bool TryEnqueue<T>(T entry, out long logicalAddress) where T : ILo

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -538,7 +535,7 @@ public unsafe bool TryEnqueue<T>(IEnumerable<T> entries, out long logicalAddress
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

Expand Down Expand Up @@ -578,8 +575,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -619,8 +615,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, bool noCom
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -654,8 +649,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -689,6 +683,7 @@ public unsafe void Enqueue<THeader>(THeader userHeader, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand All @@ -715,6 +710,7 @@ public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item, out l
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand Down Expand Up @@ -743,6 +739,7 @@ public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item1, ref
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand Down Expand Up @@ -772,8 +769,9 @@ public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item1, ref
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
Expand Down Expand Up @@ -801,6 +799,7 @@ public unsafe void Enqueue(byte userHeader, ref SpanByte item, out long logicalA
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand Down Expand Up @@ -862,6 +861,7 @@ public unsafe bool TryEnqueue<THeader>(THeader userHeader, ref SpanByte item1, r
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -901,6 +901,7 @@ public unsafe bool TryEnqueue<THeader>(THeader userHeader, ref SpanByte item1, r
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -938,6 +939,7 @@ public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logic
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -2533,7 +2535,7 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

Expand Down Expand Up @@ -2779,16 +2781,8 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
}

// Otherwise, move to set read-only tail and flush
try
{
epoch.Resume();
if (!allocator.ShiftReadOnlyToTail(out _, out _))
CommitMetadataOnly(ref info);
}
finally
{
epoch.Suspend();
}
if (!allocator.ShiftReadOnlyToTail(out _, out _))
CommitMetadataOnly(ref info);
return true;
}

Expand Down
Loading