diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index 57a13d1fe..ae3e1ac55 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -333,6 +333,112 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress) return true; } + /// + /// Try to append a user-defined header byte and three SpanByte entries entries atomically to the log. If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// + /// + /// + /// + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item1, ref SpanByte item2, ref SpanByte item3, out long logicalAddress) + { + logicalAddress = 0; + var length = sizeof(byte) + item1.TotalSize + item2.TotalSize + item3.TotalSize; + int allocatedLength = headerSize + Align(length); + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + *physicalAddress = userHeader; + item1.CopyTo(physicalAddress + sizeof(byte)); + item2.CopyTo(physicalAddress + sizeof(byte) + item1.TotalSize); + item3.CopyTo(physicalAddress + sizeof(byte) + item1.TotalSize + item2.TotalSize); + SetHeader(length, physicalAddress); + epoch.Suspend(); + return true; + } + + /// + /// Try to append a user-defined header byte and two SpanByte entries entries atomically to the log. If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// + /// + /// + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item1, ref SpanByte item2, out long logicalAddress) + { + logicalAddress = 0; + var length = sizeof(byte) + item1.TotalSize + item2.TotalSize; + int allocatedLength = headerSize + Align(length); + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + *physicalAddress = userHeader; + item1.CopyTo(physicalAddress + sizeof(byte)); + item2.CopyTo(physicalAddress + sizeof(byte) + item1.TotalSize); + SetHeader(length, physicalAddress); + epoch.Suspend(); + return true; + } + + /// + /// Try to append a user-defined header byte and a SpanByte entry atomically to the log. If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// + /// + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logicalAddress) + { + logicalAddress = 0; + var length = sizeof(byte) + item.TotalSize; + int allocatedLength = headerSize + Align(length); + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + *physicalAddress = userHeader; + item.CopyTo(physicalAddress + sizeof(byte)); + SetHeader(length, physicalAddress); + epoch.Suspend(); + return true; + } + + /// /// Try to enqueue batch of entries as a single atomic unit (to memory). Entire /// batch needs to fit on one log page. diff --git a/cs/src/core/FasterLog/FasterLogIterator.cs b/cs/src/core/FasterLog/FasterLogIterator.cs index 6e0f24613..946d2998f 100644 --- a/cs/src/core/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/FasterLog/FasterLogIterator.cs @@ -324,6 +324,64 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry, } } + /// + /// WARNING: advanced users only. + /// Get next record in iterator, accessing unsafe raw bytes and retaining epoch protection. + /// Make sure to call UnsafeRelease when done processing the raw bytes (without delay). + /// + /// Copy of entry, if found + /// Actual length of entry + /// Logical address of entry + /// Logical address of next entry + /// + public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long currentAddress, out long nextAddress) + { + if (disposed) + { + entry = default; + entryLength = default; + currentAddress = default; + nextAddress = default; + return false; + } + epoch.Resume(); + // Continue looping until we find a record that is not a commit record + while (true) + { + long physicalAddress; + bool isCommitRecord; + try + { + var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress, + out nextAddress, + out isCommitRecord); + if (!hasNext) + { + entry = default; + epoch.Suspend(); + return false; + } + } + catch (Exception) + { + // Throw upwards, but first, suspend the epoch we are in + epoch.Suspend(); + throw; + } + + if (isCommitRecord) continue; + + entry = (byte*)(headerSize + physicalAddress); + return true; + } + } + + /// + /// WARNING: advanced users only. + /// Release a native memory reference obtained via a successful UnsafeGetNext. + /// + public void UnsafeRelease() => epoch.Suspend(); + /// /// Mark iterator complete until specified address. Info is not /// persisted until a subsequent commit operation on the log.