diff --git a/cs/samples/FasterLogSample/Program.cs b/cs/samples/FasterLogSample/Program.cs index 12b815982..a00ca4d2b 100644 --- a/cs/samples/FasterLogSample/Program.cs +++ b/cs/samples/FasterLogSample/Program.cs @@ -156,8 +156,7 @@ static void ScanThread() { while (!iter.GetNext(out result, out _, out _)) { - // For finite end address, check if iteration ended - // if (currentAddress >= endAddress) return; + if (iter.Ended) return; iter.WaitAsync().AsTask().GetAwaiter().GetResult(); } diff --git a/cs/src/core/FasterLog/FasterLog.cs b/cs/src/core/FasterLog/FasterLog.cs index ae3e1ac55..475644ffe 100644 --- a/cs/src/core/FasterLog/FasterLog.cs +++ b/cs/src/core/FasterLog/FasterLog.cs @@ -29,6 +29,7 @@ public sealed class FasterLog : IDisposable readonly int headerSize; readonly LogChecksumType logChecksum; readonly WorkQueueLIFO commitQueue; + internal readonly bool readOnlyMode; internal readonly bool fastCommitMode; internal readonly bool tolerateDeviceFailure; @@ -132,7 +133,7 @@ public FasterLog(FasterLogSettings logSettings) new DefaultCheckpointNamingScheme( logSettings.LogCommitDir ?? new FileInfo(logSettings.LogDevice.FileName).Directory.FullName), - logSettings.ReadOnlyMode ? false : logSettings.RemoveOutdatedCommits); + !logSettings.ReadOnlyMode && logSettings.RemoveOutdatedCommits); if (logSettings.LogCommitManager == null) disposeLogCommitManager = true; @@ -217,6 +218,37 @@ public void Dispose() TrueDispose(); } + /// + /// Mark the log as complete. A completed log will no longer allow enqueues, and all currently enqueued items will + /// be immediately committed. + /// + /// whether to spin until log completion becomes committed + public void CompleteLog(bool spinWait = false) + { + + // Ensure all currently started entries will enqueue before we declare log closed + epoch.BumpCurrentEpoch(() => + { + CommitInternal(out _, out _, false, null, long.MaxValue, null); + }); + + // Ensure progress even if there is no thread in epoch table + if (!epoch.ThisInstanceProtected()) + { + epoch.Resume(); + epoch.Suspend(); + } + + if (spinWait) + WaitForCommit(TailAddress, long.MaxValue); + } + + /// + /// Check if the log is complete. A completed log will no longer allow enqueues, and all currently enqueued items will + /// be immediately committed. + /// + public bool LogCompleted => commitNum == long.MaxValue; + internal void TrueDispose() { commitQueue.Dispose(); @@ -285,6 +317,8 @@ public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) epoch.Resume(); + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) { @@ -317,6 +351,8 @@ public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress) epoch.Resume(); + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) { @@ -1463,6 +1499,7 @@ private void RestoreLatest(out Dictionary iterators, out byte[] co cookie = info.Cookie; commitNum = info.CommitNum; beginAddress = allocator.BeginAddress; + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); if (readOnlyMode) allocator.HeadAddress = long.MaxValue; @@ -1641,6 +1678,8 @@ private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long log ValidateAllocatedLength(allocatedLength); epoch.Resume(); + if (commitNum == long.MaxValue) throw new FasterException("Attempting to enqueue into a completed log"); + logicalAddress = allocator.TryAllocateRetryNow(allocatedLength); if (logicalAddress == 0) @@ -1816,7 +1855,7 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool { FastForwardAllowed = fastForwardAllowed, Cookie = cookie, - Callback = callback + Callback = callback, }; info.SnapshotIterators(PersistedIterators); var metadataChanged = ShouldCommmitMetadata(ref info); @@ -1824,11 +1863,20 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool if (fastForwardAllowed && !commitPolicy.AdmitCommit(TailAddress, metadataChanged)) return false; + // This critical section serializes commit record creation / commit content generation and ensures that the + // long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the + // commit code path lock (ongoingCommitRequests) { if (commitCoveredAddress == TailAddress && !metadataChanged) // Nothing to commit if no metadata update and no new entries return false; + if (commitNum == long.MaxValue) + { + // log has been closed, throw an exception + throw new FasterException("log has already been closed"); + } + // Make sure we will not be allowed to back out of a commit if AdmitCommit returns true, as the commit policy // may need to update internal logic for every true response. We might waste some commit nums if commit // policy filters out a lot of commits, but that's fine. @@ -1840,9 +1888,7 @@ private bool CommitInternal(out long commitTail, out long actualCommitNum, bool // Invalid commit num return false; - // This critical section serializes commit record creation / commit content generation and ensures that the - // long address are sorted in outstandingCommitRecords. Ok because we do not expect heavy contention on the - // commit code path + // Normally --- only need commit records if fast committing. if (fastCommitMode) { // Ok to retry in critical section, any concurrently invoked commit would block, but cannot progress diff --git a/cs/src/core/FasterLog/FasterLogIterator.cs b/cs/src/core/FasterLog/FasterLogIterator.cs index 946d2998f..496c8be37 100644 --- a/cs/src/core/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/FasterLog/FasterLogIterator.cs @@ -32,6 +32,12 @@ public sealed class FasterLogScanIterator : ScanIteratorBase, IDisposable /// public long CompletedUntilAddress; + /// + /// Whether iteration has ended, either because we reached the end address of iteration, or because + /// we reached the end of a completed log. + /// + public bool Ended => (currentAddress >= endAddress) || (fasterLog.LogCompleted && currentAddress == fasterLog.TailAddress); + /// /// Constructor /// @@ -75,11 +81,11 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator= endAddress) - yield break; + if (Ended) yield break; if (!await WaitAsync(token).ConfigureAwait(false)) yield break; } + yield return (result, length, currentAddress, nextAddress); } } @@ -98,8 +104,7 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator= endAddress) - yield break; + if (Ended) yield break; if (!await WaitAsync(token).ConfigureAwait(false)) yield break; } @@ -226,9 +231,20 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre epoch.Suspend(); throw; } - - if (isCommitRecord) continue; - + + if (isCommitRecord) + { + FasterLogRecoveryInfo info = new(); + info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength))); + if (info.CommitNum != long.MaxValue) continue; + + // Otherwise, no more entries + entry = default; + entryLength = default; + epoch.Suspend(); + return false; + } + if (getMemory != null) { // Use user delegate to allocate memory @@ -247,7 +263,7 @@ public unsafe bool GetNext(out byte[] entry, out int entryLength, out long curre fixed (byte* bp = entry) Buffer.MemoryCopy((void*) (headerSize + physicalAddress), bp, entryLength, entryLength); - + epoch.Suspend(); return true; } @@ -304,6 +320,7 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry, epoch.Suspend(); return false; } + } catch (Exception) { @@ -311,14 +328,24 @@ public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry, epoch.Suspend(); throw; } - - if (isCommitRecord) continue; - + + if (isCommitRecord) + { + FasterLogRecoveryInfo info = new(); + info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength))); + if (info.CommitNum != long.MaxValue) continue; + + // Otherwise, no more entries + entry = default; + entryLength = default; + epoch.Suspend(); + return false; + } + entry = pool.Rent(entryLength); fixed (byte* bp = &entry.Memory.Span.GetPinnableReference()) Buffer.MemoryCopy((void*)(headerSize + physicalAddress), bp, entryLength, entryLength); - epoch.Suspend(); return true; } @@ -368,9 +395,20 @@ public unsafe bool UnsafeGetNext(out byte* entry, out int entryLength, out long epoch.Suspend(); throw; } - - if (isCommitRecord) continue; - + + if (isCommitRecord) + { + FasterLogRecoveryInfo info = new(); + info.Initialize(new BinaryReader(new UnmanagedMemoryStream((byte *)physicalAddress, entryLength))); + if (info.CommitNum != long.MaxValue) continue; + + // Otherwise, no more entries + entry = default; + entryLength = default; + epoch.Suspend(); + return false; + } + entry = (byte*)(headerSize + physicalAddress); return true; } @@ -467,7 +505,7 @@ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private int Align(int length) + private static int Align(int length) { return (length + 3) & ~3; } @@ -619,7 +657,6 @@ private unsafe bool GetNextInternal(out long physicalAddress, out int entryLengt { if (!fasterLog.VerifyChecksum((byte*)physicalAddress, entryLength)) { - var curPage = currentAddress >> allocator.LogPageSizeBits; currentAddress += headerSize; if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _)) { diff --git a/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs b/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs index 38674bce4..fd043a2fe 100644 --- a/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs +++ b/cs/src/core/FasterLog/FasterLogRecoveryInfo.cs @@ -18,7 +18,7 @@ public struct FasterLogRecoveryInfo /// FasterLog recovery version /// const int FasterLogRecoveryVersion = 1; - + /// /// Begin address /// diff --git a/cs/test/DeviceFasterLogTests.cs b/cs/test/DeviceFasterLogTests.cs index 93188e179..4d294034b 100644 --- a/cs/test/DeviceFasterLogTests.cs +++ b/cs/test/DeviceFasterLogTests.cs @@ -107,12 +107,13 @@ private async ValueTask FasterLogTest1(LogChecksumType logChecksum, IDevice devi { log.Enqueue(entry); } - log.Commit(true); + + log.CompleteLog(true); // MoveNextAsync() would hang at TailAddress, waiting for more entries (that we don't add). // Note: If this happens and the test has to be canceled, there may be a leftover blob from the log.Commit(), because // the log device isn't Dispose()d; the symptom is currently a numeric string format error in DefaultCheckpointNamingScheme. - using (var iter = log.Scan(0, log.TailAddress)) + using (var iter = log.Scan(0, long.MaxValue)) { var counter = new FasterLogTestBase.Counter(log); diff --git a/cs/test/FasterLogResumeTests.cs b/cs/test/FasterLogResumeTests.cs index efb1fc3a2..94a6bf787 100644 --- a/cs/test/FasterLogResumeTests.cs +++ b/cs/test/FasterLogResumeTests.cs @@ -112,7 +112,6 @@ public async Task FasterLogResumePersistedReader2([Values] LogChecksumType logCh using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(path), removeOutdated)) { - long originalCompleted; using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))