diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index c4b559542..99a3536db 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -875,6 +875,36 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, string nam return GetRecordAsMemoryOwnerAndFree(ctx.record, memoryPool); } + /// + /// Random read record from log, at given address + /// + /// Logical address to read from + /// Cancellation token + /// + public async ValueTask ReadRecordLengthAsync(long address, CancellationToken token = default) + { + token.ThrowIfCancellationRequested(); + epoch.Resume(); + if (address >= CommittedUntilAddress || address < BeginAddress) + { + epoch.Suspend(); + return default; + } + var ctx = new SimpleReadContext + { + logicalAddress = address, + completedRead = new SemaphoreSlim(0) + }; + unsafe + { + allocator.AsyncReadRecordToMemory(address, headerSize, AsyncGetHeaderOnlyFromDiskCallback, ref ctx); + } + epoch.Suspend(); + await ctx.completedRead.WaitAsync(token).ConfigureAwait(false); + + return GetRecordLengthAndFree(ctx.record); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private int Align(int length) { @@ -1204,6 +1234,29 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje } } + private void AsyncGetHeaderOnlyFromDiskCallback(uint errorCode, uint numBytes, object context) + { + var ctx = (SimpleReadContext)context; + + if (errorCode != 0) + { + Trace.TraceError("AsyncGetFromDiskCallback error: {0}", errorCode); + ctx.record.Return(); + ctx.record = null; + ctx.completedRead.Release(); + } + else + { + if (ctx.record.available_bytes < headerSize) + { + Debug.WriteLine("No record header present at address: " + ctx.logicalAddress); + ctx.record.Return(); + ctx.record = null; + } + ctx.completedRead.Release(); + } + } + private (byte[], int) GetRecordAndFree(SectorAlignedMemory record) { if (record == null) @@ -1256,6 +1309,27 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje return (result, length); } + private int GetRecordLengthAndFree(SectorAlignedMemory record) + { + if (record == null) + return 0; + + int length; + unsafe + { + var ptr = record.GetValidPointer(); + length = GetLength(ptr); + + if (!VerifyChecksum(ptr, length)) + { + throw new FasterException("Checksum failed for read"); + } + } + + record.Return(); + return length; + } + private long CommitInternal(bool spinWait = false) { if (readOnlyMode) diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs index 707e916cc..77395d25e 100644 --- a/cs/src/core/Index/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs @@ -291,6 +291,20 @@ public void CompleteUntil(long address) Utility.MonotonicUpdate(ref requestedCompletedUntilAddress, address, out _); } + /// + /// Mark iterator complete until the end of the record at specified + /// address. Info is not persisted until a subsequent commit operation + /// on the log. Note: this is slower than CompleteUntil() because the + /// record's length needs to be looked up first. + /// + /// + /// + public async ValueTask CompleteUntilRecordAtAsync(long recordStartAddress, CancellationToken token = default) + { + int len = await fasterLog.ReadRecordLengthAsync(recordStartAddress, token: token); + CompleteUntil(recordStartAddress + headerSize + len); + } + internal void UpdateCompletedUntilAddress(long address) { Utility.MonotonicUpdate(ref CompletedUntilAddress, address, out _); diff --git a/cs/test/FasterLogResumeTests.cs b/cs/test/FasterLogResumeTests.cs index 4e6d1e503..8ba7a7d21 100644 --- a/cs/test/FasterLogResumeTests.cs +++ b/cs/test/FasterLogResumeTests.cs @@ -68,6 +68,39 @@ public async Task FasterLogResumePersistedReaderSpec([Values] LogChecksumType lo } } + [Test] + [Category("FasterLog")] + public async Task FasterLogResumeViaCompleteUntilRecordAtSpec([Values] LogChecksumType logChecksum) + { + CancellationToken cancellationToken = default; + + var input1 = new byte[] { 0, 1, 2, 3 }; + var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 }; + var input3 = new byte[] { 11, 12 }; + string readerName = "abc"; + + using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum })) + { + await l.EnqueueAsync(input1, cancellationToken); + await l.EnqueueAsync(input2); + await l.EnqueueAsync(input3); + await l.CommitAsync(); + + using var originalIterator = l.Scan(0, long.MaxValue, readerName); + Assert.IsTrue(originalIterator.GetNext(out _, out _, out long recordAddress, out _)); + await originalIterator.CompleteUntilRecordAtAsync(recordAddress); + Assert.IsTrue(originalIterator.GetNext(out _, out _, out _, out _)); // move the reader ahead + await l.CommitAsync(); + } + + using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum })) + { + using var recoveredIterator = l.Scan(0, long.MaxValue, readerName); + Assert.IsTrue(recoveredIterator.GetNext(out byte[] outBuf, out _, out _, out _)); + Assert.True(input2.SequenceEqual(outBuf)); // we should have read in input2, not input1 or input3 + } + } + [Test] [Category("FasterLog")] public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logChecksum, [Values] bool overwriteLogCommits, [Values] bool removeOutdated)