diff --git a/cs/samples/FasterLogPubSub/Program.cs b/cs/samples/FasterLogPubSub/Program.cs index b2bf3b480..29afae23f 100644 --- a/cs/samples/FasterLogPubSub/Program.cs +++ b/cs/samples/FasterLogPubSub/Program.cs @@ -12,10 +12,12 @@ namespace FasterLogPubSub { class Program { + const int commitPeriodMs = 2000; + const int restorePeriodMs = 1000; + static string path = Path.GetTempPath() + "FasterLogPubSub\\"; + static async Task Main() { - var path = Path.GetTempPath() + "FasterLogPubSub\\"; - var device = Devices.CreateLogDevice(path + "mylog"); var log = new FasterLog(new FasterLogSettings { LogDevice = device, MemorySizeBits = 11, PageSizeBits = 9, MutableFraction = 0.5, SegmentSizeBits = 9 }); @@ -23,9 +25,14 @@ static async Task Main() using var cts = new CancellationTokenSource(); var producer = ProducerAsync(log, cts.Token); - var commiter = CommiterAsync(log, cts.Token); - var consumer = ConsumerAsync(log, cts.Token); + var commiter = CommitterAsync(log, cts.Token); + + // Consumer on SAME FasterLog instance + var consumer = ConsumerAsync(log, true, cts.Token); + // Uncomment below to run consumer on SEPARATE read-only FasterLog instance + // var consumer = SeparateConsumerAsync(cts.Token); + Console.CancelKeyPress += (o, eventArgs) => { Console.WriteLine("Cancelling program..."); @@ -43,11 +50,11 @@ static async Task Main() try { new DirectoryInfo(path).Delete(true); } catch { } } - static async Task CommiterAsync(FasterLog log, CancellationToken cancellationToken) + static async Task CommitterAsync(FasterLog log, CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { - await Task.Delay(TimeSpan.FromMilliseconds(5000), cancellationToken); + await Task.Delay(TimeSpan.FromMilliseconds(commitPeriodMs), cancellationToken); Console.WriteLine("Committing..."); @@ -60,7 +67,7 @@ static async Task ProducerAsync(FasterLog log, CancellationToken cancellationTok var i = 0L; while (!cancellationToken.IsCancellationRequested) { - Console.WriteLine($"Producing {i}"); + // Console.WriteLine($"Producing {i}"); log.Enqueue(Encoding.UTF8.GetBytes(i.ToString())); log.RefreshUncommitted(); @@ -71,9 +78,9 @@ static async Task ProducerAsync(FasterLog log, CancellationToken cancellationTok } } - static async Task ConsumerAsync(FasterLog log, CancellationToken cancellationToken) + static async Task ConsumerAsync(FasterLog log, bool scanUncommitted, CancellationToken cancellationToken) { - using var iter = log.Scan(log.BeginAddress, long.MaxValue, "foo", true, ScanBufferingMode.DoublePageBuffering, scanUncommitted: true); + using var iter = log.Scan(log.BeginAddress, long.MaxValue, "foo", true, ScanBufferingMode.DoublePageBuffering, scanUncommitted); int count = 0; await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken)) @@ -89,5 +96,32 @@ static async Task ConsumerAsync(FasterLog log, CancellationToken cancellationTok } } + static async Task SeparateConsumerAsync(CancellationToken cancellationToken) + { + var device = Devices.CreateLogDevice(path + "mylog"); + var log = new FasterLog(new FasterLogSettings { LogDevice = device, ReadOnlyMode = true, PageSizeBits = 9, SegmentSizeBits = 9 }); + var _ = RecoverAsync(log, cancellationToken); + + using var iter = log.Scan(log.BeginAddress, long.MaxValue); + + await foreach (var (result, length, currentAddress, nextAddress) in iter.GetAsyncEnumerable(cancellationToken)) + { + Console.WriteLine($"Consuming {Encoding.UTF8.GetString(result)}"); + iter.CompleteUntil(nextAddress); + } + } + + static async Task RecoverAsync(FasterLog log, CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromMilliseconds(restorePeriodMs), cancellationToken); + + Console.WriteLine("Restoring ..."); + + log.RecoverReadOnly(); + } + } + } } diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index d46da3244..25aba8cfb 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -537,11 +537,6 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer if (SegmentSize < PageSize) throw new FasterException("Segment must be at least of page size"); - if (BufferSize < 1) - { - throw new FasterException("Log buffer must be of size at least 1 page"); - } - PageStatusIndicator = new FullPageStatus[BufferSize]; PendingFlush = new PendingFlushList[BufferSize]; for (int i = 0; i < BufferSize; i++) @@ -562,15 +557,18 @@ protected void Initialize(long firstValidAddress) bufferPool = new SectorAlignedBufferPool(1, sectorSize); - long tailPage = firstValidAddress >> LogPageSizeBits; - int tailPageIndex = (int)(tailPage % BufferSize); - AllocatePage(tailPageIndex); - - // Allocate next page as well - int nextPageIndex = (int)(tailPage + 1) % BufferSize; - if ((!IsAllocated(nextPageIndex))) + if (BufferSize > 0) { - AllocatePage(nextPageIndex); + long tailPage = firstValidAddress >> LogPageSizeBits; + int tailPageIndex = (int)(tailPage % BufferSize); + AllocatePage(tailPageIndex); + + // Allocate next page as well + int nextPageIndex = (int)(tailPage + 1) % BufferSize; + if ((!IsAllocated(nextPageIndex))) + { + AllocatePage(nextPageIndex); + } } if (PreallocateLog) diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index ca224c021..f1e7d3b8c 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -27,6 +27,8 @@ public class FasterLog : IDisposable private readonly GetMemory getMemory; private readonly int headerSize; private readonly LogChecksumType logChecksum; + internal readonly bool readOnlyMode; + private TaskCompletionSource commitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); private TaskCompletionSource refreshUncommittedTcs @@ -80,7 +82,7 @@ private TaskCompletionSource refreshUncommittedTcs /// /// Table of persisted iterators /// - internal readonly ConcurrentDictionary PersistedIterators + internal readonly ConcurrentDictionary PersistedIterators = new ConcurrentDictionary(); /// @@ -128,6 +130,14 @@ public FasterLog(FasterLogSettings logSettings) logSettings.GetLogSettings(), null, null, epoch, CommitCallback); allocator.Initialize(); + + // FasterLog is used as a read-only iterator + if (logSettings.ReadOnlyMode) + { + readOnlyMode = true; + allocator.HeadAddress = long.MaxValue; + } + Restore(out RecoveredIterators); } @@ -754,6 +764,16 @@ public void TruncateUntilPageStart(long untilAddress) /// public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering, bool scanUncommitted = false) { + if (readOnlyMode) + { + scanBufferingMode = ScanBufferingMode.SinglePageBuffering; + + if (name != null) + throw new FasterException("Cannot used named iterators with read-only FasterLog"); + if (scanUncommitted) + throw new FasterException("Cannot used scanUncommitted with read-only FasterLog"); + } + FasterLogScanIterator iter; if (recover && name != null && RecoveredIterators != null && RecoveredIterators.ContainsKey(name)) iter = new FasterLogScanIterator(this, allocator, RecoveredIterators[name], endAddress, getMemory, scanBufferingMode, epoch, headerSize, name, scanUncommitted); @@ -881,6 +901,31 @@ private void UpdateTailCallback(long tailAddress) } } + /// + /// Recover instance to FasterLog's latest commit, when being used as a readonly log iterator + /// + public void RecoverReadOnly() + { + if (!readOnlyMode) + throw new FasterException("This method can only be used with a read-only FasterLog instance used for iteration. Set FasterLogSettings.ReadOnlyMode to true during creation to indicate this."); + + Restore(out _); + + var _commitTcs = commitTcs; + if (commitTcs.Task.Status != TaskStatus.Faulted || commitTcs.Task.Exception.InnerException as CommitFailureException != null) + { + commitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + // Update commit to release pending iterators + var lci = new LinkedCommitInfo + { + CommitInfo = new CommitInfo { BeginAddress = BeginAddress, FromAddress = BeginAddress, UntilAddress = FlushedUntilAddress }, + NextTask = commitTcs.Task + }; + _commitTcs?.TrySetResult(lci); + } + /// /// Restore log /// @@ -904,15 +949,18 @@ private void Restore(out Dictionary recoveredIterators) info.Initialize(r); } - var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress); - if (info.BeginAddress > headAddress) - headAddress = info.BeginAddress; + recoveredIterators = info.Iterators; - if (headAddress == 0) headAddress = Constants.kFirstValidAddress; + if (!readOnlyMode) + { + var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress); + if (info.BeginAddress > headAddress) + headAddress = info.BeginAddress; - recoveredIterators = info.Iterators; + if (headAddress == 0) headAddress = Constants.kFirstValidAddress; - allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.FlushedUntilAddress, info.FlushedUntilAddress); + allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.FlushedUntilAddress, info.FlushedUntilAddress); + } CommittedUntilAddress = info.FlushedUntilAddress; CommittedBeginAddress = info.BeginAddress; SafeTailAddress = info.FlushedUntilAddress; @@ -1041,6 +1089,9 @@ private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, obje private long CommitInternal(bool spinWait = false) { + if (readOnlyMode) + throw new FasterException("Cannot commit in read-only mode"); + epoch.Resume(); if (allocator.ShiftReadOnlyToTail(out long tailAddress, out _)) { diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs index 8eac85f40..deba470fb 100644 --- a/cs/src/core/Index/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs @@ -354,22 +354,34 @@ private unsafe bool BufferAndLoad(long currentAddress, long currentPage, long cu { var nextPage = currentPage + i; + var pageEndAddress = (nextPage + 1 ) << allocator.LogPageSizeBits; + + if (fasterLog.readOnlyMode) + { + // Support partial page reads of committed data + var _flush = fasterLog.CommittedUntilAddress; + if (_flush < pageEndAddress) + { + pageEndAddress = _flush; + } + } + // Cannot load page if its not fully written to storage - if (headAddress < (nextPage + 1) << allocator.LogPageSizeBits) + if (headAddress < pageEndAddress) continue; var nextFrame = (currentFrame + i) % frameSize; long val; - while ((val = nextLoadedPage[nextFrame]) < nextPage || loadedPage[nextFrame] < nextPage) + while ((val = nextLoadedPage[nextFrame]) < pageEndAddress || loadedPage[nextFrame] < pageEndAddress) { - if (val < nextPage && Interlocked.CompareExchange(ref nextLoadedPage[nextFrame], nextPage, val) == val) + if (val < pageEndAddress && Interlocked.CompareExchange(ref nextLoadedPage[nextFrame], pageEndAddress, val) == val) { var tmp_i = i; epoch.BumpCurrentEpoch(() => { allocator.AsyncReadPagesFromDeviceToFrame(tmp_i + (currentAddress >> allocator.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[nextFrame], 0, null, null, loadedCancel[nextFrame]); - loadedPage[nextFrame] = nextPage; + loadedPage[nextFrame] = pageEndAddress; }); } else diff --git a/cs/src/core/Index/FasterLog/FasterLogSettings.cs b/cs/src/core/Index/FasterLog/FasterLogSettings.cs index 9e2ae3978..b76a1721d 100644 --- a/cs/src/core/Index/FasterLog/FasterLogSettings.cs +++ b/cs/src/core/Index/FasterLog/FasterLogSettings.cs @@ -82,6 +82,11 @@ public class FasterLogSettings /// public double MutableFraction = 0; + /// + /// Use FasterLog as read-only iterator/viewer of log being committed by another instance + /// + public bool ReadOnlyMode = false; + internal LogSettings GetLogSettings() { return new LogSettings @@ -89,7 +94,7 @@ internal LogSettings GetLogSettings() LogDevice = LogDevice, PageSizeBits = PageSizeBits, SegmentSizeBits = SegmentSizeBits, - MemorySizeBits = MemorySizeBits, + MemorySizeBits = ReadOnlyMode ? 0 : MemorySizeBits, CopyReadsToTail = false, MutableFraction = MutableFraction, ObjectLogDevice = null,