Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] FasterLog v2 Add ability to explicitly terminate a log #620

Merged
merged 7 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cs/samples/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ static void ScanThread()
{
while (!iter.GetNext(out result, out _, out _))
{
// For finite end address, check if iteration ended
// if (currentAddress >= endAddress) return;
if (iter.Ended) return;
iter.WaitAsync().AsTask().GetAwaiter().GetResult();
}

Expand Down
56 changes: 51 additions & 5 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public sealed class FasterLog : IDisposable
readonly int headerSize;
readonly LogChecksumType logChecksum;
readonly WorkQueueLIFO<CommitInfo> commitQueue;

internal readonly bool readOnlyMode;
internal readonly bool fastCommitMode;
internal readonly bool tolerateDeviceFailure;
Expand Down Expand Up @@ -132,7 +133,7 @@ public FasterLog(FasterLogSettings logSettings)
new DefaultCheckpointNamingScheme(
logSettings.LogCommitDir ??
new FileInfo(logSettings.LogDevice.FileName).Directory.FullName),
logSettings.ReadOnlyMode ? false : logSettings.RemoveOutdatedCommits);
!logSettings.ReadOnlyMode && logSettings.RemoveOutdatedCommits);

if (logSettings.LogCommitManager == null)
disposeLogCommitManager = true;
Expand Down Expand Up @@ -217,6 +218,37 @@ public void Dispose()
TrueDispose();
}

/// <summary>
/// Mark the log as complete. A completed log will no longer allow enqueues, and all currently enqueued items will
/// be immediately committed.
/// </summary>
/// <param name="spinWait"> whether to spin until log completion becomes committed </param>
public void CompleteLog(bool spinWait = false)
{

// Ensure all currently started entries will enqueue before we declare log closed
epoch.BumpCurrentEpoch(() =>
{
CommitInternal(out _, out _, false, null, long.MaxValue, null);
});

// Ensure progress even if there is no thread in epoch table
if (!epoch.ThisInstanceProtected())
{
epoch.Resume();
epoch.Suspend();
}

if (spinWait)
WaitForCommit(TailAddress, long.MaxValue);
}

/// <summary>
/// Check if the log is complete. A completed log will no longer allow enqueues, and all currently enqueued items will
/// be immediately committed.
/// </summary>
public bool LogCompleted => commitNum == long.MaxValue;

internal void TrueDispose()
{
commitQueue.Dispose();
Expand Down Expand Up @@ -285,6 +317,8 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
Expand Down Expand Up @@ -317,6 +351,8 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
Expand Down Expand Up @@ -1463,6 +1499,7 @@ private void RestoreLatest(out Dictionary<string, long> iterators, out byte[] co
cookie = info.Cookie;
commitNum = info.CommitNum;
beginAddress = allocator.BeginAddress;
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (readOnlyMode)
allocator.HeadAddress = long.MaxValue;

Expand Down Expand Up @@ -1641,6 +1678,8 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

if (logicalAddress == 0)
Expand Down Expand Up @@ -1816,19 +1855,28 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
{
FastForwardAllowed = fastForwardAllowed,
Cookie = cookie,
Callback = callback
Callback = callback,
};
info.SnapshotIterators(PersistedIterators);
var metadataChanged = ShouldCommmitMetadata(ref info);
// Only apply commit policy if not a strong commit
if (fastForwardAllowed && !commitPolicy.AdmitCommit(TailAddress, metadataChanged))
return false;

// This critical section serializes commit record creation / commit content generation and ensures that the
// long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the
// commit code path
lock (ongoingCommitRequests)
{
if (commitCoveredAddress == TailAddress && !metadataChanged)
// Nothing to commit if no metadata update and no new entries
return false;
if (commitNum == long.MaxValue)
{
// log has been closed, throw an exception
throw new FasterException("log has already been closed");
}

// Make sure we will not be allowed to back out of a commit if AdmitCommit returns true, as the commit policy
// may need to update internal logic for every true response. We might waste some commit nums if commit
// policy filters out a lot of commits, but that's fine.
Expand All @@ -1840,9 +1888,7 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
// Invalid commit num
return false;

// This critical section serializes commit record creation / commit content generation and ensures that the
// long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the
// commit code path
// Normally --- only need commit records if fast committing.
if (fastCommitMode)
{
// Ok to retry in critical section, any concurrently invoked commit would block, but cannot progress
Expand Down
71 changes: 54 additions & 17 deletions cs/src/core/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public sealed class FasterLogScanIterator : ScanIteratorBase, IDisposable
/// </summary>
public long CompletedUntilAddress;

/// <summary>
/// Whether iteration has ended, either because we reached the end address of iteration, or because
/// we reached the end of a completed log.
/// </summary>
public bool Ended => (currentAddress >= endAddress) || (fasterLog.LogCompleted && currentAddress == fasterLog.TailAddress);

/// <summary>
/// Constructor
/// </summary>
Expand Down Expand Up @@ -75,11 +81,11 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
long nextAddress;
while (!GetNext(out result, out length, out currentAddress, out nextAddress))
{
if (currentAddress >= endAddress)
yield break;
if (Ended) yield break;
if (!await WaitAsync(token).ConfigureAwait(false))
yield break;
}

yield return (result, length, currentAddress, nextAddress);
}
}
Expand All @@ -98,8 +104,7 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
long nextAddress;
while (!GetNext(pool, out result, out length, out currentAddress, out nextAddress))
{
if (currentAddress >= endAddress)
yield break;
if (Ended) yield break;
if (!await WaitAsync(token).ConfigureAwait(false))
yield break;
}
Expand Down Expand Up @@ -226,9 +231,20 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;


if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
entry = default;
entryLength = default;
epoch.Suspend();
return false;
}

if (getMemory != null)
{
// Use user delegate to allocate memory
Expand All @@ -247,7 +263,7 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre

fixed (byte* bp = entry)
Buffer.MemoryCopy((void*) (headerSize + physicalAddress), bp, entryLength, entryLength);

epoch.Suspend();
return true;
}
Expand Down Expand Up @@ -304,21 +320,32 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
epoch.Suspend();
return false;
}

}
catch (Exception)
{
// Throw upwards, but first, suspend the epoch we are in
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;


if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
entry = default;
entryLength = default;
epoch.Suspend();
return false;
}

entry = pool.Rent(entryLength);

fixed (byte* bp = &entry.Memory.Span.GetPinnableReference())
Buffer.MemoryCopy((void*)(headerSize + physicalAddress), bp, entryLength, entryLength);

epoch.Suspend();
return true;
}
Expand Down Expand Up @@ -368,9 +395,20 @@ public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;


if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
entry = default;
entryLength = default;
epoch.Suspend();
return false;
}

entry = (byte*)(headerSize + physicalAddress);
return true;
}
Expand Down Expand Up @@ -467,7 +505,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int Align(int length)
private static int Align(int length)
{
return (length + 3) & ~3;
}
Expand Down Expand Up @@ -619,7 +657,6 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
{
if (!fasterLog.VerifyChecksum((byte*)physicalAddress, entryLength))
{
var curPage = currentAddress >> allocator.LogPageSizeBits;
currentAddress += headerSize;
if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
{
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/FasterLog/FasterLogRecoveryInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public struct FasterLogRecoveryInfo
/// FasterLog recovery version
/// </summary>
const int FasterLogRecoveryVersion = 1;

/// <summary>
/// Begin address
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions cs/test/DeviceFasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice devi
{
log.Enqueue(entry);
}
log.Commit(true);

log.CompleteLog(true);

// MoveNextAsync() would hang at TailAddress, waiting for more entries (that we don't add).
// Note: If this happens and the test has to be canceled, there may be a leftover blob from the log.Commit(), because
// the log device isn't Dispose()d; the symptom is currently a numeric string format error in DefaultCheckpointNamingScheme.
using (var iter = log.Scan(0, log.TailAddress))
using (var iter = log.Scan(0, long.MaxValue))
{
var counter = new FasterLogTestBase.Counter(log);

Expand Down
1 change: 0 additions & 1 deletion cs/test/FasterLogResumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logCh

using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(path), removeOutdated))
{

long originalCompleted;

using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
Expand Down