diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs index 6ba53ffff..cb377cf6e 100644 --- a/cs/playground/FasterLogSample/Program.cs +++ b/cs/playground/FasterLogSample/Program.cs @@ -51,7 +51,7 @@ static void AppendThread() while (true) { log.Append(entry); - + // We also support a Span-based version of Append // We also support TryAppend to allow throttling/back-off: @@ -84,7 +84,7 @@ static void ScanThread() Thread.Sleep(1000); if (!result.SequenceEqual(entrySpan)) { - throw new Exception("Invalid entry found"); + throw new Exception("Invalid entry found at offset " + FindDiff(result, entrySpan)); } if (r.Next(100) < 10) @@ -99,6 +99,18 @@ static void ScanThread() } } + static int FindDiff(Span b1, Span b2) + { + for (int i=0; i : IDisposable /// /// HeadOffset lag (from tail) /// - protected const int HeadOffsetLagNumPages = 4; + protected const int HeadOffsetLagNumPages = 2; /// /// HeadOffset lag (from tail) for ReadCache /// - protected const int ReadCacheHeadOffsetLagNumPages = 1; + protected const int ReadCacheHeadOffsetLagNumPages = 2; /// /// HeadOffset lag size /// @@ -168,6 +157,11 @@ public unsafe abstract partial class AllocatorBase : IDisposable /// public long FlushedUntilAddress; + /// + /// Flushed until address + /// + public long ClosedUntilAddress; + /// /// Begin address /// @@ -190,7 +184,7 @@ public unsafe abstract partial class AllocatorBase : IDisposable /// /// Index in circular buffer, of the current tail page /// - private volatile int TailPageIndex; + private volatile int TailPageCache; // Array that indicates the status of each buffer page internal readonly FullPageStatus[] PageStatusIndicator; @@ -521,9 +515,9 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer SegmentSize = 1 << LogSegmentSizeBits; SegmentBufferSize = 1 + (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize)); - if (BufferSize < 16) + if (BufferSize < 4) { - throw new Exception("HLOG buffer must be at least 16 pages"); + throw new Exception("HLOG buffer must be at least 4 pages"); } PageStatusIndicator = new FullPageStatus[BufferSize]; @@ -555,13 +549,14 @@ protected void Initialize(long firstValidAddress) ReadOnlyAddress = firstValidAddress; SafeHeadAddress = firstValidAddress; HeadAddress = firstValidAddress; + ClosedUntilAddress = firstValidAddress; FlushedUntilAddress = firstValidAddress; BeginAddress = firstValidAddress; TailPageOffset.Page = (int)(firstValidAddress >> LogPageSizeBits); TailPageOffset.Offset = (int)(firstValidAddress & PageSizeMask); - TailPageIndex = 0; + TailPageCache = 0; } /// @@ -623,6 +618,11 @@ public long GetSegmentSize() public long GetTailAddress() { var local = TailPageOffset; + if (local.Offset >= PageSize) + { + local.Page++; + local.Offset = 0; + } return ((long)local.Page << LogPageSizeBits) | (uint)local.Offset; } @@ -778,18 +778,14 @@ public long Allocate(int numSlots = 1) long address = (((long)page) << LogPageSizeBits) | ((long)offset); - // Check if TailPageIndex is appropriate and allocated! - int pageIndex = page % BufferSize; - - if (TailPageIndex == pageIndex) + // Check for TailPageCache hit + if (TailPageCache == page) { return (address); } - //Invert the address if either the previous page is not flushed or if it is null - if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) || - (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) || - (!IsAllocated(pageIndex))) + // Negate the address if page not ready to be used + if (CannotAllocate(page)) { address = -address; } @@ -799,13 +795,11 @@ public long Allocate(int numSlots = 1) { if (address >= 0) { - TailPageIndex = pageIndex; + TailPageCache = page; Interlocked.MemoryBarrier(); } - long newPage = page + 1; - int newPageIndex = (int)((page + 1) % BufferSize); - + int newPageIndex = (page + 1) % BufferSize; long tailAddress = (address < 0 ? -address : address); PageAlignedShiftReadOnlyAddress(tailAddress); PageAlignedShiftHeadAddress(tailAddress); @@ -814,9 +808,14 @@ public long Allocate(int numSlots = 1) { AllocatePage(newPageIndex); } + + // We refreshed epoch, so address may have + // become read-only; re-check + if (tailAddress < ReadOnlyAddress) + return Allocate(numSlots); } - return (address); + return address; } /// @@ -837,22 +836,17 @@ public void CheckForAllocateComplete(ref long address) p.Page = (int)((-address) >> LogPageSizeBits); p.Offset = (int)((-address) & PageSizeMask); - //Check write cache - int pageIndex = p.Page % BufferSize; - if (TailPageIndex == pageIndex) + // Check write cache + if (TailPageCache == p.Page) { address = -address; return; } - //Check if we can move the head offset - long currentTailAddress = GetTailAddress(); - PageAlignedShiftHeadAddress(currentTailAddress); + PageAlignedShiftHeadAddress(GetTailAddress()); - //Check if I can allocate pageIndex at all - if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) || - (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) || - (!IsAllocated(pageIndex))) + // Check if we can allocate pageIndex + if (CannotAllocate(p.Page)) { return; } @@ -861,11 +855,19 @@ public void CheckForAllocateComplete(ref long address) address = -address; if (p.Offset == 0) { - TailPageIndex = pageIndex; + TailPageCache = p.Page; } return; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool CannotAllocate(int page) + { + return + (page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits) - 1) || + !IsAllocated(page % BufferSize); + } + /// /// Used by applications to make the current state of the database immutable quickly /// @@ -942,22 +944,8 @@ public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress, bool waitForPendi if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress)) { Debug.WriteLine("SafeReadOnly shifted from {0:X} to {1:X}", oldSafeReadOnlyAddress, newSafeReadOnlyAddress); - long startPage = oldSafeReadOnlyAddress >> LogPageSizeBits; - - long endPage = (newSafeReadOnlyAddress >> LogPageSizeBits); - OnReadOnlyObserver?.OnNext(Scan(oldSafeReadOnlyAddress, newSafeReadOnlyAddress, ScanBufferingMode.NoBuffering)); - - int numPages = (int)(endPage - startPage); - if (numPages > 10) - { - new Thread( - () => AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress)).Start(); - } - else - { - AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress); - } + AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress); } } @@ -980,39 +968,42 @@ public void OnPagesClosed(long newSafeHeadAddress) return; } - int closePage = (int)((closePageAddress >> LogPageSizeBits) % BufferSize); + int closePage = (int)(closePageAddress >> LogPageSizeBits); + int closePageIndex = closePage % BufferSize; - if (!IsAllocated(closePage)) + if (!IsAllocated(closePageIndex)) + AllocatePage(closePageIndex); + else + ClearPage(closePage); + Utility.MonotonicUpdate(ref PageStatusIndicator[closePageIndex].LastClosedUntilAddress, closePageAddress + PageSize, out _); + ShiftClosedUntilAddress(); + if (ClosedUntilAddress > FlushedUntilAddress) { - AllocatePage(closePage); + throw new Exception($"Closed address {ClosedUntilAddress} exceeds flushed address {FlushedUntilAddress}"); } - - while (true) - { - var oldStatus = PageStatusIndicator[closePage].PageFlushCloseStatus; - if (oldStatus.PageFlushStatus == PMMFlushStatus.Flushed) - { - ClearPage(closePageAddress >> LogPageSizeBits); - } - else - { - throw new Exception("Error: page should already be flushed at this point"); - } - - var newStatus = oldStatus; - newStatus.PageCloseStatus = PMMCloseStatus.Closed; - if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[closePage].PageFlushCloseStatus.value, newStatus.value, oldStatus.value)) - { - break; - } - } - - // Necessary to propagate this change to other threads - Interlocked.MemoryBarrier(); } } } + private void DebugPrintAddresses(long closePageAddress) + { + var _flush = FlushedUntilAddress; + var _readonly = ReadOnlyAddress; + var _safereadonly = SafeReadOnlyAddress; + var _tail = GetTailAddress(); + var _head = HeadAddress; + var _safehead = SafeHeadAddress; + + Console.WriteLine("ClosePageAddress: {0}.{1}", GetPage(closePageAddress), GetOffsetInPage(closePageAddress)); + Console.WriteLine("FlushedUntil: {0}.{1}", GetPage(_flush), GetOffsetInPage(_flush)); + Console.WriteLine("Tail: {0}.{1}", GetPage(_tail), GetOffsetInPage(_tail)); + Console.WriteLine("Head: {0}.{1}", GetPage(_head), GetOffsetInPage(_head)); + Console.WriteLine("SafeHead: {0}.{1}", GetPage(_safehead), GetOffsetInPage(_safehead)); + Console.WriteLine("ReadOnly: {0}.{1}", GetPage(_readonly), GetOffsetInPage(_readonly)); + Console.WriteLine("SafeReadOnly: {0}.{1}", GetPage(_safereadonly), GetOffsetInPage(_safereadonly)); + Console.WriteLine("TailPageCache: {0}", TailPageCache); + } + /// /// Called every time a new tail page is allocated. Here the read-only is /// shifted only to page boundaries unlike ShiftReadOnlyToTail where shifting @@ -1097,13 +1088,13 @@ protected void ShiftFlushedUntilAddress() long page = GetPage(currentFlushedUntilAddress); bool update = false; - long pageLastFlushedAddress = Interlocked.Read(ref PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress); - while (pageLastFlushedAddress >= currentFlushedUntilAddress) + long pageLastFlushedAddress = PageStatusIndicator[page % BufferSize].LastFlushedUntilAddress; + while (pageLastFlushedAddress >= currentFlushedUntilAddress && currentFlushedUntilAddress >= (page << LogPageSizeBits)) { currentFlushedUntilAddress = pageLastFlushedAddress; update = true; page++; - pageLastFlushedAddress = Interlocked.Read(ref PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress); + pageLastFlushedAddress = PageStatusIndicator[page % BufferSize].LastFlushedUntilAddress; } if (update) @@ -1115,6 +1106,30 @@ protected void ShiftFlushedUntilAddress() } } + /// + /// Shift ClosedUntil address + /// + protected void ShiftClosedUntilAddress() + { + long currentClosedUntilAddress = ClosedUntilAddress; + long page = GetPage(currentClosedUntilAddress); + + bool update = false; + long pageLastClosedAddress = PageStatusIndicator[page % BufferSize].LastClosedUntilAddress; + while (pageLastClosedAddress >= currentClosedUntilAddress && currentClosedUntilAddress >= (page << LogPageSizeBits)) + { + currentClosedUntilAddress = pageLastClosedAddress; + update = true; + page++; + pageLastClosedAddress = PageStatusIndicator[(int)(page % BufferSize)].LastClosedUntilAddress; + } + + if (update) + { + Utility.MonotonicUpdate(ref ClosedUntilAddress, currentClosedUntilAddress, out long oldClosedUntilAddress); + } + } + /// /// Reset for recovery /// @@ -1127,35 +1142,25 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress) long offsetInPage = GetOffsetInPage(tailAddress); TailPageOffset.Page = (int)tailPage; TailPageOffset.Offset = (int)offsetInPage; - TailPageIndex = GetPageIndexForPage(TailPageOffset.Page); + TailPageCache = TailPageOffset.Page; // allocate next page as well - this is an invariant in the allocator! var pageIndex = (TailPageOffset.Page % BufferSize); var nextPageIndex = (pageIndex + 1) % BufferSize; if (tailAddress > 0) - AllocatePage(nextPageIndex); + if (!IsAllocated(nextPageIndex)) + AllocatePage(nextPageIndex); BeginAddress = beginAddress; HeadAddress = headAddress; SafeHeadAddress = headAddress; + ClosedUntilAddress = headAddress; FlushedUntilAddress = tailAddress; ReadOnlyAddress = tailAddress; SafeReadOnlyAddress = tailAddress; - // ensure appropriate page status for all pages in memory - // note: they must have been read in previously during recovery - var addr = GetStartLogicalAddress(GetPage(headAddress)); - for (; addr < tailAddress; addr += PageSize) - { - pageIndex = GetPageIndexForAddress(addr); - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - } - // for the last page which contains tailoffset, it must be open pageIndex = GetPageIndexForAddress(tailAddress); - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Open; - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; // clear the last page starting from tail address ClearPage(pageIndex, (int)GetOffsetInPage(tailAddress)); @@ -1307,12 +1312,25 @@ public void AsyncFlushPages(long fromAddress, long untilAddress) long startPage = fromAddress >> LogPageSizeBits; long endPage = untilAddress >> LogPageSizeBits; int numPages = (int)(endPage - startPage); + + long offsetInStartPage = GetOffsetInPage(fromAddress); long offsetInEndPage = GetOffsetInPage(untilAddress); + + + // Extra (partial) page being flushed if (offsetInEndPage > 0) - { numPages++; - } + // Partial page starting point, need to wait until the + // ongoing adjacent flush is completed to ensure correctness + if (offsetInStartPage > 0) + { + while (FlushedUntilAddress < fromAddress) + { + epoch.ProtectAndDrain(); + Thread.Yield(); + } + } /* Request asynchronous writes to the device. If waitForPendingFlushComplete * is set, then a CountDownEvent is set in the callback handle. @@ -1325,41 +1343,24 @@ public void AsyncFlushPages(long fromAddress, long untilAddress) var asyncResult = new PageAsyncFlushResult { page = flushPage, - count = 1 + count = 1, + partial = false, + fromAddress = pageStartAddress, + untilAddress = pageEndAddress }; - if (pageEndAddress > untilAddress || pageStartAddress < fromAddress) + if ( + ((fromAddress > pageStartAddress) && (fromAddress < pageEndAddress)) || + ((untilAddress > pageStartAddress) && (untilAddress < pageEndAddress)) + ) { asyncResult.partial = true; - asyncResult.fromAddress = pageStartAddress; - asyncResult.untilAddress = pageEndAddress; - if (pageEndAddress > untilAddress) + if (untilAddress < pageEndAddress) asyncResult.untilAddress = untilAddress; - if (pageStartAddress < fromAddress) + if (fromAddress > pageStartAddress) asyncResult.fromAddress = fromAddress; - - - // Are we flushing until the end of page? - if (untilAddress >= pageEndAddress) - { - // Set status to in-progress - PageStatusIndicator[flushPage % BufferSize].PageFlushCloseStatus - = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open }; - } } - else - { - asyncResult.partial = false; - asyncResult.fromAddress = pageStartAddress; - asyncResult.untilAddress = pageEndAddress; - - // Set status to in-progress - PageStatusIndicator[flushPage % BufferSize].PageFlushCloseStatus - = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open }; - } - - Interlocked.Exchange(ref PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress, -1); WriteAsync(flushPage, AsyncFlushPageCallback, asyncResult); } @@ -1527,24 +1528,7 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, NativeOverlap if (Interlocked.Decrement(ref result.count) == 0) { - if (!result.partial || (result.untilAddress >= ((result.page + 1) << LogPageSizeBits))) - { - while (true) - { - var oldStatus = PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus; - if (oldStatus.PageCloseStatus == PMMCloseStatus.Closed) - { - throw new Exception("Error: page should not be closed at this point"); - } - var newStatus = oldStatus; - newStatus.PageFlushStatus = PMMFlushStatus.Flushed; - if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus.value, newStatus.value, oldStatus.value)) - { - break; - } - } - } - Interlocked.Exchange(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress); + Utility.MonotonicUpdate(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress, out long old); ShiftFlushedUntilAddress(); result.Free(); } diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index 6e80ccb78..9c37ffb3e 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -131,10 +131,6 @@ internal override void AllocatePage(int index) long p = (long)handles[index].AddrOfPinnedObject(); pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1); values[index] = tmp; - - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - Interlocked.MemoryBarrier(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 0b6b7983d..1442c8e81 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -41,8 +41,8 @@ public unsafe sealed class GenericAllocator : AllocatorBase(); private readonly bool valueBlittable = Utility.IsBlittable(); - public GenericAllocator(LogSettings settings, SerializerSettings serializerSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch, null) + public GenericAllocator(LogSettings settings, SerializerSettings serializerSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null, Action flushCallback = null) + : base(settings, comparer, evictCallback, epoch, flushCallback) { SerializerSettings = serializerSettings; @@ -200,9 +200,6 @@ internal override void DeleteFromMemory() internal override void AllocatePage(int index) { values[index] = AllocatePage(); - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - Interlocked.MemoryBarrier(); } internal Record[] AllocatePage() diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs index b5057d8cd..3b59a0050 100644 --- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs +++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs @@ -33,8 +33,8 @@ public unsafe sealed class VariableLengthBlittableAllocator : Alloca internal readonly IVariableLengthStruct KeyLength; internal readonly IVariableLengthStruct ValueLength; - public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings vlSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch, null) + public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings vlSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null, Action flushCallback = null) + : base(settings, comparer, evictCallback, epoch, flushCallback) { values = new byte[BufferSize][]; handles = new GCHandle[BufferSize]; @@ -215,10 +215,6 @@ internal override void AllocatePage(int index) long p = (long)handles[index].AddrOfPinnedObject(); pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1); values[index] = tmp; - - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - Interlocked.MemoryBarrier(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 4311777ce..32fc159d4 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1896,7 +1896,7 @@ private void BlockAllocate(int recordSize, out long logicalAddress) hlog.CheckForAllocateComplete(ref logicalAddress); if (logicalAddress < 0) { - Thread.Sleep(10); + Thread.Yield(); } } @@ -1923,7 +1923,7 @@ private void BlockAllocateReadCache(int recordSize, out long logicalAddress) readcache.CheckForAllocateComplete(ref logicalAddress); if (logicalAddress < 0) { - Thread.Sleep(10); + Thread.Yield(); } } @@ -2323,29 +2323,32 @@ private void ReadCacheEvict(long fromHeadAddress, long toHeadAddress) { physicalAddress = readcache.GetPhysicalAddress(logicalAddress); var recordSize = readcache.GetRecordSize(physicalAddress); - ref Key key = ref readcache.GetKey(physicalAddress); ref RecordInfo info = ref readcache.GetInfo(physicalAddress); - entry.word = info.PreviousAddress; - if (!entry.ReadCache) + if (!info.Invalid) { - var hash = comparer.GetHashCode64(ref key); - var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - - entry = default(HashBucketEntry); - var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); - while (tagExists && entry.ReadCache) + ref Key key = ref readcache.GetKey(physicalAddress); + entry.word = info.PreviousAddress; + if (!entry.ReadCache) { - var updatedEntry = default(HashBucketEntry); - updatedEntry.Tag = tag; - updatedEntry.Address = info.PreviousAddress; - updatedEntry.Pending = entry.Pending; - updatedEntry.Tentative = false; - - if (entry.word == Interlocked.CompareExchange - (ref bucket->bucket_entries[slot], updatedEntry.word, entry.word)) - break; + var hash = comparer.GetHashCode64(ref key); + var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); + + entry = default(HashBucketEntry); + var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + while (tagExists && entry.ReadCache) + { + var updatedEntry = default(HashBucketEntry); + updatedEntry.Tag = tag; + updatedEntry.Address = info.PreviousAddress; + updatedEntry.Pending = entry.Pending; + updatedEntry.Tentative = false; + + if (entry.word == Interlocked.CompareExchange + (ref bucket->bucket_entries[slot], updatedEntry.word, entry.word)) + break; - tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + } } } logicalAddress += recordSize; diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index fc963cb3d..9ebf1af80 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -79,7 +79,8 @@ public unsafe long Append(Span entry) { epoch.Resume(); var length = entry.Length; - BlockAllocate(4 + length, out long logicalAddress); + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + BlockAllocate(4 + alignedLength, out long logicalAddress); var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = &entry.GetPinnableReference()) @@ -97,7 +98,8 @@ public unsafe long Append(byte[] entry) { epoch.Resume(); var length = entry.Length; - BlockAllocate(4 + length, out long logicalAddress); + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + BlockAllocate(4 + alignedLength, out long logicalAddress); var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = entry) @@ -124,7 +126,8 @@ public unsafe bool TryAppend(byte[] entry, out long logicalAddress) return false; } var length = entry.Length; - BlockAllocate(4 + length, out logicalAddress); + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + BlockAllocate(4 + alignedLength, out logicalAddress); var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = entry) @@ -151,7 +154,8 @@ public unsafe bool TryAppend(Span entry, out long logicalAddress) return false; } var length = entry.Length; - BlockAllocate(4 + length, out logicalAddress); + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + BlockAllocate(4 + alignedLength, out logicalAddress); var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = &entry.GetPinnableReference()) @@ -172,7 +176,8 @@ public unsafe long Append(List entries) foreach (var entry in entries) { var length = entry.Length; - BlockAllocate(4 + length, out logicalAddress); + var alignedLength = (length + 3) & ~3; // round up to multiple of 4 + BlockAllocate(4 + alignedLength, out logicalAddress); var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); *(int*)physicalAddress = length; fixed (byte* bp = entry) @@ -262,7 +267,7 @@ private void BlockAllocate(int recordSize, out long logicalAddress) allocator.CheckForAllocateComplete(ref logicalAddress); if (logicalAddress < 0) { - Thread.Sleep(10); + Thread.Yield(); } } @@ -280,11 +285,9 @@ private void BlockAllocate(int recordSize, out long logicalAddress) /// private void Commit(long flushAddress) { - epoch.Resume(); FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); - info.FlushedUntilAddress = allocator.FlushedUntilAddress; + info.FlushedUntilAddress = flushAddress; info.BeginAddress = allocator.BeginAddress; - epoch.Suspend(); // We can only allow serial monotonic synchronous commit lock (this) @@ -293,7 +296,7 @@ private void Commit(long flushAddress) { logCommitManager.Commit(flushAddress, info.ToByteArray()); CommittedUntilAddress = flushAddress; - info.DebugPrint(); + // info.DebugPrint(); } } } diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs index b09db13c3..e61c5131d 100644 --- a/cs/src/core/Index/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs @@ -121,9 +121,9 @@ public unsafe bool GetNext(out Span entry) // Check if record fits on page, if not skip to next page int length = *(int*)physicalAddress; - int recordSize = 4; - if (length > 0) - recordSize += length; + int alignedLength = (length + 3) & ~3; // round up to multiple of 4 + + int recordSize = 4 + alignedLength; if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize) { if (currentAddress >= headAddress) diff --git a/cs/src/core/Index/FasterLog/FasterLogSettings.cs b/cs/src/core/Index/FasterLog/FasterLogSettings.cs index 5969a034e..d6294cb32 100644 --- a/cs/src/core/Index/FasterLog/FasterLogSettings.cs +++ b/cs/src/core/Index/FasterLog/FasterLogSettings.cs @@ -25,14 +25,15 @@ public class FasterLogSettings public int PageSizeBits = 22; /// - /// Size of a segment (group of pages), in bits + /// Total size of in-memory part of log, in bits + /// Num pages = 2^(MemorySizeBits-PageSizeBits) /// - public int SegmentSizeBits = 30; + public int MemorySizeBits = 24; /// - /// Total size of in-memory part of log, in bits + /// Size of a segment (group of pages), in bits /// - public int MemorySizeBits = 26; + public int SegmentSizeBits = 30; /// /// Log commit manager diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 7ddd5db3f..4ae0388ac 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -433,7 +433,8 @@ public void RestoreHybridLog(long untilAddress, long headAddress, long beginAddr ((headAddress == untilAddress) && (GetOffsetInPage(headAddress) == 0)) // Empty in-memory page ) { - AllocatePage(GetPageIndexForAddress(headAddress)); + if (!IsAllocated(GetPageIndexForAddress(headAddress))) + AllocatePage(GetPageIndexForAddress(headAddress)); } else {