Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] V2 Custom Commit Strategy #599

Merged
merged 8 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 76 additions & 55 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace FASTER.core
/// <summary>
/// FASTER log
/// </summary>
public sealed class FasterLog : IDisposable
public sealed partial class FasterLog : IDisposable
{
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly LightEpoch epoch;
Expand All @@ -36,7 +36,10 @@ public sealed class FasterLog : IDisposable

// Offsets for all currently unprocessed commit records
private readonly Queue<(long, FasterLogRecoveryInfo)> ongoingCommitRequests;
private long commitNum;
private readonly List<FasterLogRecoveryInfo> coveredCommits = new List<FasterLogRecoveryInfo>();
private long commitNum, commitCoveredAddress;

private IFasterLogCommitPolicy commitPolicy;

/// <summary>
/// Beginning address of log
Expand All @@ -52,10 +55,6 @@ public sealed class FasterLog : IDisposable
/// </summary>
public long TailAddress => allocator.GetTailAddress();

// Used to track the last commit record and commits that have been issued, to stop commits from committing
// without any user records
private long commitCoveredAddress;

/// <summary>
/// Log flushed until address
/// </summary>
Expand Down Expand Up @@ -147,6 +146,7 @@ public static async ValueTask<FasterLog> CreateAsync(FasterLogSettings logSettin
var (it, cookie) = await fasterLog.RestoreLatestAsync(false, cancellationToken).ConfigureAwait(false);
fasterLog.RecoveredIterators = it;
fasterLog.RecoveredCookie = cookie;

return fasterLog;
}

Expand Down Expand Up @@ -189,6 +189,8 @@ private FasterLog(FasterLogSettings logSettings, bool oldCommitManager)
fastCommitMode = logSettings.FastCommitMode;

ongoingCommitRequests = new Queue<(long, FasterLogRecoveryInfo)>();
commitPolicy = logSettings.CommitPolicy ?? new DefaultCommitPolicy();
commitPolicy.OnAttached(this);
}

/// <summary>
Expand Down Expand Up @@ -486,11 +488,22 @@ public async ValueTask WaitForCommitAsync(long untilAddress = 0, CancellationTok

public void Commit(bool spinWait = false)
{
CommitInternal(out _, out _, spinWait, true, null, -1);
// Take a lower-bound of the content of this commit in case our request is filtered but we need to spin
var tail = TailAddress;
var lastCommit = commitNum;

var success = CommitInternal(out var actualTail, out var actualCommitNum, true, null, -1, null);
if (!spinWait) return;
if (success)
SpinWaitForCommit(actualTail, actualCommitNum);
else
// Still need to imitate semantics to spin until all previous enqueues are committed when commit has been filtered
SpinWaitForCommit(tail, lastCommit);
}

/// <summary>
/// Issue a strong commit request for log (until tail) with the given commitNum
/// Issue a strong commit request for log (until tail) with the given commitNum. Strong commits bypass commit policies
/// and will never be compressed with other concurrent commit requests.
/// </summary>
/// <param name="commitTail">The tail committed by this call</param>
/// <param name="actualCommitNum">
Expand All @@ -507,9 +520,13 @@ public void Commit(bool spinWait = false)
/// a non -1 value, commit is guaranteed to have the supplied identifier if commit call is successful
/// </param>
/// <returns>Whether commit is successful </returns>
public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool spinWait = false, byte[] cookie = null, long proposedCommitNum = -1)
public bool CommitStrongly(out long commitTail, out long actualCommitNum, bool spinWait = false, byte[] cookie = null, long proposedCommitNum = -1, Action callback = null)
{
return CommitInternal(out commitTail, out actualCommitNum, spinWait, false, cookie, proposedCommitNum);
if (!CommitInternal(out commitTail, out actualCommitNum, false, cookie, proposedCommitNum, callback))
return false;
if (spinWait)
SpinWaitForCommit(commitTail, actualCommitNum);
return true;
}

/// <summary>
Expand All @@ -522,7 +539,7 @@ public async ValueTask CommitAsync(CancellationToken token = default)
{
token.ThrowIfCancellationRequested();
var task = CommitTask;
if (!CommitInternal(out var tailAddress, out var actualCommitNum, false, true, null, -1))
if (!CommitInternal(out var tailAddress, out var actualCommitNum, false, null, -1, null))
return;

while (CommittedUntilAddress < tailAddress || persistedCommitNum < actualCommitNum)
Expand All @@ -543,7 +560,7 @@ public async ValueTask<Task<LinkedCommitInfo>> CommitAsync(Task<LinkedCommitInfo
token.ThrowIfCancellationRequested();
if (prevCommitTask == null) prevCommitTask = CommitTask;

if (!CommitInternal(out var tailAddress, out var actualCommitNum, false, true, null, -1))
if (!CommitInternal(out var tailAddress, out var actualCommitNum, true, null, -1, null))
return prevCommitTask;

while (CommittedUntilAddress < tailAddress || persistedCommitNum < actualCommitNum)
Expand Down Expand Up @@ -576,7 +593,7 @@ public async ValueTask<Task<LinkedCommitInfo>> CommitAsync(Task<LinkedCommitInfo
{
token.ThrowIfCancellationRequested();
var task = CommitTask;
if (!CommitInternal(out var commitTail, out var actualCommitNum, false, false, cookie, proposedCommitNum))
if (!CommitInternal(out var commitTail, out var actualCommitNum, false, cookie, proposedCommitNum, null))
return (false, commitTail, actualCommitNum);

while (CommittedUntilAddress < commitTail || persistedCommitNum < actualCommitNum)
Expand Down Expand Up @@ -1032,7 +1049,7 @@ private bool ShouldCommmitMetadata(ref FasterLogRecoveryInfo info)
return beginAddress > CommittedBeginAddress || IteratorsChanged(ref info) || info.Cookie != null;
}

private void CommitMetadataOnly(ref FasterLogRecoveryInfo info, bool spinWait)
private void CommitMetadataOnly(ref FasterLogRecoveryInfo info)
{
var fromAddress = CommittedUntilAddress > info.BeginAddress ? CommittedUntilAddress : info.BeginAddress;
var untilAddress = FlushedUntilAddress > info.BeginAddress ? FlushedUntilAddress : info.BeginAddress;
Expand All @@ -1043,12 +1060,6 @@ private void CommitMetadataOnly(ref FasterLogRecoveryInfo info, bool spinWait)
UntilAddress = untilAddress,
ErrorCode = 0,
});

if (spinWait)
{
while (info.CommitNum < persistedCommitNum)
Thread.Yield();
}
}

private void UpdateCommittedState(FasterLogRecoveryInfo recoveryInfo)
Expand Down Expand Up @@ -1076,8 +1087,8 @@ private void SerialCommitCallbackWorker(CommitInfo commitInfo)
{
if (commitInfo.ErrorCode == 0)
{
var coveredCommits = new List<FasterLogRecoveryInfo>();
// Check for the commit records included in this flush
coveredCommits.Clear();
lock (ongoingCommitRequests)
{
while (ongoingCommitRequests.Count != 0)
Expand All @@ -1088,20 +1099,32 @@ private void SerialCommitCallbackWorker(CommitInfo commitInfo)
ongoingCommitRequests.Dequeue();
}
}

// Nothing was committed --- this was probably au auto-flush. Return now without touching any
// commit task tracking.
if (coveredCommits.Count == 0) return;

var latestCommit = coveredCommits[coveredCommits.Count - 1];
if (fastCommitMode)
// In fast commit mode, can safely set committed state to the latest flushed
{
// In fast commit mode, can safely set committed state to the latest flushed and invoke callbacks early
UpdateCommittedState(latestCommit);
foreach (var recoveryInfo in coveredCommits)
{
recoveryInfo.Callback?.Invoke();
commitPolicy.OnCommitFinished(recoveryInfo);
}
}

foreach (var recoveryInfo in coveredCommits)
{
// Only write out commit metadata if user cares about this as a distinct recoverable point
if (!recoveryInfo.FastForwardAllowed) WriteCommitMetadata(recoveryInfo);
if (!fastCommitMode)
{
recoveryInfo.Callback?.Invoke();
commitPolicy.OnCommitFinished(recoveryInfo);
}
}

// We fast-forwarded commits earlier, so write it out if not covered by another commit
Expand Down Expand Up @@ -1626,36 +1649,43 @@ private int GetRecordLengthAndFree(SectorAlignedMemory record)
return length;
}

private bool CommitInternal(out long commitTail, out long actualCommitNum, bool spinWait, bool allowFastForward, byte[] cookie, long proposedCommitNum)
private void SpinWaitForCommit(long address, long commitNum)
{
while (commitNum > persistedCommitNum || address > CommittedUntilAddress)
Thread.Yield();
}

private bool CommitInternal(out long commitTail, out long actualCommitNum, bool fastForwardAllowed, byte[] cookie, long proposedCommitNum, Action callback)
{
commitTail = actualCommitNum = 0;

if (readOnlyMode)
throw new FasterException("Cannot commit in read-only mode");

if (allowFastForward && (cookie != null || proposedCommitNum != -1))
if (fastForwardAllowed && (cookie != null || proposedCommitNum != -1 || callback != null))
throw new FasterException(
"Fast forwarding a commit is only allowed when no cookie and not commit num is specified");

"Fast forwarding a commit is only allowed when no cookie, commit num, or callback is specified");
var info = new FasterLogRecoveryInfo
{
FastForwardAllowed = allowFastForward
FastForwardAllowed = fastForwardAllowed,
Cookie = cookie,
Callback = callback
};
info.SnapshotIterators(PersistedIterators);
var metadataChanged = ShouldCommmitMetadata(ref info);
// Only apply commit policy if not a strong commit
if (!fastForwardAllowed && !commitPolicy.AdmitCommit(TailAddress, metadataChanged))
return false;

// This critical section serializes commit record creation / commit content generation and ensures that the
// long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the
// commit code path
lock (ongoingCommitRequests)
{
// Compute regular information about the commit
info.Cookie = cookie;
info.SnapshotIterators(PersistedIterators);

if (commitCoveredAddress == TailAddress && !ShouldCommmitMetadata(ref info))
if (commitCoveredAddress == TailAddress && metadataChanged)
// Nothing to commit if no metadata update and no new entries
return false;


// Make sure we will not be allowed to back out of a commit of AdmitCommit returns true, as the strategy
// may need to update internal logic for every true response. We might waste some commit nums if commit
// strategy filters out a lot of commits, but that's fine.
if (proposedCommitNum == -1)
info.CommitNum = actualCommitNum = ++commitNum;
else if (proposedCommitNum > commitNum)
Expand All @@ -1664,6 +1694,9 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
// Invalid commit num
return false;

// This critical section serializes commit record creation / commit content generation and ensures that the
// long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the
// commit code path
if (fastCommitMode)
{
// Ok to retry in critical section, any concurrently invoked commit would block, but cannot progress
Expand All @@ -1679,40 +1712,28 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
info.BeginAddress = BeginAddress;
info.UntilAddress = commitTail = TailAddress;
}
Utility.MonotonicUpdate(ref commitCoveredAddress, commitTail, out _);

commitPolicy.OnCommitCreated(info);
// Enqueue the commit record's content and offset into the queue so it can be picked up by the next flush
// At this point, we expect the commit record to be flushed out as a distinct recovery point
ongoingCommitRequests.Enqueue((commitTail, info));
}


// As an optimization, if a concurrent flush has already advanced FlushedUntilAddress
// past this commit, we can manually trigger a commit callback for safety, and return.
if (commitTail <= FlushedUntilAddress)
{
CommitMetadataOnly(ref info, spinWait);
CommitMetadataOnly(ref info);
return true;
}

// Otherwise, move to set read-only tail and flush
try
{
epoch.Resume();
if (allocator.ShiftReadOnlyToTail(out _, out _))
{
if (spinWait)
{
while (CommittedUntilAddress < commitTail)
{
epoch.ProtectAndDrain();
Thread.Yield();
}
}
}
else
{
CommitMetadataOnly(ref info, spinWait);
}
if (!allocator.ShiftReadOnlyToTail(out _, out _))
CommitMetadataOnly(ref info);
}
finally
{
Expand Down
Loading