Skip to content

Commit

Permalink
[C#] FasterLog v2 Add ability to explicitly terminate a log (#620)
Browse files Browse the repository at this point in the history
* Add ability to explicitly terminate a log

* remove special 0-length return on eof

* Add LogCompleted in iterator

* Change comment and test

* fix async iteration logic

* fixes

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
tli2 and badrishc authored Dec 17, 2021
1 parent 7cb9f84 commit c5b3b7b
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 28 deletions.
3 changes: 1 addition & 2 deletions cs/samples/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ static void ScanThread()
{
while (!iter.GetNext(out result, out _, out _))
{
// For finite end address, check if iteration ended
// if (currentAddress >= endAddress) return;
if (iter.Ended) return;
iter.WaitAsync().AsTask().GetAwaiter().GetResult();
}

Expand Down
56 changes: 51 additions & 5 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public sealed class FasterLog : IDisposable
readonly int headerSize;
readonly LogChecksumType logChecksum;
readonly WorkQueueLIFO<CommitInfo> commitQueue;

internal readonly bool readOnlyMode;
internal readonly bool fastCommitMode;
internal readonly bool tolerateDeviceFailure;
Expand Down Expand Up @@ -132,7 +133,7 @@ public FasterLog(FasterLogSettings logSettings)
new DefaultCheckpointNamingScheme(
logSettings.LogCommitDir ??
new FileInfo(logSettings.LogDevice.FileName).Directory.FullName),
logSettings.ReadOnlyMode ? false : logSettings.RemoveOutdatedCommits);
!logSettings.ReadOnlyMode && logSettings.RemoveOutdatedCommits);

if (logSettings.LogCommitManager == null)
disposeLogCommitManager = true;
Expand Down Expand Up @@ -217,6 +218,37 @@ public void Dispose()
TrueDispose();
}

/// <summary>
/// Mark the log as complete. A completed log will no longer allow enqueues, and all currently enqueued items will
/// be immediately committed.
/// </summary>
/// <param name="spinWait"> whether to spin until log completion becomes committed </param>
public void CompleteLog(bool spinWait = false)
{

// Ensure all currently started entries will enqueue before we declare log closed
epoch.BumpCurrentEpoch(() =>
{
CommitInternal(out _, out _, false, null, long.MaxValue, null);
});

// Ensure progress even if there is no thread in epoch table
if (!epoch.ThisInstanceProtected())
{
epoch.Resume();
epoch.Suspend();
}

if (spinWait)
WaitForCommit(TailAddress, long.MaxValue);
}

/// <summary>
/// Check if the log is complete. A completed log will no longer allow enqueues, and all currently enqueued items will
/// be immediately committed.
/// </summary>
public bool LogCompleted => commitNum == long.MaxValue;

internal void TrueDispose()
{
commitQueue.Dispose();
Expand Down Expand Up @@ -285,6 +317,8 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
Expand Down Expand Up @@ -317,6 +351,8 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
Expand Down Expand Up @@ -1463,6 +1499,7 @@ private void RestoreLatest(out Dictionary<string, long> iterators, out byte[] co
cookie = info.Cookie;
commitNum = info.CommitNum;
beginAddress = allocator.BeginAddress;
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");
if (readOnlyMode)
allocator.HeadAddress = long.MaxValue;

Expand Down Expand Up @@ -1641,6 +1678,8 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log
ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

if (logicalAddress == 0)
Expand Down Expand Up @@ -1816,19 +1855,28 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
{
FastForwardAllowed = fastForwardAllowed,
Cookie = cookie,
Callback = callback
Callback = callback,
};
info.SnapshotIterators(PersistedIterators);
var metadataChanged = ShouldCommmitMetadata(ref info);
// Only apply commit policy if not a strong commit
if (fastForwardAllowed && !commitPolicy.AdmitCommit(TailAddress, metadataChanged))
return false;

// This critical section serializes commit record creation / commit content generation and ensures that the
// long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the
// commit code path
lock (ongoingCommitRequests)
{
if (commitCoveredAddress == TailAddress && !metadataChanged)
// Nothing to commit if no metadata update and no new entries
return false;
if (commitNum == long.MaxValue)
{
// log has been closed, throw an exception
throw new FasterException("log has already been closed");
}

// Make sure we will not be allowed to back out of a commit if AdmitCommit returns true, as the commit policy
// may need to update internal logic for every true response. We might waste some commit nums if commit
// policy filters out a lot of commits, but that's fine.
Expand All @@ -1840,9 +1888,7 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool
// Invalid commit num
return false;

// This critical section serializes commit record creation / commit content generation and ensures that the
// long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the
// commit code path
// Normally --- only need commit records if fast committing.
if (fastCommitMode)
{
// Ok to retry in critical section, any concurrently invoked commit would block, but cannot progress
Expand Down
71 changes: 54 additions & 17 deletions cs/src/core/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public sealed class FasterLogScanIterator : ScanIteratorBase, IDisposable
/// </summary>
public long CompletedUntilAddress;

/// <summary>
/// Whether iteration has ended, either because we reached the end address of iteration, or because
/// we reached the end of a completed log.
/// </summary>
public bool Ended => (currentAddress >= endAddress) || (fasterLog.LogCompleted && currentAddress == fasterLog.TailAddress);

/// <summary>
/// Constructor
/// </summary>
Expand Down Expand Up @@ -75,11 +81,11 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
long nextAddress;
while (!GetNext(out result, out length, out currentAddress, out nextAddress))
{
if (currentAddress >= endAddress)
yield break;
if (Ended) yield break;
if (!await WaitAsync(token).ConfigureAwait(false))
yield break;
}

yield return (result, length, currentAddress, nextAddress);
}
}
Expand All @@ -98,8 +104,7 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
long nextAddress;
while (!GetNext(pool, out result, out length, out currentAddress, out nextAddress))
{
if (currentAddress >= endAddress)
yield break;
if (Ended) yield break;
if (!await WaitAsync(token).ConfigureAwait(false))
yield break;
}
Expand Down Expand Up @@ -226,9 +231,20 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;


if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
entry = default;
entryLength = default;
epoch.Suspend();
return false;
}

if (getMemory != null)
{
// Use user delegate to allocate memory
Expand All @@ -247,7 +263,7 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre

fixed (byte* bp = entry)
Buffer.MemoryCopy((void*) (headerSize + physicalAddress), bp, entryLength, entryLength);

epoch.Suspend();
return true;
}
Expand Down Expand Up @@ -304,21 +320,32 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
epoch.Suspend();
return false;
}

}
catch (Exception)
{
// Throw upwards, but first, suspend the epoch we are in
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;


if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
entry = default;
entryLength = default;
epoch.Suspend();
return false;
}

entry = pool.Rent(entryLength);

fixed (byte* bp = &entry.Memory.Span.GetPinnableReference())
Buffer.MemoryCopy((void*)(headerSize + physicalAddress), bp, entryLength, entryLength);

epoch.Suspend();
return true;
}
Expand Down Expand Up @@ -368,9 +395,20 @@ public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;


if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
entry = default;
entryLength = default;
epoch.Suspend();
return false;
}

entry = (byte*)(headerSize + physicalAddress);
return true;
}
Expand Down Expand Up @@ -467,7 +505,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int Align(int length)
private static int Align(int length)
{
return (length + 3) & ~3;
}
Expand Down Expand Up @@ -619,7 +657,6 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt
{
if (!fasterLog.VerifyChecksum((byte*)physicalAddress, entryLength))
{
var curPage = currentAddress >> allocator.LogPageSizeBits;
currentAddress += headerSize;
if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
{
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/FasterLog/FasterLogRecoveryInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public struct FasterLogRecoveryInfo
/// FasterLog recovery version
/// </summary>
const int FasterLogRecoveryVersion = 1;

/// <summary>
/// Begin address
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions cs/test/DeviceFasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice devi
{
log.Enqueue(entry);
}
log.Commit(true);

log.CompleteLog(true);

// MoveNextAsync() would hang at TailAddress, waiting for more entries (that we don't add).
// Note: If this happens and the test has to be canceled, there may be a leftover blob from the log.Commit(), because
// the log device isn't Dispose()d; the symptom is currently a numeric string format error in DefaultCheckpointNamingScheme.
using (var iter = log.Scan(0, log.TailAddress))
using (var iter = log.Scan(0, long.MaxValue))
{
var counter = new FasterLogTestBase.Counter(log);

Expand Down
1 change: 0 additions & 1 deletion cs/test/FasterLogResumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logCh

using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(path), removeOutdated))
{

long originalCompleted;

using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
Expand Down

0 comments on commit c5b3b7b

Please sign in to comment.