Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] FasterLog v2 advanced user extensions #621

Merged
merged 1 commit into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,112 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
return true;
}

/// <summary>
/// Try to append a user-defined header byte and three SpanByte entries entries atomically to the log. If it returns true, we are
/// done. If it returns false, we need to retry.
/// </summary>
/// <param name="userHeader"></param>
/// <param name="item1"></param>
/// <param name="item2"></param>
/// <param name="item3"></param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item1, ref SpanByte item2, ref SpanByte item3, out long logicalAddress)
{
logicalAddress = 0;
var length = sizeof(byte) + item1.TotalSize + item2.TotalSize + item3.TotalSize;
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
*physicalAddress = userHeader;
item1.CopyTo(physicalAddress + sizeof(byte));
item2.CopyTo(physicalAddress + sizeof(byte) + item1.TotalSize);
item3.CopyTo(physicalAddress + sizeof(byte) + item1.TotalSize + item2.TotalSize);
SetHeader(length, physicalAddress);
epoch.Suspend();
return true;
}

/// <summary>
/// Try to append a user-defined header byte and two SpanByte entries entries atomically to the log. If it returns true, we are
/// done. If it returns false, we need to retry.
/// </summary>
/// <param name="userHeader"></param>
/// <param name="item1"></param>
/// <param name="item2"></param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item1, ref SpanByte item2, out long logicalAddress)
{
logicalAddress = 0;
var length = sizeof(byte) + item1.TotalSize + item2.TotalSize;
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
*physicalAddress = userHeader;
item1.CopyTo(physicalAddress + sizeof(byte));
item2.CopyTo(physicalAddress + sizeof(byte) + item1.TotalSize);
SetHeader(length, physicalAddress);
epoch.Suspend();
return true;
}

/// <summary>
/// Try to append a user-defined header byte and a SpanByte entry atomically to the log. If it returns true, we are
/// done. If it returns false, we need to retry.
/// </summary>
/// <param name="userHeader"></param>
/// <param name="item"></param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logicalAddress)
{
logicalAddress = 0;
var length = sizeof(byte) + item.TotalSize;
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress);
*physicalAddress = userHeader;
item.CopyTo(physicalAddress + sizeof(byte));
SetHeader(length, physicalAddress);
epoch.Suspend();
return true;
}


/// <summary>
/// Try to enqueue batch of entries as a single atomic unit (to memory). Entire
/// batch needs to fit on one log page.
Expand Down
58 changes: 58 additions & 0 deletions cs/src/core/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,64 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
}
}

/// <summary>
/// WARNING: advanced users only.
/// Get next record in iterator, accessing unsafe raw bytes and retaining epoch protection.
/// Make sure to call UnsafeRelease when done processing the raw bytes (without delay).
/// </summary>
/// <param name="entry">Copy of entry, if found</param>
/// <param name="entryLength">Actual length of entry</param>
/// <param name="currentAddress">Logical address of entry</param>
/// <param name="nextAddress">Logical address of next entry</param>
/// <returns></returns>
public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long currentAddress, out long nextAddress)
{
if (disposed)
{
entry = default;
entryLength = default;
currentAddress = default;
nextAddress = default;
return false;
}
epoch.Resume();
// Continue looping until we find a record that is not a commit record
while (true)
{
long physicalAddress;
bool isCommitRecord;
try
{
var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
out nextAddress,
out isCommitRecord);
if (!hasNext)
{
entry = default;
epoch.Suspend();
return false;
}
}
catch (Exception)
{
// Throw upwards, but first, suspend the epoch we are in
epoch.Suspend();
throw;
}

if (isCommitRecord) continue;

entry = (byte*)(headerSize + physicalAddress);
return true;
}
}

/// <summary>
/// WARNING: advanced users only.
/// Release a native memory reference obtained via a successful UnsafeGetNext.
/// </summary>
public void UnsafeRelease() => epoch.Suspend();

/// <summary>
/// Mark iterator complete until specified address. Info is not
/// persisted until a subsequent commit operation on the log.
Expand Down