From 7d5d5ff583a9b9b921f2f465357025e53cec3b6a Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 24 May 2023 19:34:09 -0700 Subject: [PATCH] Unsafe API to access fasterlog internals (#837) * Unsafe API to access fasterlog internals * updates --- cs/src/core/Allocator/ScanIteratorBase.cs | 29 +++ cs/src/core/FasterLog/FasterLog.cs | 73 +++++++ cs/src/core/FasterLog/FasterLogIterator.cs | 212 ++++++++++++++++++++- cs/src/core/FasterLog/ILogEntryConsumer.cs | 16 ++ 4 files changed, 324 insertions(+), 6 deletions(-) diff --git a/cs/src/core/Allocator/ScanIteratorBase.cs b/cs/src/core/Allocator/ScanIteratorBase.cs index db3b87124..8ccdbc9b0 100644 --- a/cs/src/core/Allocator/ScanIteratorBase.cs +++ b/cs/src/core/Allocator/ScanIteratorBase.cs @@ -178,6 +178,35 @@ protected unsafe bool BufferAndLoad(long currentAddress, long currentPage, long return WaitForFrameLoad(currentAddress, currentFrame); } + /// + /// Whether we need to buffer new page from disk + /// + protected unsafe bool NeedBufferAndLoad(long currentAddress, long currentPage, long currentFrame, long headAddress, long endAddress) + { + for (int i = 0; i < frameSize; i++) + { + var nextPage = currentPage + i; + + var pageStartAddress = nextPage << logPageSizeBits; + + // Cannot load page if it is entirely in memory or beyond the end address + if (pageStartAddress >= headAddress || pageStartAddress >= endAddress) + continue; + + var pageEndAddress = (nextPage + 1) << logPageSizeBits; + if (endAddress < pageEndAddress) + pageEndAddress = endAddress; + if (headAddress < pageEndAddress) + pageEndAddress = headAddress; + + var nextFrame = (currentFrame + i) % frameSize; + + if (nextLoadedPage[nextFrame] < pageEndAddress || loadedPage[nextFrame] < pageEndAddress) + return true; + } + return false; + } + internal abstract void AsyncReadPagesFromDeviceToFrame(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null); private bool WaitForFrameLoad(long currentAddress, long currentFrame) diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index 8235a4672..78e2d4ae7 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -94,6 +94,11 @@ public sealed class FasterLog : IDisposable /// public byte[] RecoveredCookie; + /// + /// Header size used by FasterLog + /// + public int HeaderSize => headerSize; + /// /// Task notifying commit completions /// @@ -373,6 +378,20 @@ public long Enqueue(ReadOnlySpan entry) return logicalAddress; } + /// + /// Enqueue raw pre-formatted bytes with headers to the log (in memory). + /// + /// Raw bytes to be enqueued to log + /// First logical address of added entries + public long UnsafeEnqueueRaw(ReadOnlySpan entryBytes) + { + long logicalAddress; + while (!UnsafeTryEnqueueRaw(entryBytes, out logicalAddress)) + Thread.Yield(); + return logicalAddress; + + } + /// /// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit /// @@ -536,6 +555,44 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) return true; } + /// + /// Try to enqueue raw pre-formatted bytes with headers to the log (in memory). If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// Entry bytes to be enqueued to log + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool UnsafeTryEnqueueRaw(ReadOnlySpan entryBytes, out long logicalAddress) + { + int length = entryBytes.Length; + + // Length should be pre-aligned + Debug.Assert(length == Align(length)); + logicalAddress = 0; + int allocatedLength = length; + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + if (logicalAddress == 0) + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + entryBytes.CopyTo(new Span((byte*)physicalAddress, length)); + if (AutoRefreshSafeTailAddress) DoAutoRefreshSafeTailAddress(); + epoch.Suspend(); + if (AutoCommit) Commit(); + return true; + } + /// /// Try to append entry to log. If it returns true, we are /// done. If it returns false, we need to retry. @@ -2610,6 +2667,22 @@ internal unsafe int GetLength(byte* ptr) return 0; } + /// + /// Get length of entry from pointer to header + /// + /// + /// + public unsafe int UnsafeGetLength(byte* headerPtr) + => GetLength(headerPtr); + + /// + /// Get aligned version of record length + /// + /// + /// + public int UnsafeAlign(int length) + => Align(length); + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal unsafe bool VerifyChecksum(byte* ptr, int length) { diff --git a/cs/src/core/FasterLog/FasterLogIterator.cs b/cs/src/core/FasterLog/FasterLogIterator.cs index 92260b12d..f4d8559a4 100644 --- a/cs/src/core/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/FasterLog/FasterLogIterator.cs @@ -131,6 +131,28 @@ public async Task ConsumeAllAsync(T consumer, CancellationToken token = defau } } + /// + /// Asynchronously consume the log with given consumer until end of iteration or cancelled + /// + /// consumer + /// throttle the iteration speed + /// max size of returned chunk + /// cancellation token + /// consumer type + public async Task BulkConsumeAllAsync(T consumer, int throttleMs = 0, int maxChunkSize = 0, CancellationToken token = default) where T : IBulkLogEntryConsumer + { + while (!disposed) + { + // TryConsumeNext returns false if we have to wait for the next record. + while (!TryBulkConsumeNext(consumer, maxChunkSize)) + { + if (!await WaitAsync(token).ConfigureAwait(false)) + return; + if (throttleMs > 0) await Task.Delay(throttleMs, token).ConfigureAwait(false); + } + } + } + /// /// Wait for iteration to be ready to continue /// @@ -242,7 +264,7 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre { var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress, out nextAddress, - out isCommitRecord); + out isCommitRecord, out _); if (!hasNext) { entry = default; @@ -337,7 +359,7 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry, { var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress, out nextAddress, - out isCommitRecord); + out isCommitRecord, out _); if (!hasNext) { entry = default; @@ -403,7 +425,7 @@ public unsafe bool TryConsumeNext(T consumer) where T : ILogEntryConsumer { var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress, out nextAddress, - out isCommitRecord); + out isCommitRecord, out _); if (!hasNext) { epoch.Suspend(); @@ -433,6 +455,82 @@ public unsafe bool TryConsumeNext(T consumer) where T : ILogEntryConsumer } } + /// + /// Consume the next entry in the log with the given consumer + /// + /// consumer + /// + /// concrete type of consumer + /// whether a next entry is present + public unsafe bool TryBulkConsumeNext(T consumer, int maxChunkSize = 0) where T : IBulkLogEntryConsumer + { + if (maxChunkSize == 0) maxChunkSize = allocator.PageSize; + + if (disposed) + { + currentAddress = default; + nextAddress = default; + return false; + } + + bool retVal; + + epoch.Resume(); + + // Find a contiguous set of log entries + try + { + while (true) + { + var hasNext = GetNextInternal(out long startPhysicalAddress, out int newEntryLength, out long startLogicalAddress, out long endLogicalAddress, out bool isCommitRecord, out bool onFrame); + + if (!hasNext) + { + retVal = false; + break; + } + + // GetNextInternal returns only the payload length, so adjust the totalLength + int totalLength = headerSize + Align(newEntryLength); + + // Expand the records in iteration, as long as as they are on the same physical page + while (ExpandGetNextInternal(startPhysicalAddress, ref totalLength, out long newCurrentAddress, out endLogicalAddress, out isCommitRecord)) + { + if (totalLength > maxChunkSize) + break; + } + + // Consume the chunk + if (onFrame) + { + // Record in frame, so we do no need epoch protection to access it + epoch.Suspend(); + try + { + consumer.Consume((byte*)startPhysicalAddress, totalLength, startLogicalAddress, endLogicalAddress); + } + finally + { + epoch.Resume(); + } + } + else + { + // Consume the chunk (warning: we are under epoch protection here, as we are consuming directly from main memory log buffer) + consumer.Consume((byte*)startPhysicalAddress, totalLength, startLogicalAddress, endLogicalAddress); + + // Refresh epoch to maintain liveness of log append + epoch.ProtectAndDrain(); + } + } + } + finally + { + epoch.Suspend(); + } + return retVal; + } + /// /// WARNING: advanced users only. /// Get next record in iterator, accessing unsafe raw bytes and retaining epoch protection. @@ -463,7 +561,7 @@ public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long { var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress, out nextAddress, - out isCommitRecord); + out isCommitRecord, out _); if (!hasNext) { entry = default; @@ -603,7 +701,7 @@ internal unsafe bool ScanForwardForCommit(ref FasterLogRecoveryInfo info, long c // Continue looping until we find a record that is a commit record while (GetNextInternal(out long physicalAddress, out var entryLength, out long currentAddress, out long nextAddress, - out var isCommitRecord)) + out var isCommitRecord, out _)) { if (!isCommitRecord) continue; @@ -648,8 +746,9 @@ internal unsafe bool ScanForwardForCommit(ref FasterLogRecoveryInfo info, long c /// /// /// + /// /// - private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out long currentAddress, out long outNextAddress, out bool commitRecord) + private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out long currentAddress, out long outNextAddress, out bool commitRecord, out bool onFrame) { while (true) { @@ -658,6 +757,7 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt currentAddress = nextAddress; outNextAddress = currentAddress; commitRecord = false; + onFrame = false; // Check for boundary conditions if (currentAddress < allocator.BeginAddress) @@ -692,6 +792,7 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt if (BufferAndLoad(currentAddress, _currentPage, _currentFrame, _headAddress, _endAddress)) continue; physicalAddress = frame.GetPhysicalAddress(_currentFrame, _currentOffset); + onFrame = true; } else { @@ -758,5 +859,104 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt } } } + + private unsafe bool ExpandGetNextInternal(long startPhysicalAddress, ref int totalEntryLength, out long currentAddress, out long outNextAddress, out bool commitRecord) + { + while (true) + { + long physicalAddress; + int entryLength; + currentAddress = nextAddress; + outNextAddress = currentAddress; + commitRecord = false; + + // Check for boundary conditions + if (currentAddress < allocator.BeginAddress) + { + Utility.MonotonicUpdate(ref nextAddress, allocator.BeginAddress, out _); + currentAddress = nextAddress; + outNextAddress = currentAddress; + } + + var _currentPage = currentAddress >> allocator.LogPageSizeBits; + var _currentFrame = _currentPage % frameSize; + var _currentOffset = currentAddress & allocator.PageSizeMask; + var _headAddress = allocator.HeadAddress; + + if (disposed) + return false; + + if ((currentAddress >= endAddress) || (currentAddress >= (scanUncommitted ? fasterLog.SafeTailAddress : fasterLog.CommittedUntilAddress))) + return false; + + if (currentAddress < _headAddress) + { + var _endAddress = endAddress; + if (fasterLog.readOnlyMode) + { + // Support partial page reads of committed data + var _flush = fasterLog.CommittedUntilAddress; + if (_flush < endAddress) + _endAddress = _flush; + } + + if (NeedBufferAndLoad(currentAddress, _currentPage, _currentFrame, _headAddress, _endAddress)) + return false; + + physicalAddress = frame.GetPhysicalAddress(_currentFrame, _currentOffset); + } + else + { + physicalAddress = allocator.GetPhysicalAddress(currentAddress); + } + + if (physicalAddress != startPhysicalAddress + totalEntryLength) + return false; + + // Get and check entry length + entryLength = fasterLog.GetLength((byte*)physicalAddress); + + // We may encounter zeroed out bits at the end of page in a normal log, therefore, we need to check whether that is the case + if (entryLength == 0) + { + return false; + } + + // commit records have negative length fields + if (entryLength < 0) + { + commitRecord = true; + entryLength = -entryLength; + } + + int recordSize = headerSize + Align(entryLength); + if (_currentOffset + recordSize > allocator.PageSize) + { + return false; + } + + // Verify checksum if needed + if (currentAddress < _headAddress) + { + if (!fasterLog.VerifyChecksum((byte*)physicalAddress, entryLength)) + { + return false; + } + } + + if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize) + currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + else + currentAddress += recordSize; + + if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out long oldCurrentAddress)) + { + totalEntryLength += recordSize; + outNextAddress = currentAddress; + currentAddress = oldCurrentAddress; + return true; + } + } + } } } diff --git a/cs/src/core/FasterLog/ILogEntryConsumer.cs b/cs/src/core/FasterLog/ILogEntryConsumer.cs index cbabc2ceb..e8f07dde4 100644 --- a/cs/src/core/FasterLog/ILogEntryConsumer.cs +++ b/cs/src/core/FasterLog/ILogEntryConsumer.cs @@ -15,4 +15,20 @@ public interface ILogEntryConsumer /// (predicted) address of the next entry public void Consume(ReadOnlySpan entry, long currentAddress, long nextAddress); } + + /// + /// Consumes FasterLog entries in bulk (raw data) without copying + /// + public interface IBulkLogEntryConsumer + { + /// + /// Consumes the given bulk entries (raw data). + /// + /// + /// + /// address of the consumed entry + /// (predicted) address of the next entry + unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress); + } + } \ No newline at end of file