Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] IEntry interface for FasterLog #639

Merged
merged 8 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
357 changes: 356 additions & 1 deletion cs/src/core/FasterLog/FasterLog.cs

Large diffs are not rendered by default.

78 changes: 76 additions & 2 deletions cs/src/core/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
long nextAddress;
while (!GetNext(out result, out length, out currentAddress, out nextAddress))
{
if (Ended) yield break;
if (!await WaitAsync(token).ConfigureAwait(false))
yield break;
}
Expand All @@ -104,14 +103,30 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
long nextAddress;
while (!GetNext(pool, out result, out length, out currentAddress, out nextAddress))
{
if (Ended) yield break;
if (!await WaitAsync(token).ConfigureAwait(false))
yield break;
}
yield return (result, length, currentAddress, nextAddress);
}
}

/// <summary>
/// Asynchronously consume the log with given consumer until end of iteration or cancelled
/// </summary>
/// <param name="consumer"> consumer </param>
/// <param name="token"> cancellation token </param>
/// <typeparam name="T"> consumer type </typeparam>
public async Task ConsumeAllAsync<T>(T consumer, CancellationToken token = default) where T : ILogEntryConsumer
{
while (!disposed)
{
while (!TryConsumeNext(consumer))
{
if (!await WaitAsync(token).ConfigureAwait(false)) return;
}
}
}

/// <summary>
/// Wait for iteration to be ready to continue
/// </summary>
Expand Down Expand Up @@ -144,6 +159,7 @@ private static async ValueTask<bool> SlowWaitAsync(FasterLogScanIterator @this,
{
if (@this.disposed)
return false;
if (@this.Ended) return false;
var commitTask = @this.fasterLog.CommitTask;
if (@this.NextAddress < @this.fasterLog.CommittedUntilAddress)
return true;
Expand All @@ -163,6 +179,7 @@ private static async ValueTask<bool> SlowWaitUncommittedAsync(FasterLogScanItera
{
if (@this.disposed)
return false;
if (@this.Ended) return false;
var refreshUncommittedTask = @this.fasterLog.RefreshUncommittedTask;
if (@this.NextAddress < @this.fasterLog.SafeTailAddress)
return true;
Expand Down Expand Up @@ -351,6 +368,63 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
}
}


/// <summary>
/// Consume the next entry in the log with the given consumer
/// </summary>
/// <param name="consumer">consumer</param>
/// <typeparam name="T">concrete type of consumer</typeparam>
/// <returns>whether a next entry is present</returns>
public unsafe bool TryConsumeNext<T>(T consumer) where T : ILogEntryConsumer
{
if (disposed)
{
currentAddress = default;
nextAddress = default;
return false;
}

epoch.Resume();
// Continue looping until we find a record that is not a commit record
while (true)
{
long physicalAddress;
bool isCommitRecord;
int entryLength;
try
{
var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress,
out nextAddress,
out isCommitRecord);
if (!hasNext)
{
epoch.Suspend();
return false;
}
}
catch (Exception)
{
// Throw upwards, but first, suspend the epoch we are in
epoch.Suspend();
throw;
}

if (isCommitRecord)
{
FasterLogRecoveryInfo info = new();
info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength)));
if (info.CommitNum != long.MaxValue) continue;

// Otherwise, no more entries
epoch.Suspend();
return false;
}
consumer.Consume(new ReadOnlySpan<byte>((void*)(headerSize + physicalAddress), entryLength), currentAddress, nextAddress);
epoch.Suspend();
return true;
}
}

/// <summary>
/// WARNING: advanced users only.
/// Get next record in iterator, accessing unsafe raw bytes and retaining epoch protection.
Expand Down
20 changes: 20 additions & 0 deletions cs/src/core/FasterLog/ILogEnqueueEntry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

namespace FASTER.core
{
/// <summary>
/// Represents a entry that can be serialized directly onto FasterLog when enqueuing
/// </summary>
public interface ILogEnqueueEntry
{
/// <summary></summary>
/// <returns> the size in bytes after serialization onto FasterLog</returns>
public int SerializedLength { get; }

/// <summary>
/// Serialize the entry onto FasterLog.
/// </summary>
/// <param name="dest">Memory buffer of FasterLog to serialize onto. Guaranteed to have at least SerializedLength() many bytes</param>
public void SerializeTo(Span<byte> dest);
}
}
18 changes: 18 additions & 0 deletions cs/src/core/FasterLog/ILogEntryConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace FASTER.core
{
/// <summary>
/// Consumes a FasterLog entry without copying
/// </summary>
public interface ILogEntryConsumer
{
/// <summary>
/// Consumes the given entry.
/// </summary>
/// <param name="entry"> the entry to consume </param>
/// <param name="currentAddress"> address of the consumed entry </param>
/// <param name="nextAddress"> (predicted) address of the next entry </param>
public void Consume(ReadOnlySpan<byte> entry, long currentAddress, long nextAddress);
}
}
25 changes: 22 additions & 3 deletions cs/test/EnqueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,18 @@ public enum EnqueueIteratorType
{
Byte,
SpanBatch,
SpanByte
SpanByte,
IEntry
}

private class ByteArrayEnqueueEntry : ILogEnqueueEntry
{
public int SerializedLength => entry.Length;

public void SerializeTo(Span<byte> dest)
{
entry.CopyTo(dest);
}
}

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
Expand Down Expand Up @@ -83,7 +94,7 @@ public void EnqueueBasicTest([Values] EnqueueIteratorType iteratorType, [Values]
}

ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(numEntries);

var ientry = new ByteArrayEnqueueEntry();
// Enqueue but set each Entry in a way that can differentiate between entries
for (int i = 0; i < numEntries; i++)
{
Expand All @@ -110,6 +121,9 @@ public void EnqueueBasicTest([Values] EnqueueIteratorType iteratorType, [Values]
case EnqueueIteratorType.SpanBatch:
log.Enqueue(spanBatch);
break;
case EnqueueIteratorType.IEntry:
log.Enqueue(ientry);
break;
default:
Assert.Fail("Unknown EnqueueIteratorType");
break;
Expand Down Expand Up @@ -154,7 +168,7 @@ public void EnqueueBasicTest([Values] EnqueueIteratorType iteratorType, [Values]
public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType)
{

const int expectedEntryCount = 10;
const int expectedEntryCount = 11;

string filename = path + "EnqueueAsyncBasic" + deviceType.ToString() + ".log";
device = TestUtils.CreateTestDevice(deviceType, filename);
Expand All @@ -167,6 +181,7 @@ public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType
CancellationToken cancellationToken = default;
ReadOnlyMemory<byte> readOnlyMemoryEntry = entry;
ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(5);
var ientry = new ByteArrayEnqueueEntry();

var input1 = new byte[] { 0, 1, 2, 3 };
var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
Expand All @@ -177,6 +192,7 @@ public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType
await log.EnqueueAsync(input2);
await log.EnqueueAsync(input3);
await log.EnqueueAsync(readOnlyMemoryEntry);
await log.EnqueueAsync(ientry);
await log.EnqueueAsync(spanBatch);
await log.CommitAsync();

Expand Down Expand Up @@ -206,6 +222,9 @@ public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType
case 5:
Assert.IsTrue(result.SequenceEqual(entry), "Fail - Result does not equal SpanBatchEntry. result[0]=" + result[0].ToString() + " result[1]=" + result[1].ToString());
break;
case 6:
Assert.IsTrue(result.SequenceEqual(entry), "Fail - Result does not equal SpanBatchEntry. result[0]=" + result[0].ToString() + " result[1]=" + result[1].ToString());
break;

}
currentEntry++;
Expand Down
40 changes: 40 additions & 0 deletions cs/test/FasterLogScanTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using FASTER.core;
using NUnit.Framework;

Expand Down Expand Up @@ -129,7 +131,45 @@ public void ScanBasicDefaultTest([Values] TestUtils.DeviceType deviceType)

// Make sure expected length is same as current - also makes sure that data verification was not skipped
Assert.AreEqual(entryLength, currentEntry);
}

internal class TestConsumer : ILogEntryConsumer
{
internal int currentEntry = 0;

public void Consume(ReadOnlySpan<byte> entry, long currentAddress, long nextAddress)
{
if (currentEntry < entryLength)
{
// Span Batch only added first entry several times so have separate verification
Assert.AreEqual((byte)entryFlag, entry[currentEntry]);
currentEntry++;
}
}
}
[Test]
[Category("FasterLog")]
[Category("Smoke")]
public void ScanConsumerTest([Values] TestUtils.DeviceType deviceType)
{
// Create log and device here (not in setup) because using DeviceType Enum which can't be used in Setup
string filename = path + "LogScanDefault" + deviceType.ToString() + ".log";
device = TestUtils.CreateTestDevice(deviceType, filename);
log = new FasterLog(new FasterLogSettings { LogDevice = device, SegmentSizeBits = 22, LogCommitDir = path });
PopulateLog(log);

// Basic default scan from start to end
// Indirectly used in other tests, but good to have the basic test here for completeness

// Read the log - Look for the flag so know each entry is unique
var consumer = new TestConsumer();
using (var iter = log.Scan(0, 100_000_000))
{
while (iter.TryConsumeNext(consumer)) {}
}

// Make sure expected length is same as current - also makes sure that data verification was not skipped
Assert.AreEqual(entryLength, consumer.currentEntry);
}

[Test]
Expand Down
76 changes: 75 additions & 1 deletion cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using NUnit.Framework;

namespace FASTER.test
Expand Down Expand Up @@ -119,7 +120,7 @@ public enum IteratorType
{
AsyncByteVector,
AsyncMemoryOwner,
Sync
Sync,
}

internal static bool IsAsync(IteratorType iterType) => iterType == IteratorType.AsyncByteVector || iterType == IteratorType.AsyncMemoryOwner;
Expand Down Expand Up @@ -235,6 +236,79 @@ public async ValueTask FasterLogTest1([Values] LogChecksumType logChecksum, [Val
}
Assert.AreEqual(numEntries, counter.count);
}


internal class TestConsumer : ILogEntryConsumer
{
private Counter counter;
private byte[] entry;

internal TestConsumer(Counter counter, byte[] entry)
{
this.counter = counter;
this.entry = entry;
}

public void Consume(ReadOnlySpan<byte> result, long currentAddress, long nextAddress)
{
Assert.IsTrue(result.SequenceEqual(entry));
counter.IncrementAndMaybeTruncateUntil(nextAddress);

}
}

[Test]
[Category("FasterLog")]
public async ValueTask FasterLogConsumerTest([Values] LogChecksumType logChecksum)
{
device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true);
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
log = new FasterLog(logSettings);

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
entry[i] = (byte)i;

for (int i = 0; i < numEntries; i++)
{
log.Enqueue(entry);
}
log.Commit(true);

using var iter = log.Scan(0, long.MaxValue);
var counter = new Counter(log);
var consumer = new TestConsumer(counter, entry);

while (iter.TryConsumeNext(consumer)) {}

Assert.AreEqual(numEntries, counter.count);
}

[Test]
[Category("FasterLog")]
public async ValueTask FasterLogAsyncConsumerTest([Values] LogChecksumType logChecksum)
{
device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true);
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
log = await FasterLog.CreateAsync(logSettings);

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
entry[i] = (byte)i;

for (int i = 0; i < numEntries; i++)
{
log.Enqueue(entry);
}
log.Commit(true);
log.CompleteLog(true);

using var iter = log.Scan(0, long.MaxValue);
var counter = new Counter(log);
var consumer = new TestConsumer(counter, entry);
await iter.ConsumeAllAsync(consumer);
Assert.AreEqual(numEntries, counter.count);
}
}


Expand Down