Skip to content

Commit

Permalink
[C#] FasterLogIterator.CompleteUntilRecordAtAsync() (#558)
Browse files Browse the repository at this point in the history
* MemoryPool-based override for FasterLog.ReadAsync()

* FasterLogIterator.CompleteUntilRecordAtAsync() added.

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
mito-csod and badrishc authored Sep 22, 2021
1 parent 8207000 commit 9fef961
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 0 deletions.
74 changes: 74 additions & 0 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,36 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, string nam
return GetRecordAsMemoryOwnerAndFree(ctx.record, memoryPool);
}

/// <summary>
/// Random read record from log, at given address
/// </summary>
/// <param name="address">Logical address to read from</param>
/// <param name="token">Cancellation token</param>
/// <returns></returns>
public async ValueTask<int> ReadRecordLengthAsync(long address, CancellationToken token = default)
{
token.ThrowIfCancellationRequested();
epoch.Resume();
if (address >= CommittedUntilAddress || address < BeginAddress)
{
epoch.Suspend();
return default;
}
var ctx = new SimpleReadContext
{
logicalAddress = address,
completedRead = new SemaphoreSlim(0)
};
unsafe
{
allocator.AsyncReadRecordToMemory(address, headerSize, AsyncGetHeaderOnlyFromDiskCallback, ref ctx);
}
epoch.Suspend();
await ctx.completedRead.WaitAsync(token).ConfigureAwait(false);

return GetRecordLengthAndFree(ctx.record);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int Align(int length)
{
Expand Down Expand Up @@ -1204,6 +1234,29 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje
}
}

private void AsyncGetHeaderOnlyFromDiskCallback(uint errorCode, uint numBytes, object context)
{
var ctx = (SimpleReadContext)context;

if (errorCode != 0)
{
Trace.TraceError("AsyncGetFromDiskCallback error: {0}", errorCode);
ctx.record.Return();
ctx.record = null;
ctx.completedRead.Release();
}
else
{
if (ctx.record.available_bytes < headerSize)
{
Debug.WriteLine("No record header present at address: " + ctx.logicalAddress);
ctx.record.Return();
ctx.record = null;
}
ctx.completedRead.Release();
}
}

private (byte[], int) GetRecordAndFree(SectorAlignedMemory record)
{
if (record == null)
Expand Down Expand Up @@ -1256,6 +1309,27 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje
return (result, length);
}

private int GetRecordLengthAndFree(SectorAlignedMemory record)
{
if (record == null)
return 0;

int length;
unsafe
{
var ptr = record.GetValidPointer();
length = GetLength(ptr);

if (!VerifyChecksum(ptr, length))
{
throw new FasterException("Checksum failed for read");
}
}

record.Return();
return length;
}

private long CommitInternal(bool spinWait = false)
{
if (readOnlyMode)
Expand Down
14 changes: 14 additions & 0 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,20 @@ public void CompleteUntil(long address)
Utility.MonotonicUpdate(ref requestedCompletedUntilAddress, address, out _);
}

/// <summary>
/// Mark iterator complete until the end of the record at specified
/// address. Info is not persisted until a subsequent commit operation
/// on the log. Note: this is slower than CompleteUntil() because the
/// record's length needs to be looked up first.
/// </summary>
/// <param name="recordStartAddress"></param>
/// <param name="token"></param>
public async ValueTask CompleteUntilRecordAtAsync(long recordStartAddress, CancellationToken token = default)
{
int len = await fasterLog.ReadRecordLengthAsync(recordStartAddress, token: token);
CompleteUntil(recordStartAddress + headerSize + len);
}

internal void UpdateCompletedUntilAddress(long address)
{
Utility.MonotonicUpdate(ref CompletedUntilAddress, address, out _);
Expand Down
33 changes: 33 additions & 0 deletions cs/test/FasterLogResumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,39 @@ public async Task FasterLogResumePersistedReaderSpec([Values] LogChecksumType lo
}
}

[Test]
[Category("FasterLog")]
public async Task FasterLogResumeViaCompleteUntilRecordAtSpec([Values] LogChecksumType logChecksum)
{
CancellationToken cancellationToken = default;

var input1 = new byte[] { 0, 1, 2, 3 };
var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
var input3 = new byte[] { 11, 12 };
string readerName = "abc";

using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum }))
{
await l.EnqueueAsync(input1, cancellationToken);
await l.EnqueueAsync(input2);
await l.EnqueueAsync(input3);
await l.CommitAsync();

using var originalIterator = l.Scan(0, long.MaxValue, readerName);
Assert.IsTrue(originalIterator.GetNext(out _, out _, out long recordAddress, out _));
await originalIterator.CompleteUntilRecordAtAsync(recordAddress);
Assert.IsTrue(originalIterator.GetNext(out _, out _, out _, out _)); // move the reader ahead
await l.CommitAsync();
}

using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum }))
{
using var recoveredIterator = l.Scan(0, long.MaxValue, readerName);
Assert.IsTrue(recoveredIterator.GetNext(out byte[] outBuf, out _, out _, out _));
Assert.True(input2.SequenceEqual(outBuf)); // we should have read in input2, not input1 or input3
}
}

[Test]
[Category("FasterLog")]
public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logChecksum, [Values] bool overwriteLogCommits, [Values] bool removeOutdated)
Expand Down

0 comments on commit 9fef961

Please sign in to comment.