Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] IEntry interface for FasterLog #639

Merged
merged 8 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 338 additions & 1 deletion cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ namespace FASTER.core
/// </summary>
public sealed class FasterLog : IDisposable
{
/// <summary>
/// Represents a entry that can be serialized directly onto FasterLog when enqueuing
/// </summary>
public interface IEntry
{
/// <summary></summary>
/// <returns> the size in bytes after serialization onto FasterLog</returns>
public int SerializedLength();
tli2 marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Serialize the entry onto FasterLog.
/// </summary>
/// <param name="dest">Memory buffer of FasterLog to serialize onto. Guaranteed to have at least SerializedLength() many bytes</param>
public void SerializeTo(Span<byte> dest);
}

private Exception cannedException = null;

readonly BlittableAllocator<Empty, byte> allocator;
Expand Down Expand Up @@ -310,9 +326,117 @@ public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
Thread.Yield();
return logicalAddress;
}

/// <summary>
/// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
/// </summary>
/// <param name="entry">Entry to be enqueued to log</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of added entry</returns>
public long Enqueue<T>(T entry) where T : IEntry
{
long logicalAddress;
while (!TryEnqueue(entry, out logicalAddress))
Thread.Yield();
return logicalAddress;
}

/// <summary>
/// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit
/// </summary>
/// <param name="entries">Batch of entries to be enqueued to log</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of added entry</returns>
public long Enqueue<T>(IEnumerable<T> entries) where T : IEntry
{
long logicalAddress;
while (!TryEnqueue(entries, out logicalAddress))
Thread.Yield();
return logicalAddress;
}
#endregion

#region TryEnqueue
/// <summary>
/// Try to enqueue entry to log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
/// </summary>
/// <param name="entry">Entry to be enqueued to log</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryEnqueue<T>(T entry, out long logicalAddress) where T : IEntry
{
logicalAddress = 0;
var length = entry.SerializedLength();
int allocatedLength = headerSize + Align(length);
ValidateAllocatedLength(allocatedLength);

epoch.Resume();

if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
entry.SerializeTo(new Span<byte>((void *) (headerSize + physicalAddress), length));
SetHeader(length, (byte*)physicalAddress);
epoch.Suspend();
return true;
}

/// <summary>
/// Try to enqueue batch of entries as a single atomic unit (to memory). Entire
/// batch needs to fit on one log page.
/// </summary>
/// <param name="entries">Batch to be appended to log</param>
/// <param name="logicalAddress">Logical address of first added entry</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryEnqueue<T>(IEnumerable<T> entries, out long logicalAddress) where T : IEntry
{
logicalAddress = 0;

var allocatedLength = 0;
foreach (var entry in entries)
{
allocatedLength += Align(entry.SerializedLength()) + headerSize;
}

ValidateAllocatedLength(allocatedLength);

epoch.Resume();
if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log");

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);

if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
return false;
}

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
foreach(var entry in entries)
{
var length = entry.SerializedLength();
entry.SerializeTo(new Span<byte>((void *)(headerSize + physicalAddress), length));
SetHeader(length, (byte*)physicalAddress);
physicalAddress += Align(length) + headerSize;
}

epoch.Suspend();
return true;
}

/// <summary>
/// Try to enqueue entry to log (in memory). If it returns true, we are
/// done. If it returns false, we need to retry.
Expand All @@ -333,6 +457,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress)

logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
if (logicalAddress == 0)
if (logicalAddress == 0)
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
Expand Down Expand Up @@ -486,7 +611,6 @@ public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logic
return true;
}


/// <summary>
/// Try to enqueue batch of entries as a single atomic unit (to memory). Entire
/// batch needs to fit on one log page.
Expand Down Expand Up @@ -605,6 +729,80 @@ private static async ValueTask<long> SlowEnqueueAsync(FasterLog @this, IReadOnly

return logicalAddress;
}

/// <summary>
/// Enqueue entry to log in memory (async) - completes after entry is
/// appended to memory, NOT committed to storage.
/// </summary>
/// <param name="entry">Entry to enqueue</param>
/// <param name="token">Cancellation token</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of added entry</returns>
public ValueTask<long> EnqueueAsync<T>(T entry, CancellationToken token = default) where T : IEntry
{
token.ThrowIfCancellationRequested();
if (TryEnqueue(entry, out long logicalAddress))
return new ValueTask<long>(logicalAddress);

return SlowEnqueueAsync(this, entry, token);
}

private static async ValueTask<long> SlowEnqueueAsync<T>(FasterLog @this, T entry, CancellationToken token)
where T : IEntry
{
long logicalAddress;
while (true)
{
var flushEvent = @this.FlushEvent;
if (@this.TryEnqueue(entry, out logicalAddress))
break;
// Wait for *some* flush - failure can be ignored except if the token was signaled (which the caller should handle correctly)
try
{
await flushEvent.WaitAsync(token).ConfigureAwait(false);
}
catch when (!token.IsCancellationRequested) { }
}

return logicalAddress;
}

/// <summary>
/// Enqueue batch of entries to log in memory (async) - completes after entry is
/// appended to memory, NOT committed to storage.
/// </summary>
/// <param name="entries">Entry to enqueue</param>
/// <param name="token">Cancellation token</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of first added entry</returns>
public ValueTask<long> EnqueueAsync<T>(IEnumerable<T> entries, CancellationToken token = default) where T : IEntry
{
token.ThrowIfCancellationRequested();
if (TryEnqueue(entries, out long logicalAddress))
return new ValueTask<long>(logicalAddress);

return SlowEnqueueAsync(this, entries, token);
}

private static async ValueTask<long> SlowEnqueueAsync<T>(FasterLog @this, IEnumerable<T> entry, CancellationToken token)
where T : IEntry
{
long logicalAddress;
while (true)
{
var flushEvent = @this.FlushEvent;
if (@this.TryEnqueue(entry, out logicalAddress))
break;
// Wait for *some* flush - failure can be ignored except if the token was signaled (which the caller should handle correctly)
try
{
await flushEvent.WaitAsync(token).ConfigureAwait(false);
}
catch when (!token.IsCancellationRequested) { }
}

return logicalAddress;
}
#endregion

#region WaitForCommit and WaitForCommitAsync
Expand Down Expand Up @@ -858,6 +1056,38 @@ public long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch)
WaitForCommit(logicalAddress + 1);
return logicalAddress;
}

/// <summary>
/// Append entry to log - spin-waits until entry is committed to storage.
/// Does NOT itself issue flush!
/// </summary>
/// <param name="entry">Entry to be enqueued to log</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of added entry</returns>
public long EnqueueAndWaitForCommit<T>(T entry) where T : IEntry
{
long logicalAddress;
while (!TryEnqueue(entry, out logicalAddress))
Thread.Yield();
WaitForCommit(logicalAddress + 1);
return logicalAddress;
}

/// <summary>
/// Append entry to log - spin-waits until entry is committed to storage.
/// Does NOT itself issue flush!
/// </summary>
/// <param name="entries">Entries to be enqueued to log</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of first added entry</returns>
public long EnqueueAndWaitForCommit<T>(IEnumerable<T> entries) where T : IEntry
{
long logicalAddress;
while (!TryEnqueue(entries, out logicalAddress))
Thread.Yield();
WaitForCommit(logicalAddress + 1);
return logicalAddress;
}

#endregion

Expand Down Expand Up @@ -1018,6 +1248,113 @@ public async ValueTask<long> EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch rea

return logicalAddress;
}

/// <summary>
/// Append entry to log (async) - completes after entry is committed to storage.
/// Does NOT itself issue flush!
/// </summary>
/// <param name="entry">Entry to enqueue</param>
/// <param name="token">Cancellation token</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of added entry</returns>
public async ValueTask<long> EnqueueAndWaitForCommitAsync<T>(T entry, CancellationToken token = default) where T : IEntry
{
token.ThrowIfCancellationRequested();
long logicalAddress;
CompletionEvent flushEvent;
Task<LinkedCommitInfo> commitTask;

// Phase 1: wait for commit to memory
while (true)
{
flushEvent = FlushEvent;
commitTask = CommitTask;
if (TryEnqueue(entry, out logicalAddress))
break;
try
{
await flushEvent.WaitAsync(token).ConfigureAwait(false);
}
catch when (!token.IsCancellationRequested) { }
}

// Phase 2: wait for commit/flush to storage
// Since the task object was read before enqueueing, there is no need for the CommittedUntilAddress >= logicalAddress check like in WaitForCommit
while (true)
{
LinkedCommitInfo linkedCommitInfo;
try
{
linkedCommitInfo = await commitTask.WithCancellationAsync(token).ConfigureAwait(false);
}
catch (CommitFailureException e)
{
linkedCommitInfo = e.LinkedCommitInfo;
if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress)
throw;
}
if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1)
commitTask = linkedCommitInfo.NextTask;
else
break;
}

return logicalAddress;
}

/// <summary>
/// Append batch of entries to log (async) - completes after batch is committed to storage.
/// Does NOT itself issue flush!
/// </summary>
/// <param name="entries"> entries to enqueue</param>
/// <param name="token">Cancellation token</param>
/// <typeparam name="T">type of entry</typeparam>
/// <returns>Logical address of added entry</returns>
public async ValueTask<long> EnqueueAndWaitForCommitAsync<T>(IEnumerable<T> entries,
CancellationToken token = default) where T : IEntry
{
token.ThrowIfCancellationRequested();
long logicalAddress;
CompletionEvent flushEvent;
Task<LinkedCommitInfo> commitTask;

// Phase 1: wait for commit to memory
while (true)
{
flushEvent = FlushEvent;
commitTask = CommitTask;
if (TryEnqueue(entries, out logicalAddress))
break;
try
{
await flushEvent.WaitAsync(token).ConfigureAwait(false);
}
catch when (!token.IsCancellationRequested) { }
}

// Phase 2: wait for commit/flush to storage
// Since the task object was read before enqueueing, there is no need for the CommittedUntilAddress >= logicalAddress check like in WaitForCommit
while (true)
{
LinkedCommitInfo linkedCommitInfo;
try
{
linkedCommitInfo = await commitTask.WithCancellationAsync(token).ConfigureAwait(false);
}
catch (CommitFailureException e)
{
linkedCommitInfo = e.LinkedCommitInfo;
if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress)
throw;
}
if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1)
commitTask = linkedCommitInfo.NextTask;
else
break;
}

return logicalAddress;
}
#endregion

/// <summary>
Expand Down
Loading