diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs index cb377cf6e..a6365a805 100644 --- a/cs/playground/FasterLogSample/Program.cs +++ b/cs/playground/FasterLogSample/Program.cs @@ -10,7 +10,7 @@ namespace FasterLogSample { public class Program { - const int entryLength = 96; + const int entryLength = 996; static FasterLog log; static void ReportThread() @@ -50,15 +50,18 @@ static void AppendThread() while (true) { + // Sync append log.Append(entry); - // We also support a Span-based version of Append + // We also support a Span-based variant of Append - // We also support TryAppend to allow throttling/back-off: - // while (!log.TryAppend(entry, out long logicalAddress)) - // { - // Thread.Sleep(10); - // } + // We also support TryAppend to allow throttling/back-off + // (expect this to be slightly slower than the sync version) + // Make sure you supply a "starting" logical address of 0 + // Retries must send back the current logical address. + // + // long logicalAddress = 0; + // while (!log.TryAppend(entry, ref logicalAddress)) ; } } diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 8718266d1..ecedcf2c6 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -818,6 +818,90 @@ public long Allocate(int numSlots = 1) return address; } + /// + /// Try allocate, no thread spinning allowed + /// May return 0 in case of inability + /// May also return negative address + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public long TryAllocate(int numSlots = 1) + { + PageOffset localTailPageOffset = default(PageOffset); + + // Necessary to check because threads keep retrying and we do not + // want to overflow offset more than once per thread + if (TailPageOffset.Offset > PageSize) + return 0; + + // Determine insertion index. + // ReSharper disable once CSharpWarnings::CS0420 +#pragma warning disable 420 + localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots); +#pragma warning restore 420 + + int page = localTailPageOffset.Page; + int offset = localTailPageOffset.Offset - numSlots; + + #region HANDLE PAGE OVERFLOW + if (localTailPageOffset.Offset > PageSize) + { + if (offset > PageSize) + { + return 0; + } + + // The thread that "makes" the offset incorrect + // is the one that is elected to fix it and + // shift read-only/head. + localTailPageOffset.Page++; + localTailPageOffset.Offset = 0; + TailPageOffset = localTailPageOffset; + + long shiftAddress = ((long)(page + 1)) << LogPageSizeBits; + PageAlignedShiftReadOnlyAddress(shiftAddress); + PageAlignedShiftHeadAddress(shiftAddress); + + return 0; + } + #endregion + + long address = (((long)page) << LogPageSizeBits) | ((long)offset); + + // Check for TailPageCache hit + if (TailPageCache == page) + { + return address; + } + + // Address has been allocated. Negate the address + // if page is not ready to be used. + if (CannotAllocate(page)) + { + address = -address; + } + + // Update the read-only so that we can get more space for the tail + if (offset == 0) + { + if (address >= 0) + { + TailPageCache = page; + Interlocked.MemoryBarrier(); + } + + // Allocate next page in advance, if needed + int newPageIndex = (page + 1) % BufferSize; + if ((!IsAllocated(newPageIndex))) + { + AllocatePage(newPageIndex); + } + } + + return address; + } + /// /// If allocator cannot allocate new memory as the head has not shifted or the previous page /// is not yet closed, it allocates but returns the negative address. @@ -827,11 +911,6 @@ public long Allocate(int numSlots = 1) [MethodImpl(MethodImplOptions.AggressiveInlining)] public void CheckForAllocateComplete(ref long address) { - if (address >= 0) - { - throw new Exception("Address already allocated!"); - } - PageOffset p = default(PageOffset); p.Page = (int)((-address) >> LogPageSizeBits); p.Offset = (int)((-address) & PageSizeMask); diff --git a/cs/src/core/Index/Common/LogSettings.cs b/cs/src/core/Index/Common/LogSettings.cs index c0943dfe8..f6b59e337 100644 --- a/cs/src/core/Index/Common/LogSettings.cs +++ b/cs/src/core/Index/Common/LogSettings.cs @@ -176,8 +176,10 @@ public class ReadCacheSettings public int MemorySizeBits = 34; /// - /// Fraction of log used for second chance copy to tail + /// Fraction of log head (in memory) used for second chance + /// copy to tail. This is (1 - MutableFraction) for the + /// underlying log /// - public double SecondChanceFraction = 0.9; + public double SecondChanceFraction = 0.1; } } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 32c7a5a5d..e222a3585 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -135,7 +135,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits, - MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction + MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction }, variableLengthStructSettings, this.comparer, ReadCacheEvict, epoch); readcache.Initialize(); ReadCache = new LogAccessor(this, readcache); @@ -153,7 +153,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits, - MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction + MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction }, this.comparer, ReadCacheEvict, epoch); readcache.Initialize(); ReadCache = new LogAccessor(this, readcache); @@ -174,7 +174,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits, - MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction + MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction }, serializerSettings, this.comparer, ReadCacheEvict, epoch); readcache.Initialize(); ReadCache = new LogAccessor(this, readcache); diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index 9ebf1af80..979a9d803 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -109,82 +109,73 @@ public unsafe long Append(byte[] entry) } /// - /// Try to append entry to log + /// Try to append entry to log. If is returns true, we are + /// done. If it returns false with negative address, user + /// needs to call TryCompleteAppend to finalize the append. + /// See TryCompleteAppend for more info. /// /// Entry to be appended to log /// Logical address of added entry /// Whether the append succeeded - public unsafe bool TryAppend(byte[] entry, out long logicalAddress) + public unsafe bool TryAppend(byte[] entry, ref long logicalAddress) { + if (logicalAddress < 0) + return TryCompleteAppend(entry, ref logicalAddress); + epoch.Resume(); - logicalAddress = 0; - long tail = -allocator.GetTailAddress(); - allocator.CheckForAllocateComplete(ref tail); - if (tail < 0) - { + + var length = entry.Length; + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + + logicalAddress = allocator.TryAllocate(4 + alignedLength); + if (logicalAddress <= 0) + { epoch.Suspend(); return false; } - var length = entry.Length; - var alignedLength = (length + 3) & ~3; // round up to multiple of 4 - BlockAllocate(4 + alignedLength, out logicalAddress); + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = entry) Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + epoch.Suspend(); return true; } /// - /// Try to append entry to log + /// Try to append entry to log. If is returns true, we are + /// done. If it returns false with negative address, user + /// needs to call TryCompleteAppend to finalize the append. + /// See TryCompleteAppend for more info. /// /// Entry to be appended to log /// Logical address of added entry /// Whether the append succeeded - public unsafe bool TryAppend(Span entry, out long logicalAddress) + public unsafe bool TryAppend(Span entry, ref long logicalAddress) { + if (logicalAddress < 0) + return TryCompleteAppend(entry, ref logicalAddress); + epoch.Resume(); - logicalAddress = 0; - long tail = -allocator.GetTailAddress(); - allocator.CheckForAllocateComplete(ref tail); - if (tail < 0) + + var length = entry.Length; + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + + logicalAddress = allocator.TryAllocate(4 + alignedLength); + if (logicalAddress <= 0) { epoch.Suspend(); return false; } - var length = entry.Length; - var alignedLength = (length + 3) & ~3; // round up to multiple of 4 - BlockAllocate(4 + alignedLength, out logicalAddress); + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = &entry.GetPinnableReference()) Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); - epoch.Suspend(); - return true; - } - /// - /// Append batch of entries to log - /// - /// - /// Logical address of last added entry - public unsafe long Append(List entries) - { - long logicalAddress = 0; - epoch.Resume(); - foreach (var entry in entries) - { - var length = entry.Length; - var alignedLength = (length + 3) & ~3; // round up to multiple of 4 - BlockAllocate(4 + alignedLength, out logicalAddress); - var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); - *(int*)physicalAddress = length; - fixed (byte* bp = entry) - Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); - } epoch.Suspend(); - return logicalAddress; + return true; } /// @@ -280,6 +271,84 @@ private void BlockAllocate(int recordSize, out long logicalAddress) } } + /// + /// Try to complete partial allocation. Succeeds when address + /// turns positive. If failed with negative address, try the + /// operation. If failed with zero address, user needs to start + /// afresh with a new TryAppend operation. + /// + /// + /// + /// Whether operation succeeded + private unsafe bool TryCompleteAppend(byte[] entry, ref long logicalAddress) + { + epoch.Resume(); + + allocator.CheckForAllocateComplete(ref logicalAddress); + + if (logicalAddress < 0) + { + epoch.Suspend(); + return false; + } + + if (logicalAddress < allocator.ReadOnlyAddress) + { + logicalAddress = 0; + epoch.Suspend(); + return false; + } + + var length = entry.Length; + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + *(int*)physicalAddress = length; + fixed (byte* bp = entry) + Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + + epoch.Suspend(); + return true; + } + + /// + /// Try to complete partial allocation. Succeeds when address + /// turns positive. If failed with negative address, try the + /// operation. If failed with zero address, user needs to start + /// afresh with a new TryAppend operation. + /// + /// + /// + /// Whether operation succeeded + private unsafe bool TryCompleteAppend(Span entry, ref long logicalAddress) + { + epoch.Resume(); + + allocator.CheckForAllocateComplete(ref logicalAddress); + + if (logicalAddress < 0) + { + epoch.Suspend(); + return false; + } + + if (logicalAddress < allocator.ReadOnlyAddress) + { + logicalAddress = 0; + epoch.Suspend(); + return false; + } + + var length = entry.Length; + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + *(int*)physicalAddress = length; + fixed (byte* bp = &entry.GetPinnableReference()) + Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + + epoch.Suspend(); + return true; + } + /// /// Commit log /// @@ -316,9 +385,10 @@ private void Restore() info.Initialize(r); } - allocator.RestoreHybridLog(info.FlushedUntilAddress, - info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress), - info.BeginAddress); + var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress); + if (headAddress == 0) headAddress = Constants.kFirstValidAddress; + + allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress); } } }