diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index d3b693ca8..191ff1e27 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -241,7 +241,7 @@ public void CompleteLog(bool spinWait = false) // Ensure all currently started entries will enqueue before we declare log closed epoch.BumpCurrentEpoch(() => { - CommitInternal(out _, out _, false, null, long.MaxValue, null); + CommitInternal(out _, out _, false, Array.Empty(), long.MaxValue, null); }); // Ensure progress even if there is no thread in epoch table @@ -310,9 +310,117 @@ public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch) Thread.Yield(); return logicalAddress; } + + /// + /// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit + /// + /// Entry to be enqueued to log + /// type of entry + /// Logical address of added entry + public long Enqueue(T entry) where T : ILogEnqueueEntry + { + long logicalAddress; + while (!TryEnqueue(entry, out logicalAddress)) + Thread.Yield(); + return logicalAddress; + } + + /// + /// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit + /// + /// Batch of entries to be enqueued to log + /// type of entry + /// Logical address of added entry + public long Enqueue(IEnumerable entries) where T : ILogEnqueueEntry + { + long logicalAddress; + while (!TryEnqueue(entries, out logicalAddress)) + Thread.Yield(); + return logicalAddress; + } #endregion #region TryEnqueue + /// + /// Try to enqueue entry to log (in memory). If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// Entry to be enqueued to log + /// Logical address of added entry + /// type of entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(T entry, out long logicalAddress) where T : ILogEnqueueEntry + { + logicalAddress = 0; + var length = entry.SerializedLength; + int allocatedLength = headerSize + Align(length); + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + if (logicalAddress == 0) + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + entry.SerializeTo(new Span((void *) (headerSize + physicalAddress), length)); + SetHeader(length, (byte*)physicalAddress); + epoch.Suspend(); + return true; + } + + /// + /// Try to enqueue batch of entries as a single atomic unit (to memory). Entire + /// batch needs to fit on one log page. + /// + /// Batch to be appended to log + /// Logical address of first added entry + /// type of entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(IEnumerable entries, out long logicalAddress) where T : ILogEnqueueEntry + { + logicalAddress = 0; + + var allocatedLength = 0; + foreach (var entry in entries) + { + allocatedLength += Align(entry.SerializedLength) + headerSize; + } + + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + foreach(var entry in entries) + { + var length = entry.SerializedLength; + entry.SerializeTo(new Span((void *)(headerSize + physicalAddress), length)); + SetHeader(length, (byte*)physicalAddress); + physicalAddress += Align(length) + headerSize; + } + + epoch.Suspend(); + return true; + } + /// /// Try to enqueue entry to log (in memory). If it returns true, we are /// done. If it returns false, we need to retry. @@ -333,6 +441,7 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) + if (logicalAddress == 0) { epoch.Suspend(); if (cannedException != null) throw cannedException; @@ -419,6 +528,39 @@ public unsafe bool TryEnqueue(THeader userHeader, ref SpanByte item1, r return true; } + /// + /// Try to append a user-defined header byte and a SpanByte entry atomically to the log. If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// + /// + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(byte userHeader, ref SpanByte item, out long logicalAddress) + { + logicalAddress = 0; + var length = sizeof(byte) + item.TotalSize; + int allocatedLength = headerSize + Align(length); + ValidateAllocatedLength(allocatedLength); + + epoch.Resume(); + + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); + if (logicalAddress == 0) + { + epoch.Suspend(); + if (cannedException != null) throw cannedException; + return false; + } + + var physicalAddress = (byte*)allocator.GetPhysicalAddress(logicalAddress); + *physicalAddress = userHeader; + item.CopyTo(physicalAddress + sizeof(byte)); + SetHeader(length, physicalAddress); + epoch.Suspend(); + return true; + } + /// /// Try to enqueue batch of entries as a single atomic unit (to memory). Entire /// batch needs to fit on one log page. @@ -537,6 +679,80 @@ private static async ValueTask SlowEnqueueAsync(FasterLog @this, IReadOnly return logicalAddress; } + + /// + /// Enqueue entry to log in memory (async) - completes after entry is + /// appended to memory, NOT committed to storage. + /// + /// Entry to enqueue + /// Cancellation token + /// type of entry + /// Logical address of added entry + public ValueTask EnqueueAsync(T entry, CancellationToken token = default) where T : ILogEnqueueEntry + { + token.ThrowIfCancellationRequested(); + if (TryEnqueue(entry, out long logicalAddress)) + return new ValueTask(logicalAddress); + + return SlowEnqueueAsync(this, entry, token); + } + + private static async ValueTask SlowEnqueueAsync(FasterLog @this, T entry, CancellationToken token) + where T : ILogEnqueueEntry + { + long logicalAddress; + while (true) + { + var flushEvent = @this.FlushEvent; + if (@this.TryEnqueue(entry, out logicalAddress)) + break; + // Wait for *some* flush - failure can be ignored except if the token was signaled (which the caller should handle correctly) + try + { + await flushEvent.WaitAsync(token).ConfigureAwait(false); + } + catch when (!token.IsCancellationRequested) { } + } + + return logicalAddress; + } + + /// + /// Enqueue batch of entries to log in memory (async) - completes after entry is + /// appended to memory, NOT committed to storage. + /// + /// Entry to enqueue + /// Cancellation token + /// type of entry + /// Logical address of first added entry + public ValueTask EnqueueAsync(IEnumerable entries, CancellationToken token = default) where T : ILogEnqueueEntry + { + token.ThrowIfCancellationRequested(); + if (TryEnqueue(entries, out long logicalAddress)) + return new ValueTask(logicalAddress); + + return SlowEnqueueAsync(this, entries, token); + } + + private static async ValueTask SlowEnqueueAsync(FasterLog @this, IEnumerable entry, CancellationToken token) + where T : ILogEnqueueEntry + { + long logicalAddress; + while (true) + { + var flushEvent = @this.FlushEvent; + if (@this.TryEnqueue(entry, out logicalAddress)) + break; + // Wait for *some* flush - failure can be ignored except if the token was signaled (which the caller should handle correctly) + try + { + await flushEvent.WaitAsync(token).ConfigureAwait(false); + } + catch when (!token.IsCancellationRequested) { } + } + + return logicalAddress; + } #endregion #region WaitForCommit and WaitForCommitAsync @@ -790,6 +1006,38 @@ public long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch) WaitForCommit(logicalAddress + 1); return logicalAddress; } + + /// + /// Append entry to log - spin-waits until entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// Entry to be enqueued to log + /// type of entry + /// Logical address of added entry + public long EnqueueAndWaitForCommit(T entry) where T : ILogEnqueueEntry + { + long logicalAddress; + while (!TryEnqueue(entry, out logicalAddress)) + Thread.Yield(); + WaitForCommit(logicalAddress + 1); + return logicalAddress; + } + + /// + /// Append entry to log - spin-waits until entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// Entries to be enqueued to log + /// type of entry + /// Logical address of first added entry + public long EnqueueAndWaitForCommit(IEnumerable entries) where T : ILogEnqueueEntry + { + long logicalAddress; + while (!TryEnqueue(entries, out logicalAddress)) + Thread.Yield(); + WaitForCommit(logicalAddress + 1); + return logicalAddress; + } #endregion @@ -950,6 +1198,113 @@ public async ValueTask EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch rea return logicalAddress; } + + /// + /// Append entry to log (async) - completes after entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// Entry to enqueue + /// Cancellation token + /// type of entry + /// Logical address of added entry + public async ValueTask EnqueueAndWaitForCommitAsync(T entry, CancellationToken token = default) where T : ILogEnqueueEntry + { + token.ThrowIfCancellationRequested(); + long logicalAddress; + CompletionEvent flushEvent; + Task commitTask; + + // Phase 1: wait for commit to memory + while (true) + { + flushEvent = FlushEvent; + commitTask = CommitTask; + if (TryEnqueue(entry, out logicalAddress)) + break; + try + { + await flushEvent.WaitAsync(token).ConfigureAwait(false); + } + catch when (!token.IsCancellationRequested) { } + } + + // Phase 2: wait for commit/flush to storage + // Since the task object was read before enqueueing, there is no need for the CommittedUntilAddress >= logicalAddress check like in WaitForCommit + while (true) + { + LinkedCommitInfo linkedCommitInfo; + try + { + linkedCommitInfo = await commitTask.WithCancellationAsync(token).ConfigureAwait(false); + } + catch (CommitFailureException e) + { + linkedCommitInfo = e.LinkedCommitInfo; + if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress) + throw; + } + if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1) + commitTask = linkedCommitInfo.NextTask; + else + break; + } + + return logicalAddress; + } + + /// + /// Append batch of entries to log (async) - completes after batch is committed to storage. + /// Does NOT itself issue flush! + /// + /// entries to enqueue + /// Cancellation token + /// type of entry + /// Logical address of added entry + public async ValueTask EnqueueAndWaitForCommitAsync(IEnumerable entries, + CancellationToken token = default) where T : ILogEnqueueEntry + { + token.ThrowIfCancellationRequested(); + long logicalAddress; + CompletionEvent flushEvent; + Task commitTask; + + // Phase 1: wait for commit to memory + while (true) + { + flushEvent = FlushEvent; + commitTask = CommitTask; + if (TryEnqueue(entries, out logicalAddress)) + break; + try + { + await flushEvent.WaitAsync(token).ConfigureAwait(false); + } + catch when (!token.IsCancellationRequested) { } + } + + // Phase 2: wait for commit/flush to storage + // Since the task object was read before enqueueing, there is no need for the CommittedUntilAddress >= logicalAddress check like in WaitForCommit + while (true) + { + LinkedCommitInfo linkedCommitInfo; + try + { + linkedCommitInfo = await commitTask.WithCancellationAsync(token).ConfigureAwait(false); + } + catch (CommitFailureException e) + { + linkedCommitInfo = e.LinkedCommitInfo; + if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress) + throw; + } + if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1) + commitTask = linkedCommitInfo.NextTask; + else + break; + } + + return logicalAddress; + } #endregion /// diff --git a/cs/src/core/FasterLog/FasterLogIterator.cs b/cs/src/core/FasterLog/FasterLogIterator.cs index 7af79bb73..789503b90 100644 --- a/cs/src/core/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/FasterLog/FasterLogIterator.cs @@ -81,7 +81,6 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator + /// Asynchronously consume the log with given consumer until end of iteration or cancelled + /// + /// consumer + /// cancellation token + /// consumer type + public async Task ConsumeAllAsync(T consumer, CancellationToken token = default) where T : ILogEntryConsumer + { + while (!disposed) + { + while (!TryConsumeNext(consumer)) + { + if (!await WaitAsync(token).ConfigureAwait(false)) return; + } + } + } + /// /// Wait for iteration to be ready to continue /// @@ -144,6 +159,7 @@ private static async ValueTask SlowWaitAsync(FasterLogScanIterator @this, { if (@this.disposed) return false; + if (@this.Ended) return false; var commitTask = @this.fasterLog.CommitTask; if (@this.NextAddress < @this.fasterLog.CommittedUntilAddress) return true; @@ -163,6 +179,7 @@ private static async ValueTask SlowWaitUncommittedAsync(FasterLogScanItera { if (@this.disposed) return false; + if (@this.Ended) return false; var refreshUncommittedTask = @this.fasterLog.RefreshUncommittedTask; if (@this.NextAddress < @this.fasterLog.SafeTailAddress) return true; @@ -351,6 +368,63 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry, } } + + /// + /// Consume the next entry in the log with the given consumer + /// + /// consumer + /// concrete type of consumer + /// whether a next entry is present + public unsafe bool TryConsumeNext(T consumer) where T : ILogEntryConsumer + { + if (disposed) + { + currentAddress = default; + nextAddress = default; + return false; + } + + epoch.Resume(); + // Continue looping until we find a record that is not a commit record + while (true) + { + long physicalAddress; + bool isCommitRecord; + int entryLength; + try + { + var hasNext = GetNextInternal(out physicalAddress, out entryLength, out currentAddress, + out nextAddress, + out isCommitRecord); + if (!hasNext) + { + epoch.Suspend(); + return false; + } + } + catch (Exception) + { + // Throw upwards, but first, suspend the epoch we are in + epoch.Suspend(); + throw; + } + + if (isCommitRecord) + { + FasterLogRecoveryInfo info = new(); + info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength))); + if (info.CommitNum != long.MaxValue) continue; + + // Otherwise, no more entries + epoch.Suspend(); + return false; + } + consumer.Consume(new ReadOnlySpan((void*)(headerSize + physicalAddress), entryLength), currentAddress, nextAddress); + epoch.Suspend(); + return true; + } + } + /// /// WARNING: advanced users only. /// Get next record in iterator, accessing unsafe raw bytes and retaining epoch protection. diff --git a/cs/src/core/FasterLog/ILogEnqueueEntry.cs b/cs/src/core/FasterLog/ILogEnqueueEntry.cs new file mode 100644 index 000000000..c2f21000b --- /dev/null +++ b/cs/src/core/FasterLog/ILogEnqueueEntry.cs @@ -0,0 +1,20 @@ +using System; + +namespace FASTER.core +{ + /// + /// Represents a entry that can be serialized directly onto FasterLog when enqueuing + /// + public interface ILogEnqueueEntry + { + /// + /// the size in bytes after serialization onto FasterLog + public int SerializedLength { get; } + + /// + /// Serialize the entry onto FasterLog. + /// + /// Memory buffer of FasterLog to serialize onto. Guaranteed to have at least SerializedLength() many bytes + public void SerializeTo(Span dest); + } +} \ No newline at end of file diff --git a/cs/src/core/FasterLog/ILogEntryConsumer.cs b/cs/src/core/FasterLog/ILogEntryConsumer.cs new file mode 100644 index 000000000..cbabc2ceb --- /dev/null +++ b/cs/src/core/FasterLog/ILogEntryConsumer.cs @@ -0,0 +1,18 @@ +using System; + +namespace FASTER.core +{ + /// + /// Consumes a FasterLog entry without copying + /// + public interface ILogEntryConsumer + { + /// + /// Consumes the given entry. + /// + /// the entry to consume + /// address of the consumed entry + /// (predicted) address of the next entry + public void Consume(ReadOnlySpan entry, long currentAddress, long nextAddress); + } +} \ No newline at end of file diff --git a/cs/test/EnqueueTests.cs b/cs/test/EnqueueTests.cs index fe770d158..761e420de 100644 --- a/cs/test/EnqueueTests.cs +++ b/cs/test/EnqueueTests.cs @@ -22,7 +22,18 @@ public enum EnqueueIteratorType { Byte, SpanBatch, - SpanByte + SpanByte, + IEntry + } + + private class ByteArrayEnqueueEntry : ILogEnqueueEntry + { + public int SerializedLength => entry.Length; + + public void SerializeTo(Span dest) + { + entry.CopyTo(dest); + } } private struct ReadOnlySpanBatch : IReadOnlySpanBatch @@ -83,7 +94,7 @@ public void EnqueueBasicTest([Values] EnqueueIteratorType iteratorType, [Values] } ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(numEntries); - + var ientry = new ByteArrayEnqueueEntry(); // Enqueue but set each Entry in a way that can differentiate between entries for (int i = 0; i < numEntries; i++) { @@ -110,6 +121,9 @@ public void EnqueueBasicTest([Values] EnqueueIteratorType iteratorType, [Values] case EnqueueIteratorType.SpanBatch: log.Enqueue(spanBatch); break; + case EnqueueIteratorType.IEntry: + log.Enqueue(ientry); + break; default: Assert.Fail("Unknown EnqueueIteratorType"); break; @@ -154,7 +168,7 @@ public void EnqueueBasicTest([Values] EnqueueIteratorType iteratorType, [Values] public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType) { - const int expectedEntryCount = 10; + const int expectedEntryCount = 11; string filename = path + "EnqueueAsyncBasic" + deviceType.ToString() + ".log"; device = TestUtils.CreateTestDevice(deviceType, filename); @@ -167,6 +181,7 @@ public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType CancellationToken cancellationToken = default; ReadOnlyMemory readOnlyMemoryEntry = entry; ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(5); + var ientry = new ByteArrayEnqueueEntry(); var input1 = new byte[] { 0, 1, 2, 3 }; var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 }; @@ -177,6 +192,7 @@ public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType await log.EnqueueAsync(input2); await log.EnqueueAsync(input3); await log.EnqueueAsync(readOnlyMemoryEntry); + await log.EnqueueAsync(ientry); await log.EnqueueAsync(spanBatch); await log.CommitAsync(); @@ -206,6 +222,9 @@ public async Task EnqueueAsyncBasicTest([Values] TestUtils.DeviceType deviceType case 5: Assert.IsTrue(result.SequenceEqual(entry), "Fail - Result does not equal SpanBatchEntry. result[0]=" + result[0].ToString() + " result[1]=" + result[1].ToString()); break; + case 6: + Assert.IsTrue(result.SequenceEqual(entry), "Fail - Result does not equal SpanBatchEntry. result[0]=" + result[0].ToString() + " result[1]=" + result[1].ToString()); + break; } currentEntry++; diff --git a/cs/test/FasterLogScanTests.cs b/cs/test/FasterLogScanTests.cs index 1a2d82dab..038366919 100644 --- a/cs/test/FasterLogScanTests.cs +++ b/cs/test/FasterLogScanTests.cs @@ -1,5 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. + +using System; using FASTER.core; using NUnit.Framework; @@ -129,7 +131,45 @@ public void ScanBasicDefaultTest([Values] TestUtils.DeviceType deviceType) // Make sure expected length is same as current - also makes sure that data verification was not skipped Assert.AreEqual(entryLength, currentEntry); + } + internal class TestConsumer : ILogEntryConsumer + { + internal int currentEntry = 0; + + public void Consume(ReadOnlySpan entry, long currentAddress, long nextAddress) + { + if (currentEntry < entryLength) + { + // Span Batch only added first entry several times so have separate verification + Assert.AreEqual((byte)entryFlag, entry[currentEntry]); + currentEntry++; + } + } + } + [Test] + [Category("FasterLog")] + [Category("Smoke")] + public void ScanConsumerTest([Values] TestUtils.DeviceType deviceType) + { + // Create log and device here (not in setup) because using DeviceType Enum which can't be used in Setup + string filename = path + "LogScanDefault" + deviceType.ToString() + ".log"; + device = TestUtils.CreateTestDevice(deviceType, filename); + log = new FasterLog(new FasterLogSettings { LogDevice = device, SegmentSizeBits = 22, LogCommitDir = path }); + PopulateLog(log); + + // Basic default scan from start to end + // Indirectly used in other tests, but good to have the basic test here for completeness + + // Read the log - Look for the flag so know each entry is unique + var consumer = new TestConsumer(); + using (var iter = log.Scan(0, 100_000_000)) + { + while (iter.TryConsumeNext(consumer)) {} + } + + // Make sure expected length is same as current - also makes sure that data verification was not skipped + Assert.AreEqual(entryLength, consumer.currentEntry); } [Test] diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs index 60ba8a154..9b6c0d67b 100644 --- a/cs/test/FasterLogTests.cs +++ b/cs/test/FasterLogTests.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using FASTER.core; +using Microsoft.VisualStudio.TestPlatform.ObjectModel; using NUnit.Framework; namespace FASTER.test @@ -119,7 +120,7 @@ public enum IteratorType { AsyncByteVector, AsyncMemoryOwner, - Sync + Sync, } internal static bool IsAsync(IteratorType iterType) => iterType == IteratorType.AsyncByteVector || iterType == IteratorType.AsyncMemoryOwner; @@ -235,6 +236,79 @@ public async ValueTask FasterLogTest1([Values] LogChecksumType logChecksum, [Val } Assert.AreEqual(numEntries, counter.count); } + + + internal class TestConsumer : ILogEntryConsumer + { + private Counter counter; + private byte[] entry; + + internal TestConsumer(Counter counter, byte[] entry) + { + this.counter = counter; + this.entry = entry; + } + + public void Consume(ReadOnlySpan result, long currentAddress, long nextAddress) + { + Assert.IsTrue(result.SequenceEqual(entry)); + counter.IncrementAndMaybeTruncateUntil(nextAddress); + + } + } + + [Test] + [Category("FasterLog")] + public async ValueTask FasterLogConsumerTest([Values] LogChecksumType logChecksum) + { + device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true); + var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false }; + log = new FasterLog(logSettings); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + for (int i = 0; i < numEntries; i++) + { + log.Enqueue(entry); + } + log.Commit(true); + + using var iter = log.Scan(0, long.MaxValue); + var counter = new Counter(log); + var consumer = new TestConsumer(counter, entry); + + while (iter.TryConsumeNext(consumer)) {} + + Assert.AreEqual(numEntries, counter.count); + } + + [Test] + [Category("FasterLog")] + public async ValueTask FasterLogAsyncConsumerTest([Values] LogChecksumType logChecksum) + { + device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true); + var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false }; + log = await FasterLog.CreateAsync(logSettings); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + for (int i = 0; i < numEntries; i++) + { + log.Enqueue(entry); + } + log.Commit(true); + log.CompleteLog(true); + + using var iter = log.Scan(0, long.MaxValue); + var counter = new Counter(log); + var consumer = new TestConsumer(counter, entry); + await iter.ConsumeAllAsync(consumer); + Assert.AreEqual(numEntries, counter.count); + } }