Skip to content

Commit

Permalink
[C#] Clarify and enforce BumpEpoch protection assumptions (#892)
Browse files Browse the repository at this point in the history
* Revise BumpEpoch to accommodate calling from both protected and unprotected contexts. Revise FasterLog completion logic to avoid epoch double-protect.

* API change for BCE to add long return value and a default parameter variant

* add else for drain count check

* Split BCE into two variants
  • Loading branch information
tli2 authored Mar 13, 2024
1 parent a093e30 commit 0f5781b
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 63 deletions.
97 changes: 72 additions & 25 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public bool ThisInstanceProtected()
}

/// <summary>
/// Enter the thread into the protected code region
/// Enter the thread into the protected code region.
/// </summary>
/// <returns>Current epoch</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -218,42 +218,59 @@ public void Resume()
Acquire();
ProtectAndDrain();
}

/// <summary>
/// Increment global current epoch
/// Increment current epoch.
/// </summary>
/// <returns></returns>
long BumpCurrentEpoch()
/// <returns>new epoch of the system</returns>
public long BumpCurrentEpoch()
{
Debug.Assert(this.ThisInstanceProtected(), "BumpCurrentEpoch must be called on a protected thread");
long nextEpoch = Interlocked.Increment(ref CurrentEpoch);

var nextEpoch = Interlocked.Increment(ref CurrentEpoch);

if (drainCount > 0)
Drain(nextEpoch);

{
// track whether we acquired protection when calling from unprotected thread, so we restore the thread to
// its pre-call protection status after we are done
if (!ThisInstanceProtected())
{
Resume();
Release();
}
else
{
ProtectAndDrain();
}
}

return nextEpoch;
}

/// <summary>
/// Increment current epoch and associate trigger action
/// with the prior epoch
/// Increment current epoch and associate trigger action with the prior epoch. The trigger action will execute
/// on a protected thread only after the prior epoch is safe (i.e., after all active threads have advanced past it)
/// </summary>
/// <param name="onDrain">Trigger action</param>
/// <returns></returns>
public void BumpCurrentEpoch(Action onDrain)
/// <returns>new epoch of the system</returns>
public long BumpCurrentEpoch(Action onDrain)
{
long PriorEpoch = BumpCurrentEpoch() - 1;

int i = 0;
while (true)
Debug.Assert(onDrain != null);

var nextEpoch = Interlocked.Increment(ref CurrentEpoch);
var priorEpoch = nextEpoch - 1;
// track whether we acquired protection when calling from unprotected thread, so we restore the thread to
// its pre-call protection status after we are done
var acquiredProtection = false;

for (int i = 0;;)
{
if (drainList[i].epoch == long.MaxValue)
{
// This was an empty slot. If it still is, assign this action/epoch to the slot.
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) == long.MaxValue)
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, long.MaxValue) ==
long.MaxValue)
{
drainList[i].action = onDrain;
drainList[i].epoch = PriorEpoch;
drainList[i].epoch = priorEpoch;
Interlocked.Increment(ref drainCount);
break;
}
Expand All @@ -264,12 +281,20 @@ public void BumpCurrentEpoch(Action onDrain)

if (triggerEpoch <= SafeToReclaimEpoch)
{
// Protection is required whenever we may execute a trigger action
if (!acquiredProtection && !ThisInstanceProtected())
{
acquiredProtection = true;
Resume();
}

// This was a slot with an epoch that was safe to reclaim. If it still is, execute its trigger, then assign this action/epoch to the slot.
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) == triggerEpoch)
if (Interlocked.CompareExchange(ref drainList[i].epoch, long.MaxValue - 1, triggerEpoch) ==
triggerEpoch)
{
var triggerAction = drainList[i].action;
drainList[i].action = onDrain;
drainList[i].epoch = PriorEpoch;
drainList[i].epoch = priorEpoch;
triggerAction();
break;
}
Expand All @@ -279,14 +304,36 @@ public void BumpCurrentEpoch(Action onDrain)
if (++i == kDrainListSize)
{
// We are at the end of the drain list and found no empty or reclaimable slot. ProtectAndDrain, which should clear one or more slots.
ProtectAndDrain();
if (!acquiredProtection && !ThisInstanceProtected())
{
acquiredProtection = true;
Resume();
}
else
{
ProtectAndDrain();
}
i = 0;
Thread.Yield();
}
}

// Now ProtectAndDrain, which may execute the action we just added.
ProtectAndDrain();

if (!acquiredProtection && !ThisInstanceProtected())
{
acquiredProtection = true;
Resume();
}
else
{
// Now ProtectAndDrain, which may execute the action we just added.
ProtectAndDrain();
}

if (acquiredProtection)
Release();

return nextEpoch;
}

/// <summary>
Expand Down
70 changes: 32 additions & 38 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public sealed class FasterLog : IDisposable
readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
readonly List<FasterLogRecoveryInfo> coveredCommits = new();
long commitNum, commitCoveredAddress;
private bool logClosing = false;

readonly LogCommitPolicy commitPolicy;

Expand Down Expand Up @@ -74,6 +75,7 @@ public sealed class FasterLog : IDisposable
/// </summary>
public long SafeTailAddress;


/// <summary>
/// Dictionary of recovered iterators and their committed until addresses
/// </summary>
Expand Down Expand Up @@ -231,6 +233,7 @@ public void Reset()
CommittedUntilAddress = beginAddress;
CommittedBeginAddress = beginAddress;
SafeTailAddress = beginAddress;
logClosing = false;

commitNum = 0;
this.beginAddress = beginAddress;
Expand Down Expand Up @@ -268,6 +271,8 @@ public void Initialize(long beginAddress, long committedUntilAddress, long lastC

commitNum = lastCommitNum;
this.beginAddress = beginAddress;
logClosing = false;


if (lastCommitNum > 0) logCommitManager.OnRecovery(lastCommitNum);
}
Expand Down Expand Up @@ -322,24 +327,15 @@ public void Dispose()
/// <param name="spinWait"> whether to spin until log completion becomes committed </param>
public void CompleteLog(bool spinWait = false)
{
// Ensure progress even if there is no thread in epoch table. Also, BumpCurrentEpoch must be done on a protected thread.
bool isProtected = epoch.ThisInstanceProtected();
if (!isProtected)
epoch.Resume();
try
{
// Ensure all currently started entries will enqueue before we declare log closed
epoch.BumpCurrentEpoch(() =>
{
CommitInternal(out _, out _, false, Array.Empty<byte>(), long.MaxValue, null);
});
}
finally
// Use this to signal to future enqueue calls that they should stop as we are closing the log
logClosing = true;
// use a bump to ensure that any concurrent enqueues that have marched passed the check will finish before
// we close the log
epoch.BumpCurrentEpoch(() =>
{
if (!isProtected)
epoch.Suspend();
}

CommitInternal(out _, out _, false, Array.Empty<byte>(), long.MaxValue, null);
});

if (spinWait)
WaitForCommit(TailAddress, long.MaxValue);
}
Expand Down Expand Up @@ -480,6 +476,7 @@ public long Enqueue<T>(IEnumerable<T> entries) where T : ILogEnqueueEntry
#endregion

#region TryEnqueue

/// <summary>
/// Try to enqueue entry to log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
Expand All @@ -497,7 +494,7 @@ public unsafe bool TryEnqueue<T>(T entry, out long logicalAddress) where T : ILo

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -538,7 +535,7 @@ public unsafe bool TryEnqueue<T>(IEnumerable<T> entries, out long logicalAddress
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

Expand Down Expand Up @@ -578,8 +575,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -619,8 +615,7 @@ public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan<byte> entryBytes, bool noCom
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -654,8 +649,7 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -689,6 +683,7 @@ public unsafe void Enqueue<THeader>(THeader userHeader, out long logicalAddress)
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand All @@ -715,6 +710,7 @@ public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item, out l
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand Down Expand Up @@ -743,6 +739,7 @@ public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item1, ref
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand Down Expand Up @@ -772,8 +769,9 @@ public unsafe void Enqueue<THeader>(THeader userHeader, ref SpanByte item1, ref
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
Expand Down Expand Up @@ -801,6 +799,7 @@ public unsafe void Enqueue(byte userHeader, ref SpanByte item, out long logicalA
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = AllocateBlock(allocatedLength);

Expand Down Expand Up @@ -862,6 +861,7 @@ public unsafe bool TryEnqueue<THeader>(THeader userHeader, ref SpanByte item1, r
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -901,6 +901,7 @@ public unsafe bool TryEnqueue<THeader>(THeader userHeader, ref SpanByte item1, r
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -938,6 +939,7 @@ public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logic
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
Expand Down Expand Up @@ -2533,7 +2535,7 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (logClosing || commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

Expand Down Expand Up @@ -2779,16 +2781,8 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
}

// Otherwise, move to set read-only tail and flush
try
{
epoch.Resume();
if (!allocator.ShiftReadOnlyToTail(out _, out _))
CommitMetadataOnly(ref info);
}
finally
{
epoch.Suspend();
}
if (!allocator.ShiftReadOnlyToTail(out _, out _))
CommitMetadataOnly(ref info);
return true;
}

Expand Down

0 comments on commit 0f5781b

Please sign in to comment.