From ddcc3382dd7318691b83efd9fbb9582ad9bf21df Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 19 Sep 2019 10:48:53 -0700 Subject: [PATCH] Added TryAppend so users can implement log throttling. --- cs/playground/FasterLogSample/Program.cs | 12 ++ cs/src/core/Index/FasterLog/FasterLog.cs | 154 ++++++++++++------ .../core/Index/FasterLog/ILogCommitManager.cs | 5 +- .../Index/FasterLog/LocalLogCommitManager.cs | 7 +- 4 files changed, 125 insertions(+), 53 deletions(-) diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs index 0ca4005b4..6ba53ffff 100644 --- a/cs/playground/FasterLogSample/Program.cs +++ b/cs/playground/FasterLogSample/Program.cs @@ -51,6 +51,14 @@ static void AppendThread() while (true) { log.Append(entry); + + // We also support a Span-based version of Append + + // We also support TryAppend to allow throttling/back-off: + // while (!log.TryAppend(entry, out long logicalAddress)) + // { + // Thread.Sleep(10); + // } } } @@ -97,6 +105,10 @@ static void Main(string[] args) log = new FasterLog(new FasterLogSettings { LogDevice = device }); new Thread(new ThreadStart(AppendThread)).Start(); + + // Can have multiple append threads if needed + // new Thread(new ThreadStart(AppendThread)).Start(); + new Thread(new ThreadStart(ScanThread)).Start(); new Thread(new ThreadStart(ReportThread)).Start(); new Thread(new ThreadStart(CommitThread)).Start(); diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index 90c65041c..fc963cb3d 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -21,6 +21,7 @@ public class FasterLog : IDisposable { private readonly BlittableAllocator allocator; private readonly LightEpoch epoch; + private ILogCommitManager logCommitManager; /// /// Beginning address of log @@ -40,9 +41,7 @@ public class FasterLog : IDisposable /// /// Log commit until address /// - public long CommitUntilAddress; - - private ILogCommitManager logCommitManager; + public long CommittedUntilAddress; /// /// Create new log instance @@ -62,49 +61,6 @@ public FasterLog(FasterLogSettings logSettings) Restore(); } - /// - /// Commit log - /// - private void Commit(long flushAddress) - { - epoch.Resume(); - FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); - info.FlushedUntilAddress = allocator.FlushedUntilAddress; - info.BeginAddress = allocator.BeginAddress; - epoch.Suspend(); - - // We can only allow serial monotonic synchronous commit - lock (this) - { - if (flushAddress > CommitUntilAddress) - { - logCommitManager.Commit(info.ToByteArray()); - CommitUntilAddress = flushAddress; - info.DebugPrint(); - } - } - } - - /// - /// Restore log - /// - private void Restore() - { - FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); - var commitInfo = logCommitManager.GetCommitMetadata(); - - if (commitInfo == null) return; - - using (var r = new BinaryReader(new MemoryStream(commitInfo))) - { - info.Initialize(r); - } - - allocator.RestoreHybridLog(info.FlushedUntilAddress, - info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress), - info.BeginAddress); - } - /// /// Dispose /// @@ -150,6 +106,60 @@ public unsafe long Append(byte[] entry) return logicalAddress; } + /// + /// Try to append entry to log + /// + /// Entry to be appended to log + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryAppend(byte[] entry, out long logicalAddress) + { + epoch.Resume(); + logicalAddress = 0; + long tail = -allocator.GetTailAddress(); + allocator.CheckForAllocateComplete(ref tail); + if (tail < 0) + { + epoch.Suspend(); + return false; + } + var length = entry.Length; + BlockAllocate(4 + length, out logicalAddress); + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + *(int*)physicalAddress = length; + fixed (byte* bp = entry) + Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + epoch.Suspend(); + return true; + } + + /// + /// Try to append entry to log + /// + /// Entry to be appended to log + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryAppend(Span entry, out long logicalAddress) + { + epoch.Resume(); + logicalAddress = 0; + long tail = -allocator.GetTailAddress(); + allocator.CheckForAllocateComplete(ref tail); + if (tail < 0) + { + epoch.Suspend(); + return false; + } + var length = entry.Length; + BlockAllocate(4 + length, out logicalAddress); + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + *(int*)physicalAddress = length; + fixed (byte* bp = &entry.GetPinnableReference()) + Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + epoch.Suspend(); + return true; + } + /// /// Append batch of entries to log /// @@ -182,7 +192,7 @@ public long FlushAndCommit(bool spinWait = false) if (spinWait) { - while (CommitUntilAddress < tailAddress) + while (CommittedUntilAddress < tailAddress) { epoch.ProtectAndDrain(); Thread.Yield(); @@ -204,7 +214,7 @@ public void TruncateUntil(long untilAddress) } /// - /// Iterator interface for scanning FASTER log + /// Pull-based iterator interface for scanning FASTER log /// /// /// @@ -235,6 +245,11 @@ public void ReleaseThread() epoch.Release(); } + /// + /// Block allocate + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private void BlockAllocate(int recordSize, out long logicalAddress) { @@ -259,5 +274,48 @@ private void BlockAllocate(int recordSize, out long logicalAddress) BlockAllocate(recordSize, out logicalAddress); } } + + /// + /// Commit log + /// + private void Commit(long flushAddress) + { + epoch.Resume(); + FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); + info.FlushedUntilAddress = allocator.FlushedUntilAddress; + info.BeginAddress = allocator.BeginAddress; + epoch.Suspend(); + + // We can only allow serial monotonic synchronous commit + lock (this) + { + if (flushAddress > CommittedUntilAddress) + { + logCommitManager.Commit(flushAddress, info.ToByteArray()); + CommittedUntilAddress = flushAddress; + info.DebugPrint(); + } + } + } + + /// + /// Restore log + /// + private void Restore() + { + FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); + var commitInfo = logCommitManager.GetCommitMetadata(); + + if (commitInfo == null) return; + + using (var r = new BinaryReader(new MemoryStream(commitInfo))) + { + info.Initialize(r); + } + + allocator.RestoreHybridLog(info.FlushedUntilAddress, + info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress), + info.BeginAddress); + } } } diff --git a/cs/src/core/Index/FasterLog/ILogCommitManager.cs b/cs/src/core/Index/FasterLog/ILogCommitManager.cs index 7c5f37de9..892273815 100644 --- a/cs/src/core/Index/FasterLog/ILogCommitManager.cs +++ b/cs/src/core/Index/FasterLog/ILogCommitManager.cs @@ -13,8 +13,9 @@ public interface ILogCommitManager /// /// Perform (synchronous) commit with specified metadata /// - /// - void Commit(byte[] commitMetadata); + /// Address committed until (for information only, not necessary to persist) + /// Commit metadata + void Commit(long address, byte[] commitMetadata); /// /// Return prior commit metadata during recovery diff --git a/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs index d7bdba2a0..761984d68 100644 --- a/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs +++ b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs @@ -22,10 +22,11 @@ public LocalLogCommitManager(string CommitFile) } /// - /// Commit log + /// Perform (synchronous) commit with specified metadata /// - /// - public void Commit(byte[] commitMetadata) + /// Address committed until (for information only, not necessary to persist) + /// Commit metadata + public void Commit(long address, byte[] commitMetadata) { // Two phase to ensure we write metadata in single Write operation using (var ms = new MemoryStream())