diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 21fd70bb7..4af941e9b 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -5,7 +5,7 @@ variables: jobs: - job: 'csharpWindows' pool: - vmImage: vs2017-win2016 + vmImage: windows-latest displayName: 'C# (Windows)' strategy: diff --git a/cs/FASTER.sln b/cs/FASTER.sln index af8a268e5..c1f9b7a42 100644 --- a/cs/FASTER.sln +++ b/cs/FASTER.sln @@ -44,6 +44,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B1 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -156,6 +158,12 @@ Global {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64 {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|Any CPU.ActiveCfg = Debug|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.ActiveCfg = Debug|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.Build.0 = Debug|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.ActiveCfg = Release|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -176,6 +184,7 @@ Global {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE} {A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496} {E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE} + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC} diff --git a/cs/benchmark/FASTER.benchmark.csproj b/cs/benchmark/FASTER.benchmark.csproj index 89e99cf29..304f6a288 100644 --- a/cs/benchmark/FASTER.benchmark.csproj +++ b/cs/benchmark/FASTER.benchmark.csproj @@ -36,7 +36,7 @@ - + diff --git a/cs/playground/FasterLogSample/App.config b/cs/playground/FasterLogSample/App.config new file mode 100644 index 000000000..d69a9b153 --- /dev/null +++ b/cs/playground/FasterLogSample/App.config @@ -0,0 +1,6 @@ + + + + + + diff --git a/cs/playground/FasterLogSample/FasterLogSample.csproj b/cs/playground/FasterLogSample/FasterLogSample.csproj new file mode 100644 index 000000000..c9e13fb4b --- /dev/null +++ b/cs/playground/FasterLogSample/FasterLogSample.csproj @@ -0,0 +1,39 @@ + + + + netcoreapp2.2 + x64 + preview + win7-x64;linux-x64 + + + + Exe + true + StructSample + prompt + PackageReference + true + + + + TRACE;DEBUG + full + true + bin\x64\Debug\ + + + TRACE + pdbonly + true + bin\x64\Release\ + + + + + + + + + + \ No newline at end of file diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs new file mode 100644 index 000000000..cdee54937 --- /dev/null +++ b/cs/playground/FasterLogSample/Program.cs @@ -0,0 +1,267 @@ +// Copyright (c) Microsoft Corporation. 
All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using FASTER.core; + +namespace FasterLogSample +{ + public class Program + { + // Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4) + const int entryLength = 1 << 10; + static readonly byte[] staticEntry = new byte[entryLength]; + static FasterLog log; + static FasterLogScanIterator iter; + + /// + /// Main program entry point + /// + static void Main() + { + bool sync = true; + var device = Devices.CreateLogDevice("D:\\logs\\hlog.log"); + log = new FasterLog(new FasterLogSettings { LogDevice = device }); + + // Populate entry being inserted + for (int i = 0; i < entryLength; i++) + { + staticEntry[i] = (byte)i; + } + + if (sync) + { + // Log writer thread: create as many as needed + new Thread(new ThreadStart(LogWriterThread)).Start(); + + // Threads for scan, reporting, commit + new Thread(new ThreadStart(ScanThread)).Start(); + new Thread(new ThreadStart(ReportThread)).Start(); + new Thread(new ThreadStart(CommitThread)).Start(); + } + else + { + // Async version of demo: expect lower performance + // particularly for small payload sizes + + const int NumParallelTasks = 10_000; + ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount); + TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) => + { + Console.WriteLine($"Unobserved task exception: {e.Exception}"); + e.SetObserved(); + }; + + Task[] tasks = new Task[NumParallelTasks]; + for (int i = 0; i < NumParallelTasks; i++) + { + int local = i; + tasks[i] = Task.Run(() => AsyncLogWriter(local)); + } + + var scan = Task.Run(() => AsyncScan()); + + // Threads for reporting, commit + new Thread(new ThreadStart(ReportThread)).Start(); + new Thread(new ThreadStart(CommitThread)).Start(); + + Task.WaitAll(tasks); + Task.WaitAll(scan); + } + } + + + static void LogWriterThread() + { + while (true) + { + // TryEnqueue - can be used with throttling/back-off + // Accepts byte[] and ReadOnlySpan + while (!log.TryEnqueue(staticEntry, out _)) ; + + // Synchronous blocking enqueue + // Accepts byte[] and ReadOnlySpan + // log.Enqueue(entry); + + // Batched enqueue - batch must fit on one page + // Add this to class: + // static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10); + // while (!log.TryEnqueue(spanBatch, out _)) ; + } + } + + /// + /// Async version of enqueue + /// + static async Task AsyncLogWriter(int id) + { + bool batched = false; + + await Task.Yield(); + + if (!batched) + { + // Single commit version - append each item and wait for commit + // Needs high parallelism (NumParallelTasks) for perf + // Needs separate commit thread to perform regular commit + // Otherwise we commit only at page boundaries + while (true) + { + try + { + await log.EnqueueAndWaitForCommitAsync(staticEntry); + } + catch (Exception ex) + { + Console.WriteLine($"{nameof(AsyncLogWriter)}({id}): {ex}"); + } + } + } + else + { + // Batched version - we enqueue many entries to memory, + // then wait for commit periodically + int count = 0; + while (true) + { + await log.EnqueueAsync(staticEntry); + if (count++ % 100 == 0) + { + await log.WaitForCommitAsync(); + } + } + } + } + + static void ScanThread() + { + Random r = new Random(); + byte[] result; + + using (iter = log.Scan(log.BeginAddress, long.MaxValue)) + { + while (true) + { + while (!iter.GetNext(out result, out int length)) + { + // 
For finite end address, check if iteration ended + // if (iter.CurrentAddress >= endAddress) return; + iter.WaitAsync().GetAwaiter().GetResult(); + } + + // Memory pool variant: + // iter.GetNext(pool, out IMemoryOwner resultMem, out int length)) + + if (Different(result, staticEntry, out int location)) + throw new Exception("Invalid entry found"); + + // Re-insert entry with small probability + if (r.Next(100) < 10) + { + log.Enqueue(result); + } + + // Example of random read from given address + // (result, _) = log.ReadAsync(iter.CurrentAddress).GetAwaiter().GetResult(); + + log.TruncateUntil(iter.NextAddress); + } + } + + // Example of recoverable (named) iterator: + // using (iter = log.Scan(log.BeginAddress, long.MaxValue, "foo")) + } + + static async Task AsyncScan() + { + using (iter = log.Scan(log.BeginAddress, long.MaxValue)) + await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable()) + { + if (Different(result, staticEntry, out int location)) + throw new Exception("Invalid entry found"); + log.TruncateUntil(iter.NextAddress); + } + } + + static void ReportThread() + { + long lastTime = 0; + long lastValue = log.TailAddress; + long lastIterValue = log.BeginAddress; + + Stopwatch sw = new Stopwatch(); + sw.Start(); + + while (true) + { + Thread.Sleep(5000); + + var nowTime = sw.ElapsedMilliseconds; + var nowValue = log.TailAddress; + + Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}", + (nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue); + lastValue = nowValue; + + if (iter != null) + { + var nowIterValue = iter.CurrentAddress; + Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}", + (nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue); + lastIterValue = nowIterValue; + } + + lastTime = nowTime; + } + } + + static void CommitThread() + { + //Task prevCommitTask = null; + while (true) + { + Thread.Sleep(5); + log.Commit(true); + + // Async version + // await log.CommitAsync(); + + // Async version that catches all commit failures in between + //try + //{ + // prevCommitTask = await log.CommitAsync(prevCommitTask); + //} + //catch (CommitFailureException e) + //{ + // Console.WriteLine(e); + // prevCommitTask = e.LinkedCommitInfo.nextTcs.Task; + //} + } + } + + private static bool Different(byte[] b1, byte[] b2, out int location) + { + location = 0; + if (b1.Length != b2.Length) return true; + for (location = 0; location < b1.Length; location++) + { + if (b1[location] != b2[location]) + { + return true; + } + } + return false; + } + + private struct ReadOnlySpanBatch : IReadOnlySpanBatch + { + private readonly int batchSize; + public ReadOnlySpanBatch(int batchSize) => this.batchSize = batchSize; + public ReadOnlySpan Get(int index) => staticEntry; + public int TotalEntries() => batchSize; + } + } +} diff --git a/cs/playground/FasterLogSample/Properties/AssemblyInfo.cs b/cs/playground/FasterLogSample/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..5e08438c2 --- /dev/null +++ b/cs/playground/FasterLogSample/Properties/AssemblyInfo.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. 
+[assembly: AssemblyDescription("")] +[assembly: AssemblyCopyright("Copyright © 2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
diff --git a/cs/playground/StructSampleCore/StructSampleCore.csproj b/cs/playground/StructSampleCore/StructSampleCore.csproj index 06d9ad194..eb5638298 100644 --- a/cs/playground/StructSampleCore/StructSampleCore.csproj +++ b/cs/playground/StructSampleCore/StructSampleCore.csproj @@ -1,7 +1,7 @@  - netcoreapp2.0 + netcoreapp2.2 x64 win7-x64;linux-x64 true
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 371cfad69..0bd293ccf 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Runtime.InteropServices; using System.Diagnostics; +using System.Collections.Generic; namespace FASTER.core { @@ -19,18 +20,7 @@ internal struct FullPageStatus [FieldOffset(0)] public long LastFlushedUntilAddress; [FieldOffset(8)] - public FlushCloseStatus PageFlushCloseStatus; - } - - [StructLayout(LayoutKind.Explicit)] - internal struct FlushCloseStatus - { - [FieldOffset(0)] - public PMMFlushStatus PageFlushStatus; - [FieldOffset(4)] - public PMMCloseStatus PageCloseStatus; - [FieldOffset(0)] - public long value; + public long LastClosedUntilAddress; } [StructLayout(LayoutKind.Explicit)] @@ -49,7 +39,7 @@ internal struct PageOffset /// /// /// - public unsafe abstract class AllocatorBase : IDisposable + public unsafe abstract partial class AllocatorBase : IDisposable where Key : new() where Value : new() { @@ -116,16 +106,8 @@ public unsafe abstract class AllocatorBase : IDisposable /// /// HeadOffset lag (from tail) /// - protected const int HeadOffsetLagNumPages = 4; + protected readonly bool HeadOffsetExtraLag; - /// - /// HeadOffset lag (from tail) for ReadCache - /// - protected const int ReadCacheHeadOffsetLagNumPages = 1; - /// - /// HeadOffset lag size - /// - protected readonly int HeadOffsetLagSize; /// /// HeadOffset lag address /// @@ -168,6 +150,11 @@ public unsafe abstract class AllocatorBase : IDisposable /// public long FlushedUntilAddress; + /// + /// Closed until address + /// + public long ClosedUntilAddress; + /// /// Begin address /// @@ -187,13 +174,10 @@ public unsafe abstract class AllocatorBase : IDisposable #endregion #region Private page metadata - /// - /// Index in circular buffer, of the current tail page - /// - private volatile int TailPageIndex; // Array that indicates the status of each buffer page internal readonly FullPageStatus[] PageStatusIndicator; + internal readonly PendingFlushList[] PendingFlush; /// /// Global address of the current tail (next element to be allocated from the circular buffer) @@ -221,6 +205,16 @@ public unsafe abstract class AllocatorBase : IDisposable /// protected readonly Action EvictCallback = null; + /// + /// Flush callback + /// + protected readonly Action FlushCallback = null; + + /// + /// Error handling + /// + private readonly ErrorList errorList = new ErrorList(); + /// /// Observer for records entering read-only region /// @@ -380,7 +374,8 @@ public unsafe abstract class AllocatorBase : IDisposable /// Clear page /// /// Page number to be cleared - protected abstract void ClearPage(long page); + /// Offset to clear from (if partial clear) + protected abstract void ClearPage(long page, int offset = 0); /// /// Write page (async) /// @@ -469,13 +464,15 @@ public unsafe abstract class AllocatorBase : IDisposable /// /// /// - public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer, Action evictCallback, LightEpoch epoch) + /// + public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer, Action evictCallback, LightEpoch epoch, Action flushCallback) { if (evictCallback != null) { ReadCache = true; EvictCallback = evictCallback; } + FlushCallback = flushCallback; this.comparer = comparer; if (epoch == null) @@ -501,8 +498,10 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer BufferSizeMask = BufferSize - 1; // HeadOffset lag (from tail). - HeadOffsetLagSize = BufferSize - (ReadCache ? ReadCacheHeadOffsetLagNumPages : HeadOffsetLagNumPages); - HeadOffsetLagAddress = (long)HeadOffsetLagSize << LogPageSizeBits; + var headOffsetLagSize = BufferSize - 1; // (ReadCache ? ReadCacheHeadOffsetLagNumPages : HeadOffsetLagNumPages); + if (BufferSize > 1 && HeadOffsetExtraLag) headOffsetLagSize--; + + HeadOffsetLagAddress = (long)headOffsetLagSize << LogPageSizeBits; // ReadOnlyOffset lag (from tail). This should not exceed HeadOffset lag. LogMutableFraction = settings.MutableFraction; @@ -513,12 +512,18 @@ public AllocatorBase(LogSettings settings, IFasterEqualityComparer comparer SegmentSize = 1 << LogSegmentSizeBits; SegmentBufferSize = 1 + (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize)); - if (BufferSize < 16) + if (SegmentSize < PageSize) + throw new Exception("Segment size must be at least as large as the page size"); + + if (BufferSize < 1) { - throw new Exception("HLOG buffer must be at least 16 pages"); + throw new Exception("Log buffer must be at least 1 page"); } PageStatusIndicator = new FullPageStatus[BufferSize]; + PendingFlush = new PendingFlushList[BufferSize]; + for (int i = 0; i < BufferSize; i++) + PendingFlush[i] = new PendingFlushList(); device = settings.LogDevice; sectorSize = (int)device.SectorSize; @@ -540,20 +545,22 @@ protected void Initialize(long firstValidAddress) AllocatePage(tailPageIndex); // Allocate next page as well - if (firstValidAddress > 0) - AllocatePage(tailPageIndex + 1); + int nextPageIndex = (int)(tailPage + 1) % BufferSize; + if ((!IsAllocated(nextPageIndex))) + { + AllocatePage(nextPageIndex); + } SafeReadOnlyAddress = firstValidAddress; ReadOnlyAddress = firstValidAddress; SafeHeadAddress = firstValidAddress; HeadAddress = firstValidAddress; + ClosedUntilAddress = firstValidAddress; FlushedUntilAddress = firstValidAddress; BeginAddress = firstValidAddress; TailPageOffset.Page = (int)(firstValidAddress >> LogPageSizeBits); TailPageOffset.Offset = (int)(firstValidAddress & PageSizeMask); - - TailPageIndex = 0; } /// @@ -615,6 +622,11 @@ public long GetSegmentSize() public long GetTailAddress() { var local = TailPageOffset; + if (local.Offset >= PageSize) + { + local.Page++; + local.Offset = 0; + } return ((long)local.Page << LogPageSizeBits) | (uint)local.Offset; } @@ -677,15 +689,6 @@ public long GetOffsetInPage(long address) return address & PageSizeMask; } - /// - /// Get offset lag in pages - /// - /// - public long GetHeadOffsetLagInPages() - { - return HeadOffsetLagSize; - } - /// /// Get sector size for main hlog
device /// @@ -696,15 +699,24 @@ public int GetDeviceSectorSize() } /// - /// Key function used to allocate memory for a specified number of items + /// Try allocate, no thread spinning allowed + /// May return 0 in case of inability to allocate /// /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public long Allocate(int numSlots = 1) + public long TryAllocate(int numSlots = 1) { + if (numSlots > PageSize) + throw new Exception("Entry does not fit on page"); + PageOffset localTailPageOffset = default(PageOffset); + // Necessary to check because threads keep retrying and we do not + // want to overflow offset more than once per thread + if (TailPageOffset.Offset > PageSize) + return 0; + // Determine insertion index. // ReSharper disable once CSharpWarnings::CS0420 #pragma warning disable 420 @@ -715,162 +727,69 @@ public long Allocate(int numSlots = 1) int offset = localTailPageOffset.Offset - numSlots; #region HANDLE PAGE OVERFLOW - /* To prove correctness of the following modifications - * done to TailPageOffset and the allocation itself, - * we should use the fact that only one thread will have any - * of the following cases since it is a counter and we spin-wait - * until the tail is folded onto next page accordingly. - */ - if (localTailPageOffset.Offset >= PageSize) + if (localTailPageOffset.Offset > PageSize) { - if (offset >= PageSize) + if (offset > PageSize) { - //The tail offset value was more than page size before atomic add - //We consider that a failed attempt and retry again - var spin = new SpinWait(); - do - { - //Just to give some more time to the thread - // that is handling this overflow - while (TailPageOffset.Offset >= PageSize) - { - spin.SpinOnce(); - } - - // ReSharper disable once CSharpWarnings::CS0420 -#pragma warning disable 420 - localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots); -#pragma warning restore 420 - - page = localTailPageOffset.Page; - offset = localTailPageOffset.Offset - numSlots; - } while (offset >= PageSize); + return 0; } + // The thread that "makes" the offset incorrect + // is the one that is elected to fix it and + // shift read-only/head. - if (localTailPageOffset.Offset == PageSize) - { - //Folding over at page boundary - localTailPageOffset.Page++; - localTailPageOffset.Offset = 0; - TailPageOffset = localTailPageOffset; - } - else if (localTailPageOffset.Offset >= PageSize) - { - //Overflows not allowed. We allot same space in next page. - localTailPageOffset.Page++; - localTailPageOffset.Offset = numSlots; - TailPageOffset = localTailPageOffset; + long shiftAddress = ((long)(localTailPageOffset.Page + 1)) << LogPageSizeBits; + PageAlignedShiftReadOnlyAddress(shiftAddress); + PageAlignedShiftHeadAddress(shiftAddress); - page = localTailPageOffset.Page; - offset = 0; + if (CannotAllocate(localTailPageOffset.Page + 1)) + { + // We should not allocate the next page; reset to end of page + // so that next attempt can retry + localTailPageOffset.Offset = PageSize; + Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); + return 0; } - } - #endregion - - long address = (((long)page) << LogPageSizeBits) | ((long)offset); - - // Check if TailPageIndex is appropriate and allocated! 
- int pageIndex = page % BufferSize; - - if (TailPageIndex == pageIndex) - { - return (address); - } - - //Invert the address if either the previous page is not flushed or if it is null - if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) || - (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) || - (!IsAllocated(pageIndex))) - { - address = -address; - } - // Update the read-only so that we can get more space for the tail - if (offset == 0) - { - if (address >= 0) + // Allocate next page in advance, if needed + int nextPageIndex = (localTailPageOffset.Page + 2) % BufferSize; + if ((!IsAllocated(nextPageIndex))) { - TailPageIndex = pageIndex; - Interlocked.MemoryBarrier(); + AllocatePage(nextPageIndex); } - long newPage = page + 1; - int newPageIndex = (int)((page + 1) % BufferSize); - - long tailAddress = (address < 0 ? -address : address); - PageAlignedShiftReadOnlyAddress(tailAddress); - PageAlignedShiftHeadAddress(tailAddress); + localTailPageOffset.Page++; + localTailPageOffset.Offset = 0; + TailPageOffset = localTailPageOffset; - if ((!IsAllocated(newPageIndex))) - { - AllocatePage(newPageIndex); - } + return 0; } + #endregion - return (address); + return (((long)page) << LogPageSizeBits) | ((long)offset); } - /// - /// If allocator cannot allocate new memory as the head has not shifted or the previous page - /// is not yet closed, it allocates but returns the negative address. - /// This function is invoked to check if the address previously allocated has become valid to be used - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void CheckForAllocateComplete(ref long address) + private bool CannotAllocate(int page) { - if (address >= 0) - { - throw new Exception("Address already allocated!"); - } - - PageOffset p = default(PageOffset); - p.Page = (int)((-address) >> LogPageSizeBits); - p.Offset = (int)((-address) & PageSizeMask); - - //Check write cache - int pageIndex = p.Page % BufferSize; - if (TailPageIndex == pageIndex) - { - address = -address; - return; - } - - //Check if we can move the head offset - long currentTailAddress = GetTailAddress(); - PageAlignedShiftHeadAddress(currentTailAddress); - - //Check if I can allocate pageIndex at all - if ((PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus != PMMFlushStatus.Flushed) || - (PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus != PMMCloseStatus.Closed) || - (!IsAllocated(pageIndex))) - { - return; - } - - //correct values and set write cache - address = -address; - if (p.Offset == 0) - { - TailPageIndex = pageIndex; - } - return; + return + (page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits)); } /// /// Used by applications to make the current state of the database immutable quickly /// /// - public void ShiftReadOnlyToTail(out long tailAddress) + public bool ShiftReadOnlyToTail(out long tailAddress) { tailAddress = GetTailAddress(); long localTailAddress = tailAddress; long currentReadOnlyOffset = ReadOnlyAddress; if (Utility.MonotonicUpdate(ref ReadOnlyAddress, tailAddress, out long oldReadOnlyOffset)) { - epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress, false)); + epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress)); + return true; } + return false; } /// @@ -881,7 +800,7 @@ public bool ShiftReadOnlyAddress(long newReadOnlyAddress) { if (Utility.MonotonicUpdate(ref ReadOnlyAddress, newReadOnlyAddress, out long 
oldReadOnlyOffset)) { - epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(newReadOnlyAddress, false)); + epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(newReadOnlyAddress)); return true; } return false; @@ -894,24 +813,32 @@ public bool ShiftReadOnlyAddress(long newReadOnlyAddress) public void ShiftBeginAddress(long newBeginAddress) { // First update the begin address - Utility.MonotonicUpdate(ref BeginAddress, newBeginAddress, out long oldBeginAddress); + var b = Utility.MonotonicUpdate(ref BeginAddress, newBeginAddress, out long oldBeginAddress); + b = b && (oldBeginAddress >> LogSegmentSizeBits != newBeginAddress >> LogSegmentSizeBits); + // Then the head address var h = Utility.MonotonicUpdate(ref HeadAddress, newBeginAddress, out long old); + // Finally the read-only address var r = Utility.MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out old); - // Clean up until begin address - epoch.BumpCurrentEpoch(() => + if (h || r || b) { - if (r) + epoch.Resume(); + // Clean up until begin address + epoch.BumpCurrentEpoch(() => { - Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out long _old); - Utility.MonotonicUpdate(ref FlushedUntilAddress, newBeginAddress, out _old); - } - if (h) OnPagesClosed(newBeginAddress); + if (r) + { + Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out long _old); + Utility.MonotonicUpdate(ref FlushedUntilAddress, newBeginAddress, out _old); + } + if (h) OnPagesClosed(newBeginAddress); - TruncateUntilAddress(newBeginAddress); - }); + if (b) TruncateUntilAddress(newBeginAddress); + }); + epoch.Suspend(); + } } /// @@ -928,28 +855,13 @@ protected virtual void TruncateUntilAddress(long toAddress) /// Flush: send page to secondary store /// /// - /// - public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress, bool waitForPendingFlushComplete = false) + public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress) { if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress)) { Debug.WriteLine("SafeReadOnly shifted from {0:X} to {1:X}", oldSafeReadOnlyAddress, newSafeReadOnlyAddress); - long startPage = oldSafeReadOnlyAddress >> LogPageSizeBits; - - long endPage = (newSafeReadOnlyAddress >> LogPageSizeBits); - OnReadOnlyObserver?.OnNext(Scan(oldSafeReadOnlyAddress, newSafeReadOnlyAddress, ScanBufferingMode.NoBuffering)); - - int numPages = (int)(endPage - startPage); - if (numPages > 10) - { - new Thread( - () => AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress)).Start(); - } - else - { - AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress); - } + AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress); } } @@ -972,39 +884,41 @@ public void OnPagesClosed(long newSafeHeadAddress) return; } - int closePage = (int)((closePageAddress >> LogPageSizeBits) % BufferSize); - - if (!IsAllocated(closePage)) - { - AllocatePage(closePage); - } + int closePage = (int)(closePageAddress >> LogPageSizeBits); + int closePageIndex = closePage % BufferSize; - while (true) + if (!IsAllocated(closePageIndex)) + AllocatePage(closePageIndex); + else + ClearPage(closePage); + Utility.MonotonicUpdate(ref PageStatusIndicator[closePageIndex].LastClosedUntilAddress, closePageAddress + PageSize, out _); + ShiftClosedUntilAddress(); + if (ClosedUntilAddress > FlushedUntilAddress) { - var oldStatus = PageStatusIndicator[closePage].PageFlushCloseStatus; - if (oldStatus.PageFlushStatus == PMMFlushStatus.Flushed) - { - ClearPage(closePageAddress >> LogPageSizeBits); 
- } - else - { - throw new Exception("Error: page should already be flushed at this point"); - } - - var newStatus = oldStatus; - newStatus.PageCloseStatus = PMMCloseStatus.Closed; - if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[closePage].PageFlushCloseStatus.value, newStatus.value, oldStatus.value)) - { - break; - } + throw new Exception($"Closed address {ClosedUntilAddress} exceeds flushed address {FlushedUntilAddress}"); } - - // Necessary to propagate this change to other threads - Interlocked.MemoryBarrier(); } } } + private void DebugPrintAddresses(long closePageAddress) + { + var _flush = FlushedUntilAddress; + var _readonly = ReadOnlyAddress; + var _safereadonly = SafeReadOnlyAddress; + var _tail = GetTailAddress(); + var _head = HeadAddress; + var _safehead = SafeHeadAddress; + + Console.WriteLine("ClosePageAddress: {0}.{1}", GetPage(closePageAddress), GetOffsetInPage(closePageAddress)); + Console.WriteLine("FlushedUntil: {0}.{1}", GetPage(_flush), GetOffsetInPage(_flush)); + Console.WriteLine("Tail: {0}.{1}", GetPage(_tail), GetOffsetInPage(_tail)); + Console.WriteLine("Head: {0}.{1}", GetPage(_head), GetOffsetInPage(_head)); + Console.WriteLine("SafeHead: {0}.{1}", GetPage(_safehead), GetOffsetInPage(_safehead)); + Console.WriteLine("ReadOnly: {0}.{1}", GetPage(_readonly), GetOffsetInPage(_readonly)); + Console.WriteLine("SafeReadOnly: {0}.{1}", GetPage(_safereadonly), GetOffsetInPage(_safereadonly)); + } + /// /// Called every time a new tail page is allocated. Here the read-only is /// shifted only to page boundaries unlike ShiftReadOnlyToTail where shifting @@ -1089,18 +1003,62 @@ protected void ShiftFlushedUntilAddress() long page = GetPage(currentFlushedUntilAddress); bool update = false; - long pageLastFlushedAddress = Interlocked.Read(ref PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress); - while (pageLastFlushedAddress >= currentFlushedUntilAddress) + long pageLastFlushedAddress = PageStatusIndicator[page % BufferSize].LastFlushedUntilAddress; + while (pageLastFlushedAddress >= currentFlushedUntilAddress && currentFlushedUntilAddress >= (page << LogPageSizeBits)) { currentFlushedUntilAddress = pageLastFlushedAddress; update = true; page++; - pageLastFlushedAddress = Interlocked.Read(ref PageStatusIndicator[(int)(page % BufferSize)].LastFlushedUntilAddress); + pageLastFlushedAddress = PageStatusIndicator[page % BufferSize].LastFlushedUntilAddress; + } + + if (update) + { + if (Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress)) + { + uint errorCode = 0; + if (errorList.Count > 0) + { + errorCode = errorList.CheckAndWait(oldFlushedUntilAddress, currentFlushedUntilAddress); + } + FlushCallback?.Invoke( + new CommitInfo + { + BeginAddress = BeginAddress, + FromAddress = oldFlushedUntilAddress, + UntilAddress = currentFlushedUntilAddress, + ErrorCode = errorCode + }); + + if (errorList.Count > 0) + { + errorList.RemoveUntil(currentFlushedUntilAddress); + } + } + } + } + + /// + /// Shift ClosedUntil address + /// + protected void ShiftClosedUntilAddress() + { + long currentClosedUntilAddress = ClosedUntilAddress; + long page = GetPage(currentClosedUntilAddress); + + bool update = false; + long pageLastClosedAddress = PageStatusIndicator[page % BufferSize].LastClosedUntilAddress; + while (pageLastClosedAddress >= currentClosedUntilAddress && currentClosedUntilAddress >= (page << LogPageSizeBits)) + { + currentClosedUntilAddress = pageLastClosedAddress; + update = true; 
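+ // Keep advancing page by page while each successive page's recorded LastClosedUntilAddress has reached the running address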
+ page++; + pageLastClosedAddress = PageStatusIndicator[(int)(page % BufferSize)].LastClosedUntilAddress; } if (update) { - Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress); + Utility.MonotonicUpdate(ref ClosedUntilAddress, currentClosedUntilAddress, out long oldClosedUntilAddress); } } @@ -1116,35 +1074,27 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress) long offsetInPage = GetOffsetInPage(tailAddress); TailPageOffset.Page = (int)tailPage; TailPageOffset.Offset = (int)offsetInPage; - TailPageIndex = GetPageIndexForPage(TailPageOffset.Page); // allocate next page as well - this is an invariant in the allocator! var pageIndex = (TailPageOffset.Page % BufferSize); var nextPageIndex = (pageIndex + 1) % BufferSize; if (tailAddress > 0) - AllocatePage(nextPageIndex); + if (!IsAllocated(nextPageIndex)) + AllocatePage(nextPageIndex); BeginAddress = beginAddress; HeadAddress = headAddress; SafeHeadAddress = headAddress; + ClosedUntilAddress = headAddress; FlushedUntilAddress = tailAddress; ReadOnlyAddress = tailAddress; SafeReadOnlyAddress = tailAddress; - // ensure appropriate page status for all pages in memory - // note: they must have been read in previously during recovery - var addr = GetStartLogicalAddress(GetPage(headAddress)); - for (; addr < tailAddress; addr += PageSize) - { - pageIndex = GetPageIndexForAddress(addr); - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - } - // for the last page which contains tailoffset, it must be open pageIndex = GetPageIndexForAddress(tailAddress); - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Open; - PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; + + // clear the last page starting from tail address + ClearPage(pageIndex, (int)GetOffsetInPage(tailAddress)); // Printing debug info Debug.WriteLine("******* Recovered HybridLog Stats *******"); @@ -1187,6 +1137,33 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress) asyncResult); } + /// + /// Read record to memory - simple version + /// + /// + /// + /// + /// + internal void AsyncReadRecordToMemory(long fromLogical, int numBytes, IOCompletionCallback callback, ref SimpleReadContext context) + { + ulong fileOffset = (ulong)(AlignedPageSizeBytes * (fromLogical >> LogPageSizeBits) + (fromLogical & PageSizeMask)); + ulong alignedFileOffset = (ulong)(((long)fileOffset / sectorSize) * sectorSize); + + uint alignedReadLength = (uint)((long)fileOffset + numBytes - (long)alignedFileOffset); + alignedReadLength = (uint)((alignedReadLength + (sectorSize - 1)) & ~(sectorSize - 1)); + + context.record = bufferPool.Get((int)alignedReadLength); + context.record.valid_offset = (int)(fileOffset - alignedFileOffset); + context.record.available_bytes = (int)(alignedReadLength - (fileOffset - alignedFileOffset)); + context.record.required_bytes = numBytes; + + device.ReadAsync(alignedFileOffset, + (IntPtr)context.record.aligned_pointer, + alignedReadLength, + callback, + context); + } + /// /// Read pages from specified device /// @@ -1209,7 +1186,7 @@ public void AsyncReadPagesFromDevice( IDevice logDevice = null, IDevice objectLogDevice = null) { AsyncReadPagesFromDevice(readPageStart, numPages, untilAddress, callback, context, - out CountdownEvent completed, 
devicePageOffset, logDevice, objectLogDevice); + out _, devicePageOffset, logDevice, objectLogDevice); } /// @@ -1293,12 +1270,13 @@ public void AsyncFlushPages(long fromAddress, long untilAddress) long startPage = fromAddress >> LogPageSizeBits; long endPage = untilAddress >> LogPageSizeBits; int numPages = (int)(endPage - startPage); - long offsetInEndPage = GetOffsetInPage(untilAddress); + + long offsetInStartPage = GetOffsetInPage(fromAddress); + long offsetInEndPage = GetOffsetInPage(untilAddress); + + // Extra (partial) page being flushed if (offsetInEndPage > 0) - { numPages++; - } - /* Request asynchronous writes to the device. If waitForPendingFlushComplete * is set, then a CountDownEvent is set in the callback handle. @@ -1311,43 +1289,39 @@ public void AsyncFlushPages(long fromAddress, long untilAddress) var asyncResult = new PageAsyncFlushResult { page = flushPage, - count = 1 + count = 1, + partial = false, + fromAddress = pageStartAddress, + untilAddress = pageEndAddress }; - if (pageEndAddress > untilAddress || pageStartAddress < fromAddress) + if ( + ((fromAddress > pageStartAddress) && (fromAddress < pageEndAddress)) || + ((untilAddress > pageStartAddress) && (untilAddress < pageEndAddress)) + ) { asyncResult.partial = true; - asyncResult.fromAddress = pageStartAddress; - asyncResult.untilAddress = pageEndAddress; - if (pageEndAddress > untilAddress) + if (untilAddress < pageEndAddress) asyncResult.untilAddress = untilAddress; - if (pageStartAddress < fromAddress) + if (fromAddress > pageStartAddress) asyncResult.fromAddress = fromAddress; - + } - // Are we flushing until the end of page? - if (untilAddress >= pageEndAddress) + // Partial page starting point, need to wait until the + // ongoing adjacent flush is completed to ensure correctness + if (GetOffsetInPage(asyncResult.fromAddress) > 0) + { + // Enqueue work in shared queue + var index = GetPageIndexForAddress(asyncResult.fromAddress); + PendingFlush[index].Add(asyncResult); + if (PendingFlush[index].RemoveAdjacent(FlushedUntilAddress, out PageAsyncFlushResult request)) { - // Set status to in-progress - PageStatusIndicator[flushPage % BufferSize].PageFlushCloseStatus - = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open }; + WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request); } } else - { - asyncResult.partial = false; - asyncResult.fromAddress = pageStartAddress; - asyncResult.untilAddress = pageEndAddress; - - // Set status to in-progress - PageStatusIndicator[flushPage % BufferSize].PageFlushCloseStatus - = new FlushCloseStatus { PageFlushStatus = PMMFlushStatus.InProgress, PageCloseStatus = PMMCloseStatus.Open }; - } - - Interlocked.Exchange(ref PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress, -1); - - WriteAsync(flushPage, AsyncFlushPageCallback, asyncResult); + WriteAsync(flushPage, AsyncFlushPageCallback, asyncResult); } } @@ -1495,6 +1469,8 @@ private void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, NativeOverl Overlapped.Free(overlap); } + // static DateTime last = DateTime.Now; + /// /// IOCompletion callback for page flush /// @@ -1508,33 +1484,35 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, NativeOverlap Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); } + /* + if (DateTime.Now - last > TimeSpan.FromSeconds(7)) + { + last = DateTime.Now; + errorCode = 1; + Console.WriteLine("Disk error"); + }*/ + + // Set the page status 
to flushed PageAsyncFlushResult result = (PageAsyncFlushResult)Overlapped.Unpack(overlap).AsyncResult; if (Interlocked.Decrement(ref result.count) == 0) { - if (!result.partial || (result.untilAddress >= ((result.page + 1) << LogPageSizeBits))) + if (errorCode != 0) { - while (true) - { - var oldStatus = PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus; - if (oldStatus.PageCloseStatus == PMMCloseStatus.Closed) - { - throw new Exception("Error: page should not be closed at this point"); - } - var newStatus = oldStatus; - newStatus.PageFlushStatus = PMMFlushStatus.Flushed; - if (oldStatus.value == Interlocked.CompareExchange(ref PageStatusIndicator[result.page % BufferSize].PageFlushCloseStatus.value, newStatus.value, oldStatus.value)) - { - break; - } - } + errorList.Add(result.fromAddress); } - Interlocked.Exchange(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress); + Utility.MonotonicUpdate(ref PageStatusIndicator[result.page % BufferSize].LastFlushedUntilAddress, result.untilAddress, out _); ShiftFlushedUntilAddress(); result.Free(); } + var _flush = FlushedUntilAddress; + if (GetOffsetInPage(_flush) > 0 && PendingFlush[GetPage(_flush) % BufferSize].RemoveAdjacent(_flush, out PageAsyncFlushResult request)) + { + WriteAsync(request.fromAddress >> LogPageSizeBits, AsyncFlushPageCallback, request); + } + Overlapped.Free(overlap); } @@ -1579,5 +1557,10 @@ public virtual void ShallowCopy(ref Value src, ref Value dst) { dst = src; } + + private string PrettyPrint(long address) + { + return $"{GetPage(address)}:{GetOffsetInPage(address)}"; + } } } diff --git a/cs/src/core/Allocator/AsyncIOContext.cs b/cs/src/core/Allocator/AsyncIOContext.cs index 07c535593..f0ce5fae9 100644 --- a/cs/src/core/Allocator/AsyncIOContext.cs +++ b/cs/src/core/Allocator/AsyncIOContext.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; +using System.Threading; namespace FASTER.core { @@ -61,4 +62,19 @@ public void Dispose() record.Return(); } } + + internal class SimpleReadContext : IAsyncResult + { + public long logicalAddress; + public SectorAlignedMemory record; + public SemaphoreSlim completedRead; + + public object AsyncState => throw new NotImplementedException(); + + public WaitHandle AsyncWaitHandle => throw new NotImplementedException(); + + public bool CompletedSynchronously => throw new NotImplementedException(); + + public bool IsCompleted => throw new NotImplementedException(); + } } diff --git a/cs/src/core/Allocator/AtomicOwner.cs b/cs/src/core/Allocator/AtomicOwner.cs new file mode 100644 index 000000000..ad7de3824 --- /dev/null +++ b/cs/src/core/Allocator/AtomicOwner.cs @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
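+// The struct below packs an owner flag and a pending-work count into one 64-bit
+// word, so that exactly one thread at a time drains shared work. Usage sketch
+// (editorial illustration; ProcessPendingWork is a hypothetical stand-in for
+// whatever queued work the caller drains):
+//
+//   if (owner.Enqueue())            // true: this thread was elected owner
+//   {
+//       do
+//       {
+//           ProcessPendingWork();   // drain one unit of queued work
+//       } while (owner.Dequeue());  // false once the pending count reaches zero
+//       // the owner then calls Release(), re-entering the drain loop if the
+//       // release fails because another thread enqueued in the meantime
+//   }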
+ +using System.Threading; +using System.Runtime.InteropServices; +using System; + +namespace FASTER.core +{ + [StructLayout(LayoutKind.Explicit)] + struct AtomicOwner + { + [FieldOffset(0)] + int owner; + [FieldOffset(4)] + int count; + [FieldOffset(0)] + long atomic; + + /// + /// Enqueue token + /// true: success + caller is new owner + /// false: success + someone else is owner + /// + /// + public bool Enqueue() + { + while (true) + { + var older = this; + var newer = older; + newer.count++; + if (older.owner == 0) + newer.owner = 1; + + if (Interlocked.CompareExchange(ref this.atomic, newer.atomic, older.atomic) == older.atomic) + { + return older.owner == 0; + } + } + } + + /// + /// Dequeue token (caller is/remains owner) + /// true: successful dequeue + /// false: failed dequeue + /// + /// + public bool Dequeue() + { + while (true) + { + var older = this; + var newer = older; + newer.count--; + + if (Interlocked.CompareExchange(ref this.atomic, newer.atomic, older.atomic) == older.atomic) + { + return newer.count > 0; + } + } + } + + /// + /// Release queue ownership + /// true: successful release + /// false: failed release + /// + /// + public bool Release() + { + while (true) + { + var older = this; + var newer = older; + + if (newer.count > 0) + return false; + + if (newer.owner == 0) + throw new Exception("Invalid release by non-owner thread"); + newer.owner = 0; + + if (Interlocked.CompareExchange(ref this.atomic, newer.atomic, older.atomic) == older.atomic) + { + return true; + } + } + } + } +} diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index e3339ff6b..a164e71ac 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -5,11 +5,6 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Runtime.InteropServices; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.IO; -using System.Diagnostics; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member @@ -31,8 +26,8 @@ public unsafe sealed class BlittableAllocator : AllocatorBase comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch) + public BlittableAllocator(LogSettings settings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null, Action flushCallback = null) + : base(settings, comparer, evictCallback, epoch, flushCallback) { values = new byte[BufferSize][]; handles = new GCHandle[BufferSize]; @@ -131,10 +126,6 @@ internal override void AllocatePage(int index) long p = (long)handles[index].AddrOfPinnedObject(); pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1); values[index] = tmp; - - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - Interlocked.MemoryBarrier(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -198,9 +189,16 @@ public override long GetFirstValidLogicalAddress(long page) return page << LogPageSizeBits; } - protected override void ClearPage(long page) + protected override void ClearPage(long page, int offset) { - Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length); + if (offset == 0) + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + else + { + // Adjust array offset 
for cache alignment + offset += (int)(pointers[page % BufferSize] - (long)handles[page % BufferSize].AddrOfPinnedObject()); + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + } } /// @@ -337,6 +335,7 @@ public override IFasterScanIterator Scan(long beginAddress, long end /// /// /// + /// internal void AsyncReadPagesFromDeviceToFrame( long readPageStart, int numPages, @@ -346,7 +345,9 @@ internal void AsyncReadPagesFromDeviceToFrame( BlittableFrame frame, out CountdownEvent completed, long devicePageOffset = 0, - IDevice device = null, IDevice objectLogDevice = null) + IDevice device = null, + IDevice objectLogDevice = null, + CancellationTokenSource cts = null) { var usedDevice = device; IDevice usedObjlogDevice = objectLogDevice; @@ -373,7 +374,8 @@ internal void AsyncReadPagesFromDeviceToFrame( page = readPage, context = context, handle = completed, - frame = frame + frame = frame, + cts = cts }; ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage); diff --git a/cs/src/core/Allocator/ErrorList.cs b/cs/src/core/Allocator/ErrorList.cs new file mode 100644 index 000000000..59d8b48ae --- /dev/null +++ b/cs/src/core/Allocator/ErrorList.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Collections.Generic; +using System.Threading; + +namespace FASTER.core +{ + class ErrorList + { + private readonly List errorList; + + public ErrorList() => errorList = new List(); + + public void Add(long address) + { + lock (errorList) + errorList.Add(address); + } + + public uint CheckAndWait(long oldFlushedUntilAddress, long currentFlushedUntilAddress) + { + bool done = false; + uint errorCode = 0; + while (!done) + { + done = true; + lock (errorList) + { + for (int i = 0; i < errorList.Count; i++) + { + if (errorList[i] >= oldFlushedUntilAddress && errorList[i] < currentFlushedUntilAddress) + { + errorCode = 1; + } + else if (errorList[i] < oldFlushedUntilAddress) + { + done = false; // spin barrier for other threads during exception + Thread.Yield(); + } + } + } + } + return errorCode; + } + + public void RemoveUntil(long currentFlushedUntilAddress) + { + lock (errorList) + { + for (int i = 0; i < errorList.Count; i++) + { + if (errorList[i] < currentFlushedUntilAddress) + { + errorList.RemoveAt(i); + } + } + } + + } + public int Count => errorList.Count; + } +} diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index ccea80368..23b747b04 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -41,8 +41,8 @@ public unsafe sealed class GenericAllocator : AllocatorBase(); private readonly bool valueBlittable = Utility.IsBlittable(); - public GenericAllocator(LogSettings settings, SerializerSettings serializerSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch) + public GenericAllocator(LogSettings settings, SerializerSettings serializerSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null, Action flushCallback = null) + : base(settings, comparer, evictCallback, epoch, flushCallback) { SerializerSettings = serializerSettings; @@ -200,9 +200,6 @@ internal override void DeleteFromMemory() internal override void AllocatePage(int index) { values[index] = AllocatePage(); - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = 
PMMFlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - Interlocked.MemoryBarrier(); } internal Record[] AllocatePage() @@ -254,9 +251,9 @@ protected override void WriteAsyncToDevice - protected override void ClearPage(long page) + protected override void ClearPage(long page, int offset) { - Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length); + Array.Clear(values[page % BufferSize], offset / recordSize, values[page % BufferSize].Length - offset / recordSize); // Close segments var thisCloseSegment = page >> (LogSegmentSizeBits - LogPageSizeBits); diff --git a/cs/src/core/Allocator/PendingFlushList.cs b/cs/src/core/Allocator/PendingFlushList.cs new file mode 100644 index 000000000..0896481aa --- /dev/null +++ b/cs/src/core/Allocator/PendingFlushList.cs @@ -0,0 +1,56 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Threading; + +namespace FASTER.core +{ + class PendingFlushList + { + const int maxSize = 8; + const int maxRetries = 10; + public PageAsyncFlushResult[] list; + + public PendingFlushList() + { + list = new PageAsyncFlushResult[maxSize]; + } + + public void Add(PageAsyncFlushResult t) + { + int retries = 0; + do + { + for (int i = 0; i < maxSize; i++) + { + if (list[i] == default) + { + if (Interlocked.CompareExchange(ref list[i], t, default) == default) + { + return; + } + } + } + } while (retries++ < maxRetries); + throw new Exception("Unable to add item to list"); + } + + public bool RemoveAdjacent(long address, out PageAsyncFlushResult request) + { + for (int i=0; i : Alloca internal readonly IVariableLengthStruct KeyLength; internal readonly IVariableLengthStruct ValueLength; - public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings vlSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null) - : base(settings, comparer, evictCallback, epoch) + public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings vlSettings, IFasterEqualityComparer comparer, Action evictCallback = null, LightEpoch epoch = null, Action flushCallback = null) + : base(settings, comparer, evictCallback, epoch, flushCallback) { values = new byte[BufferSize][]; handles = new GCHandle[BufferSize]; @@ -215,10 +215,6 @@ internal override void AllocatePage(int index) long p = (long)handles[index].AddrOfPinnedObject(); pointers[index] = (p + (sectorSize - 1)) & ~(sectorSize - 1); values[index] = tmp; - - PageStatusIndicator[index].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed; - PageStatusIndicator[index].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Closed; - Interlocked.MemoryBarrier(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -282,9 +278,16 @@ public override long GetFirstValidLogicalAddress(long page) return page << LogPageSizeBits; } - protected override void ClearPage(long page) + protected override void ClearPage(long page, int offset) { - Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length); + if (offset == 0) + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + else + { + // Adjust array offset for cache alignment + offset += (int)(pointers[page % BufferSize] - (long)handles[page % BufferSize].AddrOfPinnedObject()); + Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset); + } } /// diff --git 
a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs index 14a975445..4bfa0a14f 100644 --- a/cs/src/core/Device/Devices.cs +++ b/cs/src/core/Device/Devices.cs @@ -26,9 +26,10 @@ public static class Devices /// Path to file that will store the log (empty for null device) /// Whether we try to preallocate the file on creation /// Delete files on close - /// + /// The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + /// Whether to recover device metadata from existing files /// Device instance - public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED) + public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false) { if (string.IsNullOrWhiteSpace(logPath)) return new NullDevice(); @@ -38,12 +39,12 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = tru #if DOTNETCORE if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity); + logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity, recoverDevice); } else #endif { - logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity: capacity); + logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice); } return logDevice; }
diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index c034390fa..aa6d0aa9d 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -4,6 +4,7 @@ using Microsoft.Win32.SafeHandles; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Runtime.InteropServices; @@ -24,16 +25,18 @@ public class LocalStorageDevice : StorageDeviceBase /// /// Constructor /// - /// + /// File name (or prefix) with path /// /// /// /// The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + /// Whether to recover device metadata from existing files public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, - long capacity = Devices.CAPACITY_UNSPECIFIED) + long capacity = Devices.CAPACITY_UNSPECIFIED, + bool recoverDevice = false) : base(filename, GetSectorSize(filename), capacity) { @@ -42,7 +45,8 @@ public LocalStorageDevice(string filename, this.deleteOnClose = deleteOnClose; this.disableFileBuffering = disableFileBuffering; logHandles = new SafeConcurrentDictionary(); - RecoverFiles(); + if (recoverDevice) + RecoverFiles(); } private void RecoverFiles() @@ -53,14 +57,19 @@ private void RecoverFiles() string bareName = fi.Name; - int prevSegmentId = -1; + List segids = new List(); foreach (FileInfo item in di.GetFiles(bareName + "*")) { - int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")); + segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", ""))); + } + segids.Sort(); + + int prevSegmentId = -1; + foreach (int segmentId in segids) + { if (segmentId != prevSegmentId + 1) { startSegment = segmentId; - } else {
diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 35ffebeb9..034c30473 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -4,6 +4,7 @@ using Microsoft.Win32.SafeHandles; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Runtime.InteropServices; @@ -24,11 +25,12 @@ public class ManagedLocalStorageDevice : StorageDeviceBase /// /// /// - /// + /// File name (or prefix) with path /// /// - /// The maximal number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit - public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED) + /// The maximal number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit + /// Whether to recover device metadata from existing files + public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED, bool recoverDevice = false) : base(filename, GetSectorSize(filename), capacity) { pool = new SectorAlignedBufferPool(1, 1); @@ -36,7 +38,8 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, this.preallocateFile = preallocateFile; this.deleteOnClose = deleteOnClose; logHandles = new ConcurrentDictionary(); - RecoverFiles(); + if (recoverDevice) + RecoverFiles(); } @@ -48,14 +51,19 @@ private void RecoverFiles() string bareName = fi.Name; - int prevSegmentId = -1; + List segids = new List(); foreach (FileInfo item in di.GetFiles(bareName + "*")) { - int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")); + segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", ""))); + } + segids.Sort(); + + int prevSegmentId = -1; + foreach (int segmentId in segids) + { if (segmentId != prevSegmentId + 1) { startSegment = segmentId; - } else { @@ -68,16 +76,19 @@ private void RecoverFiles() + class ReadCallbackWrapper { + readonly Stream logHandle; readonly IOCompletionCallback callback; readonly IAsyncResult asyncResult; SectorAlignedMemory memory; readonly IntPtr destinationAddress; readonly uint readLength; - public ReadCallbackWrapper(IOCompletionCallback callback, IAsyncResult asyncResult, SectorAlignedMemory memory, IntPtr destinationAddress, uint readLength) + public ReadCallbackWrapper(Stream logHandle, IOCompletionCallback callback, IAsyncResult asyncResult, SectorAlignedMemory memory, IntPtr destinationAddress, uint readLength) { + this.logHandle = logHandle; this.callback = callback; this.asyncResult = asyncResult; this.memory = memory; @@ -87,34 +98,56 @@ public ReadCallbackWrapper(IOCompletionCallback callback, IAsyncResult asyncResu public unsafe void Callback(IAsyncResult result) { - fixed (void* source = memory.buffer) + uint errorCode = 0; + try { - Buffer.MemoryCopy(source, (void*)destinationAddress, readLength, readLength); + logHandle.EndRead(result); + fixed (void* source = memory.buffer) + { + Buffer.MemoryCopy(source, (void*)destinationAddress, readLength, readLength); + } } + catch + { + errorCode = 1; + } + memory.Return(); Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult); - callback(0, 0, ov.UnsafePack(callback, IntPtr.Zero)); + callback(errorCode, 0, ov.UnsafePack(callback, IntPtr.Zero)); } } class WriteCallbackWrapper { + readonly Stream logHandle;
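+ // Stream is captured so the completion callback can call EndWrite (EndRead in the reader above) and report any I/O exception to FASTER as a non-zero error code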
readonly IOCompletionCallback callback; readonly IAsyncResult asyncResult; SectorAlignedMemory memory; - public WriteCallbackWrapper(IOCompletionCallback callback, IAsyncResult asyncResult, SectorAlignedMemory memory) + public WriteCallbackWrapper(Stream logHandle, IOCompletionCallback callback, IAsyncResult asyncResult, SectorAlignedMemory memory) { this.callback = callback; this.asyncResult = asyncResult; this.memory = memory; + this.logHandle = logHandle; } public unsafe void Callback(IAsyncResult result) { + uint errorCode = 0; + try + { + logHandle.EndWrite(result); + } + catch + { + errorCode = 1; + } + memory.Return(); Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult); - callback(0, 0, ov.UnsafePack(callback, IntPtr.Zero)); + callback(errorCode, 0, ov.UnsafePack(callback, IntPtr.Zero)); } } @@ -137,7 +170,7 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, var memory = pool.Get((int)readLength); logHandle.Seek((long)sourceAddress, SeekOrigin.Begin); logHandle.BeginRead(memory.buffer, 0, (int)readLength, - new ReadCallbackWrapper(callback, asyncResult, memory, destinationAddress, readLength).Callback, null); + new ReadCallbackWrapper(logHandle, callback, asyncResult, memory, destinationAddress, readLength).Callback, null); } /// @@ -165,7 +198,7 @@ public override unsafe void WriteAsync(IntPtr sourceAddress, } logHandle.Seek((long)destinationAddress, SeekOrigin.Begin); logHandle.BeginWrite(memory.buffer, 0, (int)numBytesToWrite, - new WriteCallbackWrapper(callback, asyncResult, memory).Callback, null); + new WriteCallbackWrapper(logHandle, callback, asyncResult, memory).Callback, null); } /// diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index 1c84d708c..7ab14ab82 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -233,7 +233,7 @@ public void TruncateUntilSegment(int toSegment) public virtual void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result) { // Truncate only up to segment boundary if address is not aligned - TruncateUntilSegmentAsync((int)toAddress >> segmentSizeBits, callback, result); + TruncateUntilSegmentAsync((int)(toAddress >> segmentSizeBits), callback, result); } /// diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs index e9f53656f..7bc2e2e30 100644 --- a/cs/src/core/Epochs/FastThreadLocal.cs +++ b/cs/src/core/Epochs/FastThreadLocal.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. 
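For context, a minimal sketch of the slot-reuse hazard that the per-instance id (iid) check introduced below guards against (hypothetical usage, since FastThreadLocal is internal to FASTER.core):

var a = new FastThreadLocal<int>();
a.InitializeThread();
a.Value = 42;
a.Dispose();                        // frees a's slot in the shared instances[] table
var b = new FastThreadLocal<int>(); // may be handed the same offset that a used
b.InitializeThread();               // tl_iid[offset] != b.iid, so the slot is reset
// b.Value is now default(int) == 0; b never observes a's stale 42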
using System; +using System.Net; using System.Threading; namespace FASTER.core @@ -16,18 +17,25 @@ internal class FastThreadLocal private const int kMaxInstances = 128; [ThreadStatic] - private static T[] values; + private static T[] tl_values; + [ThreadStatic] + private static int[] tl_iid; + + private readonly int offset; + private readonly int iid; - private readonly int id; private static readonly int[] instances = new int[kMaxInstances]; + private static int instanceId = 0; public FastThreadLocal() { + iid = Interlocked.Increment(ref instanceId); + for (int i = 0; i < kMaxInstances; i++) { - if (0 == Interlocked.CompareExchange(ref instances[i], 1, 0)) + if (0 == Interlocked.CompareExchange(ref instances[i], iid, 0)) { - id = i; + offset = i; return; } } @@ -36,22 +44,22 @@ public FastThreadLocal() public void InitializeThread() { - if (values == null) - values = new T[kMaxInstances]; + if (tl_values == null) + { + tl_values = new T[kMaxInstances]; + tl_iid = new int[kMaxInstances]; + } + if (tl_iid[offset] != iid) + { + tl_iid[offset] = iid; + tl_values[offset] = default(T); + } } public void DisposeThread() { - Value = default(T); - - // Dispose values only if there are no other - // instances active for this thread - for (int i = 0; i < kMaxInstances; i++) - { - if ((instances[i] == 1) && (i != id)) - return; - } - values = null; + tl_values[offset] = default(T); + tl_iid[offset] = 0; } /// @@ -59,15 +67,15 @@ public void DisposeThread() /// public void Dispose() { - instances[id] = 0; + instances[offset] = 0; } public T Value { - get => values[id]; - set => values[id] = value; + get => tl_values[offset]; + set => tl_values[offset] = value; } - public bool IsInitializedForThread => values != null; + public bool IsInitializedForThread => (tl_values != null) && (iid == tl_iid[offset]); } -} +} \ No newline at end of file diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 2cd7f6232..5d7b789e4 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -9,9 +9,8 @@ namespace FASTER.core { - /// - /// + /// Epoch protection /// public unsafe class LightEpoch { @@ -37,6 +36,10 @@ public unsafe class LightEpoch private GCHandle tableHandle; private Entry* tableAligned; + private static Entry[] threadIndex; + private static GCHandle threadIndexHandle; + private static Entry* threadIndexAligned; + /// /// List of action, epoch pairs containing actions to performed /// when an epoch becomes safe to reclaim. @@ -45,14 +48,19 @@ public unsafe class LightEpoch private readonly EpochActionPair[] drainList = new EpochActionPair[kDrainListSize]; /// - /// Number of entries in the epoch table + /// A thread's entry in the epoch table. /// - private int numEntries; + [ThreadStatic] + private static int threadEntryIndex; /// - /// A thread's entry in the epoch table. 
+ /// Number of instances using this entry /// - private FastThreadLocal threadEntryIndex; + [ThreadStatic] + private static int threadEntryIndexCount; + + [ThreadStatic] + static int threadId; /// /// Global current epoch value @@ -65,25 +73,28 @@ public unsafe class LightEpoch public int SafeToReclaimEpoch; /// - /// Instantiate the epoch table + /// Static constructor to setup shared cache-aligned space + /// to store per-entry count of instances using that entry /// - /// - public LightEpoch(int size = kTableSize) + static LightEpoch() { - Initialize(size); + // Over-allocate to do cache-line alignment + threadIndex = new Entry[kTableSize + 2]; + threadIndexHandle = GCHandle.Alloc(threadIndex, GCHandleType.Pinned); + long p = (long)threadIndexHandle.AddrOfPinnedObject(); + + // Force the pointer to align to 64-byte boundaries + long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1); + threadIndexAligned = (Entry*)p2; } /// - /// Initialize the epoch table + /// Instantiate the epoch table /// - /// - unsafe void Initialize(int size) + public LightEpoch() { - threadEntryIndex = new FastThreadLocal(); - numEntries = size; - // Over-allocate to do cache-line alignment - tableRaw = new Entry[size + 2]; + tableRaw = new Entry[kTableSize + 2]; tableHandle = GCHandle.Alloc(tableRaw, GCHandleType.Pinned); long p = (long)tableHandle.AddrOfPinnedObject(); @@ -107,12 +118,8 @@ public void Dispose() tableHandle.Free(); tableAligned = null; tableRaw = null; - - numEntries = 0; CurrentEpoch = 1; SafeToReclaimEpoch = 0; - - threadEntryIndex.Dispose(); } /// @@ -121,7 +128,7 @@ public void Dispose() /// Result of the check public bool IsProtected() { - return threadEntryIndex.IsInitializedForThread && kInvalidIndex != threadEntryIndex.Value; + return kInvalidIndex != threadEntryIndex; } /// @@ -131,8 +138,9 @@ public bool IsProtected() [MethodImpl(MethodImplOptions.AggressiveInlining)] public int ProtectAndDrain() { - int entry = threadEntryIndex.Value; + int entry = threadEntryIndex; + (*(tableAligned + entry)).threadId = threadEntryIndex; (*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch; if (drainCount > 0) @@ -173,28 +181,50 @@ private void Drain(int nextEpoch) /// /// Thread acquires its epoch entry /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Acquire() { - threadEntryIndex.InitializeThread(); - threadEntryIndex.Value = ReserveEntryForThread(); + if (threadEntryIndex == kInvalidIndex) + threadEntryIndex = ReserveEntryForThread(); + threadEntryIndexCount++; } /// /// Thread releases its epoch entry /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Release() { - int entry = threadEntryIndex.Value; - if (kInvalidIndex == entry) + int entry = threadEntryIndex; + (*(tableAligned + entry)).localCurrentEpoch = 0; + (*(tableAligned + entry)).threadId = 0; + + threadEntryIndexCount--; + if (threadEntryIndexCount == 0) { - return; + (threadIndexAligned + threadEntryIndex)->threadId = 0; + threadEntryIndex = kInvalidIndex; } + } - threadEntryIndex.Value = kInvalidIndex; - threadEntryIndex.DisposeThread(); - (*(tableAligned + entry)).localCurrentEpoch = 0; - (*(tableAligned + entry)).threadId = 0; + /// + /// Thread suspends its epoch entry + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Suspend() + { + Release(); + } + + /// + /// Thread resumes its epoch entry + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Resume() + { + Acquire(); + ProtectAndDrain(); } /// @@ -253,6 +283,7 @@ 
public int BumpCurrentEpoch(Action onDrain) if (++i == kDrainListSize) { + ProtectAndDrain(); i = 0; if (++j == 500) { @@ -276,7 +307,7 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch) { int oldestOngoingCall = currentEpoch; - for (int index = 1; index <= numEntries; ++index) + for (int index = 1; index <= kTableSize; ++index) { int entry_epoch = (*(tableAligned + index)).localCurrentEpoch; if (0 != entry_epoch) @@ -301,20 +332,20 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch) /// Start index /// Thread id /// Reserved entry - private int ReserveEntry(int startIndex, int threadId) + private static int ReserveEntry(int startIndex, int threadId) { int current_iteration = 0; for (; ; ) { // Reserve an entry in the table. - for (int i = 0; i < numEntries; ++i) + for (int i = 0; i < kTableSize; ++i) { - int index_to_test = 1 + ((startIndex + i) & (numEntries - 1)); - if (0 == (*(tableAligned + index_to_test)).threadId) + int index_to_test = 1 + ((startIndex + i) & (kTableSize - 1)); + if (0 == (threadIndexAligned + index_to_test)->threadId) { bool success = (0 == Interlocked.CompareExchange( - ref (*(tableAligned + index_to_test)).threadId, + ref (threadIndexAligned+index_to_test)->threadId, threadId, 0)); if (success) @@ -325,7 +356,7 @@ private int ReserveEntry(int startIndex, int threadId) ++current_iteration; } - if (current_iteration > (numEntries * 3)) + if (current_iteration > (kTableSize * 10)) { throw new Exception("Unable to reserve an epoch entry, try increasing the epoch table size (kTableSize)"); } @@ -337,10 +368,13 @@ private int ReserveEntry(int startIndex, int threadId) /// once for a thread. /// /// Reserved entry - private int ReserveEntryForThread() + private static int ReserveEntryForThread() { - // for portability(run on non-windows platform) - int threadId = Environment.OSVersion.Platform == PlatformID.Win32NT ? (int)Native32.GetCurrentThreadId() : Thread.CurrentThread.ManagedThreadId; + if (threadId == 0) // run once per thread for performance + { + // For portability (run on non-Windows platforms) + threadId = Environment.OSVersion.Platform == PlatformID.Win32NT ?
(int)Native32.GetCurrentThreadId() : Thread.CurrentThread.ManagedThreadId; + } int startIndex = Utility.Murmur3(threadId); return ReserveEntry(startIndex, threadId); } @@ -388,7 +422,7 @@ private struct EpochActionPair [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool MarkAndCheckIsComplete(int markerIdx, int version) { - int entry = threadEntryIndex.Value; + int entry = threadEntryIndex; if (kInvalidIndex == entry) { Debug.WriteLine("New Thread entered during CPR"); @@ -398,13 +432,13 @@ public bool MarkAndCheckIsComplete(int markerIdx, int version) (*(tableAligned + entry)).markers[markerIdx] = version; // check if all threads have reported complete - for (int index = 1; index <= numEntries; ++index) + for (int index = 1; index <= kTableSize; ++index) { int entry_epoch = (*(tableAligned + index)).localCurrentEpoch; int fc_version = (*(tableAligned + index)).markers[markerIdx]; if (0 != entry_epoch) { - if (fc_version != version) + if (fc_version != version && entry_epoch < int.MaxValue) { return false; } @@ -413,4 +447,4 @@ public bool MarkAndCheckIsComplete(int markerIdx, int version) return true; } } -} +} \ No newline at end of file diff --git a/cs/src/core/FASTER.core.csproj b/cs/src/core/FASTER.core.csproj index 1d68a5425..9d49b9163 100644 --- a/cs/src/core/FASTER.core.csproj +++ b/cs/src/core/FASTER.core.csproj @@ -3,6 +3,7 @@ netstandard2.0;net46 AnyCPU;x64 + preview @@ -35,6 +36,15 @@ - + + + + + + + + + 4.0.0 + \ No newline at end of file diff --git a/cs/src/core/FASTER.core.debug.nuspec b/cs/src/core/FASTER.core.debug.nuspec index 3753d4d5d..48728a1b1 100644 --- a/cs/src/core/FASTER.core.debug.nuspec +++ b/cs/src/core/FASTER.core.debug.nuspec @@ -6,20 +6,26 @@ FASTER (Debug) Microsoft Microsoft - https://github.com/Microsoft/FASTER + https://github.com/microsoft/FASTER MIT true - Debug version. FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data - See the project website at https://github.com/Microsoft/FASTER for more details + Debug version. FASTER is a fast concurrent key-value store and log for larger-than-memory data. + See the project website at https://github.com/microsoft/FASTER for more details. © Microsoft Corporation. All rights reserved. en-US - key-value store dictionary hashtable concurrent log persistent + key-value store dictionary hashtable concurrent log persistent commit write-ahead - + + + + - + + + + diff --git a/cs/src/core/FASTER.core.nuspec b/cs/src/core/FASTER.core.nuspec index c2d29ea49..294cb75a2 100644 --- a/cs/src/core/FASTER.core.nuspec +++ b/cs/src/core/FASTER.core.nuspec @@ -6,20 +6,26 @@ FASTER Microsoft Microsoft - https://github.com/Microsoft/FASTER + https://github.com/microsoft/FASTER MIT true - FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data - See the project website at https://github.com/Microsoft/FASTER for more details + FASTER is a fast concurrent key-value store and log for larger-than-memory data. + See the project website at https://github.com/microsoft/FASTER for more details. © Microsoft Corporation. All rights reserved. 
en-US - key-value store dictionary hashtable concurrent log persistent + key-value store dictionary hashtable concurrent log persistent commit write-ahead - + + + + - + + + + diff --git a/cs/src/core/Index/Common/LogSettings.cs b/cs/src/core/Index/Common/LogSettings.cs index c0943dfe8..f6b59e337 100644 --- a/cs/src/core/Index/Common/LogSettings.cs +++ b/cs/src/core/Index/Common/LogSettings.cs @@ -176,8 +176,10 @@ public class ReadCacheSettings public int MemorySizeBits = 34; /// - /// Fraction of log used for second chance copy to tail + /// Fraction of log head (in memory) used for second chance + /// copy to tail. This is (1 - MutableFraction) for the + /// underlying log /// - public double SecondChanceFraction = 0.9; + public double SecondChanceFraction = 0.1; } } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 32c7a5a5d..e222a3585 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -135,7 +135,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits, - MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction + MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction }, variableLengthStructSettings, this.comparer, ReadCacheEvict, epoch); readcache.Initialize(); ReadCache = new LogAccessor(this, readcache); @@ -153,7 +153,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits, - MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction + MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction }, this.comparer, ReadCacheEvict, epoch); readcache.Initialize(); ReadCache = new LogAccessor(this, readcache); @@ -174,7 +174,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits, - MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction + MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction }, serializerSettings, this.comparer, ReadCacheEvict, epoch); readcache.Initialize(); ReadCache = new LogAccessor(this, readcache); diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 4311777ce..52a025375 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1887,52 +1887,20 @@ private void HeavyEnter(long hash) [MethodImpl(MethodImplOptions.AggressiveInlining)] private void BlockAllocate(int recordSize, out long logicalAddress) { - logicalAddress = hlog.Allocate(recordSize); - if (logicalAddress >= 0) return; - - while (logicalAddress < 0 && -logicalAddress >= hlog.ReadOnlyAddress) + while ((logicalAddress = hlog.TryAllocate(recordSize)) == 0) { InternalRefresh(); - hlog.CheckForAllocateComplete(ref logicalAddress); - if (logicalAddress < 0) - { - Thread.Sleep(10); - } - } - - logicalAddress = logicalAddress < 0 ? 
-logicalAddress : logicalAddress; - - if (logicalAddress < hlog.ReadOnlyAddress) - { - Debug.WriteLine("Allocated address is read-only, retrying"); - BlockAllocate(recordSize, out logicalAddress); + Thread.Yield(); } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private void BlockAllocateReadCache(int recordSize, out long logicalAddress) { - logicalAddress = readcache.Allocate(recordSize); - if (logicalAddress >= 0) - return; - - while (logicalAddress < 0 && -logicalAddress >= readcache.ReadOnlyAddress) + while ((logicalAddress = readcache.TryAllocate(recordSize)) == 0) { InternalRefresh(); - readcache.CheckForAllocateComplete(ref logicalAddress); - if (logicalAddress < 0) - { - Thread.Sleep(10); - } - } - - logicalAddress = logicalAddress < 0 ? -logicalAddress : logicalAddress; - - if (logicalAddress < readcache.ReadOnlyAddress) - { - Debug.WriteLine("Allocated address is read-only, retrying"); - BlockAllocateReadCache(recordSize, out logicalAddress); + Thread.Yield(); } } @@ -2323,29 +2291,32 @@ private void ReadCacheEvict(long fromHeadAddress, long toHeadAddress) { physicalAddress = readcache.GetPhysicalAddress(logicalAddress); var recordSize = readcache.GetRecordSize(physicalAddress); - ref Key key = ref readcache.GetKey(physicalAddress); ref RecordInfo info = ref readcache.GetInfo(physicalAddress); - entry.word = info.PreviousAddress; - if (!entry.ReadCache) + if (!info.Invalid) { - var hash = comparer.GetHashCode64(ref key); - var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - - entry = default(HashBucketEntry); - var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); - while (tagExists && entry.ReadCache) + ref Key key = ref readcache.GetKey(physicalAddress); + entry.word = info.PreviousAddress; + if (!entry.ReadCache) { - var updatedEntry = default(HashBucketEntry); - updatedEntry.Tag = tag; - updatedEntry.Address = info.PreviousAddress; - updatedEntry.Pending = entry.Pending; - updatedEntry.Tentative = false; - - if (entry.word == Interlocked.CompareExchange - (ref bucket->bucket_entries[slot], updatedEntry.word, entry.word)) - break; + var hash = comparer.GetHashCode64(ref key); + var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); + + entry = default(HashBucketEntry); + var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + while (tagExists && entry.ReadCache) + { + var updatedEntry = default(HashBucketEntry); + updatedEntry.Tag = tag; + updatedEntry.Address = info.PreviousAddress; + updatedEntry.Pending = entry.Pending; + updatedEntry.Tentative = false; - tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + if (entry.word == Interlocked.CompareExchange + (ref bucket->bucket_entries[slot], updatedEntry.word, entry.word)) + break; + + tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + } } } logicalAddress += recordSize; diff --git a/cs/src/core/Index/FasterLog/CommitFailureException.cs b/cs/src/core/Index/FasterLog/CommitFailureException.cs new file mode 100644 index 000000000..c6374806f --- /dev/null +++ b/cs/src/core/Index/FasterLog/CommitFailureException.cs @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
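For reference, a likely consumption pattern for this exception type, as a sketch (assumes a FasterLog instance named log; a failed flush faults the commit task via CommitCallback further below):

try
{
    await log.CommitAsync();
}
catch (CommitFailureException e)
{
    var ci = e.LinkedCommitInfo.CommitInfo;
    Console.WriteLine($"Commit of range [{ci.FromAddress}, {ci.UntilAddress}) failed with error code {ci.ErrorCode}");
    // e.LinkedCommitInfo.NextTask can be awaited to observe the next commit in the chain
}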
+ +#pragma warning disable 0162 + +using System; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// Exception thrown when commit fails + /// + public class CommitFailureException : Exception + { + /// + /// Commit info and next commit task in chain + /// + public LinkedCommitInfo LinkedCommitInfo { get; private set; } + + internal CommitFailureException(LinkedCommitInfo linkedCommitInfo, string message) + : base(message) + => LinkedCommitInfo = linkedCommitInfo; + } +} diff --git a/cs/src/core/Index/FasterLog/CommitInfo.cs b/cs/src/core/Index/FasterLog/CommitInfo.cs new file mode 100644 index 000000000..70401edcc --- /dev/null +++ b/cs/src/core/Index/FasterLog/CommitInfo.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#pragma warning disable 0162 + +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// Info contained in task associated with commit + /// + public struct CommitInfo + { + /// + /// Begin address + /// + public long BeginAddress; + + /// + /// From address of commit range + /// + public long FromAddress; + + /// + /// Until address of commit range + /// + public long UntilAddress; + + /// + /// Error code (0 = success) + /// + public uint ErrorCode; + } + + /// + /// Linked list (chain) of commit info + /// + public struct LinkedCommitInfo + { + /// + /// Commit info + /// + public CommitInfo CommitInfo; + + /// + /// Next task in commit chain + /// + public Task NextTask; + } +} diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs new file mode 100644 index 000000000..3a46cfb86 --- /dev/null +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -0,0 +1,941 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#pragma warning disable 0162 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// FASTER log + /// + public class FasterLog : IDisposable + { + private readonly BlittableAllocator allocator; + private readonly LightEpoch epoch; + private readonly ILogCommitManager logCommitManager; + private readonly GetMemory getMemory; + private readonly int headerSize; + private readonly LogChecksumType logChecksum; + private readonly Dictionary RecoveredIterators; + private TaskCompletionSource commitTcs + = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + /// + /// Beginning address of log + /// + public long BeginAddress => allocator.BeginAddress; + + /// + /// Tail address of log + /// + public long TailAddress => allocator.GetTailAddress(); + + /// + /// Log flushed until address + /// + public long FlushedUntilAddress => allocator.FlushedUntilAddress; + + /// + /// Log committed until address + /// + public long CommittedUntilAddress; + + /// + /// Log committed begin address + /// + public long CommittedBeginAddress; + + /// + /// Task notifying commit completions + /// + internal Task CommitTask => commitTcs.Task; + + /// + /// Create new log instance + /// + /// + public FasterLog(FasterLogSettings logSettings) + { + logCommitManager = logSettings.LogCommitManager ?? + new LocalLogCommitManager(logSettings.LogCommitFile ?? 
+ logSettings.LogDevice.FileName + ".commit"); + + // Reserve 8 byte checksum in header if requested + logChecksum = logSettings.LogChecksum; + headerSize = logChecksum == LogChecksumType.PerEntry ? 12 : 4; + getMemory = logSettings.GetMemory; + epoch = new LightEpoch(); + CommittedUntilAddress = Constants.kFirstValidAddress; + CommittedBeginAddress = Constants.kFirstValidAddress; + + allocator = new BlittableAllocator( + logSettings.GetLogSettings(), null, + null, epoch, CommitCallback); + allocator.Initialize(); + Restore(out RecoveredIterators); + } + + /// + /// Dispose + /// + public void Dispose() + { + allocator.Dispose(); + epoch.Dispose(); + commitTcs.TrySetException(new ObjectDisposedException("Log has been disposed")); + } + + #region Enqueue + /// + /// Enqueue entry to log (in memory) - no guarantee of flush/commit + /// + /// Entry to be enqueued to log + /// Logical address of added entry + public long Enqueue(byte[] entry) + { + long logicalAddress; + while (!TryEnqueue(entry, out logicalAddress)) ; + return logicalAddress; + } + + /// + /// Enqueue entry to log (in memory) - no guarantee of flush/commit + /// + /// Entry to be enqueued to log + /// Logical address of added entry + public long Enqueue(ReadOnlySpan entry) + { + long logicalAddress; + while (!TryEnqueue(entry, out logicalAddress)) ; + return logicalAddress; + } + + /// + /// Enqueue batch of entries to log (in memory) - no guarantee of flush/commit + /// + /// Batch of entries to be enqueued to log + /// Logical address of added entry + public long Enqueue(IReadOnlySpanBatch readOnlySpanBatch) + { + long logicalAddress; + while (!TryEnqueue(readOnlySpanBatch, out logicalAddress)) ; + return logicalAddress; + } + #endregion + + #region TryEnqueue + /// + /// Try to enqueue entry to log (in memory). If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// Entry to be enqueued to log + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(byte[] entry, out long logicalAddress) + { + logicalAddress = 0; + + epoch.Resume(); + + var length = entry.Length; + logicalAddress = allocator.TryAllocate(headerSize + Align(length)); + if (logicalAddress == 0) + { + epoch.Suspend(); + return false; + } + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + fixed (byte* bp = entry) + Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length); + SetHeader(length, (byte*)physicalAddress); + epoch.Suspend(); + return true; + } + + /// + /// Try to append entry to log. If it returns true, we are + /// done. If it returns false, we need to retry. + /// + /// Entry to be appended to log + /// Logical address of added entry + /// Whether the append succeeded + public unsafe bool TryEnqueue(ReadOnlySpan entry, out long logicalAddress) + { + logicalAddress = 0; + + epoch.Resume(); + + var length = entry.Length; + logicalAddress = allocator.TryAllocate(headerSize + Align(length)); + if (logicalAddress == 0) + { + epoch.Suspend(); + return false; + } + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + fixed (byte* bp = &entry.GetPinnableReference()) + Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), length, length); + SetHeader(length, (byte*)physicalAddress); + epoch.Suspend(); + return true; + } + + /// + /// Try to enqueue batch of entries as a single atomic unit (to memory). Entire + /// batch needs to fit on one log page. 
+ /// + /// Batch to be appended to log + /// Logical address of first added entry + /// Whether the append succeeded + public bool TryEnqueue(IReadOnlySpanBatch readOnlySpanBatch, out long logicalAddress) + { + return TryAppend(readOnlySpanBatch, out logicalAddress, out _); + } + #endregion + + #region EnqueueAsync + /// + /// Enqueue entry to log in memory (async) - completes after entry is + /// appended to memory, NOT committed to storage. + /// + /// + /// + public async ValueTask EnqueueAsync(byte[] entry) + { + long logicalAddress; + + while (true) + { + var task = CommitTask; + if (TryEnqueue(entry, out logicalAddress)) + break; + if (NeedToWait(CommittedUntilAddress, TailAddress)) + { + // Wait for *some* commit - failure can be ignored + try + { + await task; + } + catch { } + } + } + + return logicalAddress; + } + + /// + /// Enqueue entry to log in memory (async) - completes after entry is + /// appended to memory, NOT committed to storage. + /// + /// + /// + public async ValueTask EnqueueAsync(ReadOnlyMemory entry) + { + long logicalAddress; + + while (true) + { + var task = CommitTask; + if (TryEnqueue(entry.Span, out logicalAddress)) + break; + if (NeedToWait(CommittedUntilAddress, TailAddress)) + { + // Wait for *some* commit - failure can be ignored + try + { + await task; + } + catch { } + } + } + + return logicalAddress; + } + + /// + /// Enqueue batch of entries to log in memory (async) - completes after entry is + /// appended to memory, NOT committed to storage. + /// + /// + /// + public async ValueTask EnqueueAsync(IReadOnlySpanBatch readOnlySpanBatch) + { + long logicalAddress; + + while (true) + { + var task = CommitTask; + if (TryEnqueue(readOnlySpanBatch, out logicalAddress)) + break; + if (NeedToWait(CommittedUntilAddress, TailAddress)) + { + // Wait for *some* commit - failure can be ignored + try + { + await task; + } + catch { } + } + } + + return logicalAddress; + } + #endregion + + #region WaitForCommit and WaitForCommitAsync + + /// + /// Spin-wait for enqueues, until tail or specified address, to commit to + /// storage. Does NOT itself issue a commit, just waits for commit. So you should + /// ensure that someone else causes the commit to happen. + /// + /// Address until which we should wait for commit, default 0 for tail of log + /// + public void WaitForCommit(long untilAddress = 0) + { + var tailAddress = untilAddress; + if (tailAddress == 0) tailAddress = allocator.GetTailAddress(); + + while (CommittedUntilAddress < tailAddress) ; + } + + /// + /// Wait for appends (in memory), until tail or specified address, to commit to + /// storage. Does NOT itself issue a commit, just waits for commit. So you should + /// ensure that someone else causes the commit to happen. + /// + /// Address until which we should wait for commit, default 0 for tail of log + /// + public async ValueTask WaitForCommitAsync(long untilAddress = 0) + { + var task = CommitTask; + var tailAddress = untilAddress; + if (tailAddress == 0) tailAddress = allocator.GetTailAddress(); + + while (true) + { + var linkedCommitInfo = await task; + if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress) + task = linkedCommitInfo.NextTask; + else + break; + } + } + #endregion + + #region Commit + + /// + /// Issue commit request for log (until tail) + /// + /// If true, spin-wait until commit completes. Otherwise, issue commit and return immediately. 
+ /// + public void Commit(bool spinWait = false) + { + CommitInternal(spinWait); + } + + /// + /// Async commit log (until tail), completes only when we + /// complete the commit. Throws exception if this or any + /// ongoing commit fails. + /// + /// + public async ValueTask CommitAsync() + { + var task = CommitTask; + var tailAddress = CommitInternal(); + + while (true) + { + var linkedCommitInfo = await task; + if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress) + task = linkedCommitInfo.NextTask; + else + break; + } + } + + /// + /// Async commit log (until tail), completes only when we + /// complete the commit. Throws exception if any commit + /// from prevCommitTask to current fails. + /// + /// + public async ValueTask> CommitAsync(Task prevCommitTask) + { + if (prevCommitTask == null) prevCommitTask = commitTcs.Task; + var tailAddress = CommitInternal(); + + while (true) + { + var linkedCommitInfo = await prevCommitTask; + if (linkedCommitInfo.CommitInfo.UntilAddress < tailAddress) + prevCommitTask = linkedCommitInfo.NextTask; + else + return linkedCommitInfo.NextTask; + } + } + + #endregion + + #region EnqueueAndWaitForCommit + + /// + /// Append entry to log - spin-waits until entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// + /// + public long EnqueueAndWaitForCommit(byte[] entry) + { + long logicalAddress; + while (!TryEnqueue(entry, out logicalAddress)) ; + while (CommittedUntilAddress < logicalAddress + 1) ; + return logicalAddress; + } + + /// + /// Append entry to log - spin-waits until entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// + /// + public long EnqueueAndWaitForCommit(ReadOnlySpan entry) + { + long logicalAddress; + while (!TryEnqueue(entry, out logicalAddress)) ; + while (CommittedUntilAddress < logicalAddress + 1) ; + return logicalAddress; + } + + /// + /// Append batch of entries to log - spin-waits until entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// + /// + public long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch) + { + long logicalAddress; + while (!TryEnqueue(readOnlySpanBatch, out logicalAddress)) ; + while (CommittedUntilAddress < logicalAddress + 1) ; + return logicalAddress; + } + + #endregion + + #region EnqueueAndWaitForCommitAsync + + /// + /// Append entry to log (async) - completes after entry is committed to storage. + /// Does NOT itself issue flush! + /// + /// + /// + public async ValueTask EnqueueAndWaitForCommitAsync(byte[] entry) + { + long logicalAddress; + Task task; + + // Phase 1: wait for commit to memory + while (true) + { + task = CommitTask; + if (TryEnqueue(entry, out logicalAddress)) + break; + if (NeedToWait(CommittedUntilAddress, TailAddress)) + { + // Wait for *some* commit - failure can be ignored + try + { + await task; + } + catch { } + } + } + + // Phase 2: wait for commit/flush to storage + while (true) + { + LinkedCommitInfo linkedCommitInfo; + try + { + linkedCommitInfo = await task; + } + catch (CommitFailureException e) + { + linkedCommitInfo = e.LinkedCommitInfo; + if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress) + throw e; + } + if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1) + task = linkedCommitInfo.NextTask; + else + break; + } + + return logicalAddress; + } + + /// + /// Append entry to log (async) - completes after entry is committed to storage. + /// Does NOT itself issue flush! 
+ /// + /// + /// + public async ValueTask EnqueueAndWaitForCommitAsync(ReadOnlyMemory entry) + { + long logicalAddress; + Task task; + + // Phase 1: wait for commit to memory + while (true) + { + task = CommitTask; + if (TryEnqueue(entry.Span, out logicalAddress)) + break; + if (NeedToWait(CommittedUntilAddress, TailAddress)) + { + // Wait for *some* commit - failure can be ignored + try + { + await task; + } + catch { } + } + } + + // Phase 2: wait for commit/flush to storage + while (true) + { + LinkedCommitInfo linkedCommitInfo; + try + { + linkedCommitInfo = await task; + } + catch (CommitFailureException e) + { + linkedCommitInfo = e.LinkedCommitInfo; + if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress) + throw e; + } + if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1) + task = linkedCommitInfo.NextTask; + else + break; + } + + return logicalAddress; + } + + /// + /// Append batch of entries to log (async) - completes after batch is committed to storage. + /// Does NOT itself issue flush! + /// + /// + /// + public async ValueTask EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch readOnlySpanBatch) + { + long logicalAddress; + Task task; + + // Phase 1: wait for commit to memory + while (true) + { + task = CommitTask; + if (TryEnqueue(readOnlySpanBatch, out logicalAddress)) + break; + if (NeedToWait(CommittedUntilAddress, TailAddress)) + { + // Wait for *some* commit - failure can be ignored + try + { + await task; + } + catch { } + } + } + + // Phase 2: wait for commit/flush to storage + while (true) + { + LinkedCommitInfo linkedCommitInfo; + try + { + linkedCommitInfo = await task; + } + catch (CommitFailureException e) + { + linkedCommitInfo = e.LinkedCommitInfo; + if (logicalAddress >= linkedCommitInfo.CommitInfo.FromAddress && logicalAddress < linkedCommitInfo.CommitInfo.UntilAddress) + throw e; + } + if (linkedCommitInfo.CommitInfo.UntilAddress < logicalAddress + 1) + task = linkedCommitInfo.NextTask; + else + break; + } + + return logicalAddress; + } + #endregion + + /// + /// Truncate the log until, but not including, untilAddress + /// + /// + public void TruncateUntil(long untilAddress) + { + allocator.ShiftBeginAddress(untilAddress); + } + + /// + /// Pull-based iterator interface for scanning FASTER log + /// + /// Begin address for scan. + /// End address for scan (or long.MaxValue for tailing). + /// Name of iterator, if we need to persist/recover it (default null - do not persist). + /// Whether to recover named iterator from latest commit (if exists). If false, iterator starts from beginAddress. 
+ /// Use single or double buffering + /// + public FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = null, bool recover = true, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering) + { + FasterLogScanIterator iter; + if (recover && name != null && RecoveredIterators != null && RecoveredIterators.ContainsKey(name)) + iter = new FasterLogScanIterator(this, allocator, RecoveredIterators[name], endAddress, getMemory, scanBufferingMode, epoch, headerSize, name); + else + iter = new FasterLogScanIterator(this, allocator, beginAddress, endAddress, getMemory, scanBufferingMode, epoch, headerSize, name); + + if (name != null) + { + if (name.Length > 20) + throw new Exception("Max length of iterator name is 20 characters"); + if (FasterLogScanIterator.PersistedIterators.ContainsKey(name)) + Debug.WriteLine("Iterator name exists, overwriting"); + FasterLogScanIterator.PersistedIterators[name] = iter; + } + + return iter; + } + + /// + /// Random read record from log, at given address + /// + /// Logical address to read from + /// Estimated length of entry, if known + /// + public async ValueTask<(byte[], int)> ReadAsync(long address, int estimatedLength = 0) + { + epoch.Resume(); + if (address >= CommittedUntilAddress || address < BeginAddress) + { + epoch.Suspend(); + return default; + } + var ctx = new SimpleReadContext + { + logicalAddress = address, + completedRead = new SemaphoreSlim(0) + }; + unsafe + { + allocator.AsyncReadRecordToMemory(address, headerSize + estimatedLength, AsyncGetFromDiskCallback, ref ctx); + } + epoch.Suspend(); + await ctx.completedRead.WaitAsync(); + return GetRecordAndFree(ctx.record); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private int Align(int length) + { + return (length + 3) & ~3; + } + + /// + /// Commit log + /// + private void CommitCallback(CommitInfo commitInfo) + { + TaskCompletionSource _commitTcs = default; + + // We can only allow serial monotonic synchronous commit + lock (this) + { + if (CommittedBeginAddress > commitInfo.BeginAddress) + commitInfo.BeginAddress = CommittedBeginAddress; + if (CommittedUntilAddress > commitInfo.FromAddress) + commitInfo.FromAddress = CommittedUntilAddress; + if (CommittedUntilAddress > commitInfo.UntilAddress) + commitInfo.UntilAddress = CommittedUntilAddress; + + FasterLogRecoveryInfo info = new FasterLogRecoveryInfo + { + BeginAddress = commitInfo.BeginAddress, + FlushedUntilAddress = commitInfo.UntilAddress + }; + info.PopulateIterators(); + + logCommitManager.Commit(info.BeginAddress, info.FlushedUntilAddress, info.ToByteArray()); + CommittedBeginAddress = info.BeginAddress; + CommittedUntilAddress = info.FlushedUntilAddress; + + _commitTcs = commitTcs; + // If task is not faulted, create new task + // If task is faulted due to commit exception, create new task + if (commitTcs.Task.Status != TaskStatus.Faulted || commitTcs.Task.Exception.InnerException as CommitFailureException != null) + { + commitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + } + var lci = new LinkedCommitInfo + { + CommitInfo = commitInfo, + NextTask = commitTcs.Task + }; + + if (commitInfo.ErrorCode == 0) + _commitTcs?.TrySetResult(lci); + else + _commitTcs.TrySetException(new CommitFailureException(lci, $"Commit of address range [{commitInfo.FromAddress}-{commitInfo.UntilAddress}] failed with error code {commitInfo.ErrorCode}")); + } + + /// + /// Restore log + /// + private void Restore(out Dictionary recoveredIterators) + { + 
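+ // Recovery steps: read back the latest commit metadata (if any), restore the
+ // hybrid log to the committed begin/head/flushed-until addresses, and return
+ // any named iterators persisted with that commit so Scan() can resume them.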
recoveredIterators = null; + FasterLogRecoveryInfo info = new FasterLogRecoveryInfo(); + var commitInfo = logCommitManager.GetCommitMetadata(); + + if (commitInfo == null) return; + + using (var r = new BinaryReader(new MemoryStream(commitInfo))) + { + info.Initialize(r); + } + + var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress); + if (headAddress == 0) headAddress = Constants.kFirstValidAddress; + + recoveredIterators = info.Iterators; + + allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress); + CommittedUntilAddress = info.FlushedUntilAddress; + CommittedBeginAddress = info.BeginAddress; + } + + /// + /// Try to append batch of entries as a single atomic unit. Entire batch + /// needs to fit on one page. + /// + /// Batch to be appended to log + /// Logical address of first added entry + /// Actual allocated length + /// Whether the append succeeded + private unsafe bool TryAppend(IReadOnlySpanBatch readOnlySpanBatch, out long logicalAddress, out int allocatedLength) + { + logicalAddress = 0; + + int totalEntries = readOnlySpanBatch.TotalEntries(); + allocatedLength = 0; + for (int i = 0; i < totalEntries; i++) + { + allocatedLength += Align(readOnlySpanBatch.Get(i).Length) + headerSize; + } + + epoch.Resume(); + + logicalAddress = allocator.TryAllocate(allocatedLength); + if (logicalAddress == 0) + { + epoch.Suspend(); + return false; + } + + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + for (int i = 0; i < totalEntries; i++) + { + var span = readOnlySpanBatch.Get(i); + var entryLength = span.Length; + fixed (byte* bp = &span.GetPinnableReference()) + Buffer.MemoryCopy(bp, (void*)(headerSize + physicalAddress), entryLength, entryLength); + SetHeader(entryLength, (byte*)physicalAddress); + physicalAddress += Align(entryLength) + headerSize; + } + + epoch.Suspend(); + return true; + } + + private unsafe void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap) + { + var ctx = (SimpleReadContext)Overlapped.Unpack(overlap).AsyncResult; + + if (errorCode != 0) + { + Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); + ctx.record.Return(); + ctx.record = null; + ctx.completedRead.Release(); + } + else + { + var record = ctx.record.GetValidPointer(); + var length = GetLength(record); + + if (length < 0 || length > allocator.PageSize) + { + Debug.WriteLine("Invalid record length found: " + length); + ctx.record.Return(); + ctx.record = null; + ctx.completedRead.Release(); + } + else + { + int requiredBytes = headerSize + length; + if (ctx.record.available_bytes >= requiredBytes) + { + ctx.completedRead.Release(); + } + else + { + ctx.record.Return(); + allocator.AsyncReadRecordToMemory(ctx.logicalAddress, requiredBytes, AsyncGetFromDiskCallback, ref ctx); + } + } + } + Overlapped.Free(overlap); + } + + private (byte[], int) GetRecordAndFree(SectorAlignedMemory record) + { + if (record == null) + return (null, 0); + + byte[] result; + int length; + unsafe + { + var ptr = record.GetValidPointer(); + length = GetLength(ptr); + if (!VerifyChecksum(ptr, length)) + { + throw new Exception("Checksum failed for read"); + } + result = getMemory != null ? 
getMemory(length) : new byte[length]; + fixed (byte* bp = result) + { + Buffer.MemoryCopy(ptr + headerSize, bp, length, length); + } + } + record.Return(); + return (result, length); + } + + private long CommitInternal(bool spinWait = false) + { + epoch.Resume(); + if (allocator.ShiftReadOnlyToTail(out long tailAddress)) + { + if (spinWait) + { + while (CommittedUntilAddress < tailAddress) + { + epoch.ProtectAndDrain(); + Thread.Yield(); + } + } + epoch.Suspend(); + } + else + { + // May need to commit begin address and/or iterators + epoch.Suspend(); + var beginAddress = allocator.BeginAddress; + if (beginAddress > CommittedBeginAddress || FasterLogScanIterator.PersistedIterators.Count > 0) + CommitCallback(new CommitInfo { BeginAddress = beginAddress, + FromAddress = CommittedUntilAddress, + UntilAddress = CommittedUntilAddress, + ErrorCode = 0 }); + } + + return tailAddress; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal unsafe int GetLength(byte* ptr) + { + if (logChecksum == LogChecksumType.None) + return *(int*)ptr; + else if (logChecksum == LogChecksumType.PerEntry) + return *(int*)(ptr + 8); + return 0; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal unsafe bool VerifyChecksum(byte* ptr, int length) + { + if (logChecksum == LogChecksumType.PerEntry) + { + var cs = Utility.XorBytes(ptr + 8, length + 4); + if (cs != *(ulong*)ptr) + { + return false; + } + } + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal unsafe ulong GetChecksum(byte* ptr) + { + if (logChecksum == LogChecksumType.PerEntry) + { + return *(ulong*)ptr; + } + return 0; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private unsafe void SetHeader(int length, byte* dest) + { + if (logChecksum == LogChecksumType.None) + { + *(int*)dest = length; + return; + } + else if (logChecksum == LogChecksumType.PerEntry) + { + *(int*)(dest + 8) = length; + *(ulong*)dest = Utility.XorBytes(dest + 8, length + 4); + } + } + + /// + /// Do we need to await a commit to make forward progress? + /// + /// + /// + /// + private bool NeedToWait(long committedUntilAddress, long tailAddress) + { + Thread.Yield(); + return + allocator.GetPage(committedUntilAddress) <= + (allocator.GetPage(tailAddress) - allocator.BufferSize); + } + } +} diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs new file mode 100644 index 000000000..385823a17 --- /dev/null +++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs @@ -0,0 +1,425 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
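+ // For reference, the on-log entry layout implied by FasterLog.SetHeader/GetLength
+ // (this iterator relies on the same layout when validating entries):
+ //   LogChecksumType.None:     [ int length ][ payload, padded to 4-byte alignment ]
+ //   LogChecksumType.PerEntry: [ ulong checksum ][ int length ][ payload, padded ]
+ // where checksum = Utility.XorBytes over the length field plus payload (length + 4 bytes).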
+ +using System; +using System.Threading; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; +using System.Buffers; +using System.Collections.Generic; +using System.Collections.Concurrent; + +namespace FASTER.core +{ + /// + /// Scan iterator for hybrid log + /// + public class FasterLogScanIterator : IDisposable + { + private readonly int frameSize; + private readonly string name; + private readonly FasterLog fasterLog; + private readonly BlittableAllocator allocator; + private readonly long endAddress; + private readonly BlittableFrame frame; + private readonly CountdownEvent[] loaded; + private readonly CancellationTokenSource[] loadedCancel; + private readonly long[] loadedPage; + private readonly LightEpoch epoch; + private readonly GetMemory getMemory; + private readonly int headerSize; + private long currentAddress, nextAddress; + + /// + /// Current address + /// + public long CurrentAddress => currentAddress; + + /// + /// Next address + /// + public long NextAddress => nextAddress; + + internal static readonly ConcurrentDictionary PersistedIterators + = new ConcurrentDictionary(); + + /// + /// Constructor + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator hlog, long beginAddress, long endAddress, GetMemory getMemory, ScanBufferingMode scanBufferingMode, LightEpoch epoch, int headerSize, string name) + { + this.fasterLog = fasterLog; + this.allocator = hlog; + this.getMemory = getMemory; + this.epoch = epoch; + this.headerSize = headerSize; + + if (beginAddress == 0) + beginAddress = hlog.GetFirstValidLogicalAddress(0); + + this.name = name; + this.endAddress = endAddress; + currentAddress = beginAddress; + nextAddress = beginAddress; + + if (scanBufferingMode == ScanBufferingMode.SinglePageBuffering) + frameSize = 1; + else if (scanBufferingMode == ScanBufferingMode.DoublePageBuffering) + frameSize = 2; + else if (scanBufferingMode == ScanBufferingMode.NoBuffering) + { + frameSize = 0; + return; + } + + frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize()); + loaded = new CountdownEvent[frameSize]; + loadedCancel = new CancellationTokenSource[frameSize]; + loadedPage = new long[frameSize]; + for (int i = 0; i < frameSize; i++) + { + loadedPage[i] = -1; + loadedCancel[i] = new CancellationTokenSource(); + } + } + +#if DOTNETCORE + /// + /// Async enumerable for iterator + /// + /// Entry and entry length + public async IAsyncEnumerable<(byte[], int)> GetAsyncEnumerable() + { + while (true) + { + byte[] result; + int length; + while (!GetNext(out result, out length)) + { + if (currentAddress >= endAddress) + yield break; + await WaitAsync(); + } + yield return (result, length); + } + } + + /// + /// Async enumerable for iterator (memory pool based version) + /// + /// Entry and entry length + public async IAsyncEnumerable<(IMemoryOwner, int)> GetAsyncEnumerable(MemoryPool pool) + { + while (true) + { + IMemoryOwner result; + int length; + while (!GetNext(pool, out result, out length)) + { + if (currentAddress >= endAddress) + yield break; + await WaitAsync(); + } + yield return (result, length); + } + } +#endif + + /// + /// Wait for iteration to be ready to continue + /// + /// + public async ValueTask WaitAsync() + { + while (true) + { + var commitTask = fasterLog.CommitTask; + if (nextAddress >= fasterLog.CommittedUntilAddress) + { + // Ignore commit exceptions + try + { + await commitTask; + } + catch { } + } + else 
+ break; + } + } + + /// + /// Get next record in iterator + /// + /// Copy of entry, if found + /// Actual length of entry + /// + public unsafe bool GetNext(out byte[] entry, out int entryLength) + { + if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken)) + { + if (getMemory != null) + { + // Use user delegate to allocate memory + entry = getMemory(entryLength); + if (entry.Length < entryLength) + throw new Exception("Byte array provided has invalid length"); + } + else + { + // We allocate a byte array from heap + entry = new byte[entryLength]; + } + + fixed (byte* bp = entry) + Buffer.MemoryCopy((void*)(headerSize + physicalAddress), bp, entryLength, entryLength); + + if (epochTaken) + epoch.Suspend(); + + return true; + } + entry = default; + return false; + } + + /// + /// GetNext supporting memory pools + /// + /// + /// + /// + /// + public unsafe bool GetNext(MemoryPool pool, out IMemoryOwner entry, out int entryLength) + { + if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken)) + { + entry = pool.Rent(entryLength); + + fixed (byte* bp = &entry.Memory.Span.GetPinnableReference()) + Buffer.MemoryCopy((void*)(headerSize + physicalAddress), bp, entryLength, entryLength); + + if (epochTaken) + epoch.Suspend(); + + return true; + } + entry = default; + entryLength = default; + return false; + } + + /// + /// Dispose the iterator + /// + public void Dispose() + { + frame?.Dispose(); + if (name != null) + PersistedIterators.TryRemove(name, out _); + } + + private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame) + { + if (loadedPage[currentFrame] != currentPage) + { + if (loadedPage[currentFrame] != -1) + { + WaitForFrameLoad(currentFrame); + } + + allocator.AsyncReadPagesFromDeviceToFrame(currentAddress >> allocator.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame], 0, null, null, loadedCancel[currentFrame]); + loadedPage[currentFrame] = currentAddress >> allocator.LogPageSizeBits; + } + + if (frameSize == 2) + { + var nextPage = currentPage + 1; + var nextFrame = (currentFrame + 1) % frameSize; + + if (loadedPage[nextFrame] != nextPage) + { + if (loadedPage[nextFrame] != -1) + { + WaitForFrameLoad(nextFrame); + } + + allocator.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> allocator.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[nextFrame], 0, null, null, loadedCancel[nextFrame]); + loadedPage[nextFrame] = 1 + (currentAddress >> allocator.LogPageSizeBits); + } + } + + WaitForFrameLoad(currentFrame); + } + + private void WaitForFrameLoad(long frame) + { + if (loaded[frame].IsSet) return; + + try + { + loaded[frame].Wait(loadedCancel[frame].Token); // Ensure we have completed ongoing load + } + catch (Exception e) + { + loadedPage[frame] = -1; + loadedCancel[frame] = new CancellationTokenSource(); + nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + throw new Exception("Page read from storage failed, skipping page. 
Inner exception: " + e.ToString()); + } + } + + private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap) + { + var result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult; + + if (errorCode != 0) + { + Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); + result.cts?.Cancel(); + } + + if (result.freeBuffer1 != null) + { + if (errorCode == 0) + allocator.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + result.freeBuffer1.Return(); + result.freeBuffer1 = null; + } + + if (errorCode == 0) + result.handle?.Signal(); + + Interlocked.MemoryBarrier(); + Overlapped.Free(overlap); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private int Align(int length) + { + return (length + 3) & ~3; + } + + /// + /// Retrieve physical address of next iterator value + /// (under epoch protection if it is from main page buffer) + /// + /// + /// + /// + /// + private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out bool epochTaken) + { + physicalAddress = 0; + entryLength = 0; + epochTaken = false; + + currentAddress = nextAddress; + while (true) + { + // Check for boundary conditions + if (currentAddress < allocator.BeginAddress) + { + Debug.WriteLine("Iterator address is less than log BeginAddress " + allocator.BeginAddress + ", adjusting iterator address"); + currentAddress = allocator.BeginAddress; + } + + if ((currentAddress >= endAddress) || (currentAddress >= fasterLog.CommittedUntilAddress)) + { + nextAddress = currentAddress; + return false; + } + + if (frameSize == 0 && currentAddress < allocator.HeadAddress) + { + throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode"); + } + + var currentPage = currentAddress >> allocator.LogPageSizeBits; + var offset = currentAddress & allocator.PageSizeMask; + + var headAddress = allocator.HeadAddress; + + if (currentAddress < headAddress) + { + BufferAndLoad(currentAddress, currentPage, currentPage % frameSize); + physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset); + } + else + { + epoch.Resume(); + headAddress = allocator.HeadAddress; + if (currentAddress < headAddress) // rare case + { + epoch.Suspend(); + continue; + } + + physicalAddress = allocator.GetPhysicalAddress(currentAddress); + } + + // Get and check entry length + entryLength = fasterLog.GetLength((byte*)physicalAddress); + if (entryLength == 0) + { + if (currentAddress >= headAddress) + epoch.Suspend(); + + nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + if (0 != fasterLog.GetChecksum((byte*)physicalAddress)) + { + var curPage = currentAddress >> allocator.LogPageSizeBits; + throw new Exception("Invalid checksum found during scan, skipping page " + curPage); + } + else + { + // We are likely at end of page, skip to next + currentAddress = nextAddress; + continue; + } + } + + int recordSize = headerSize + Align(entryLength); + if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize) + { + if (currentAddress >= headAddress) + epoch.Suspend(); + nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + throw new Exception("Invalid length of record found: " + entryLength + ", skipping page"); + } + + // Verify checksum if needed + if (currentAddress < headAddress) + { + if (!fasterLog.VerifyChecksum((byte*)physicalAddress, entryLength)) + { + var 
curPage = currentAddress >> allocator.LogPageSizeBits; + nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + throw new Exception("Invalid checksum found during scan, skipping page " + curPage); + } + } + + if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize) + nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + else + nextAddress = currentAddress + recordSize; + + epochTaken = currentAddress >= headAddress; + return true; + } + } + + } +} + + diff --git a/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs b/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs new file mode 100644 index 000000000..4dd46d452 --- /dev/null +++ b/cs/src/core/Index/FasterLog/FasterLogRecoveryInfo.cs @@ -0,0 +1,160 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#pragma warning disable 0162 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; + +namespace FASTER.core +{ + /// + /// Recovery info for FASTER Log + /// + internal struct FasterLogRecoveryInfo + { + /// + /// Begin address + /// + public long BeginAddress; + + /// + /// Flushed logical address + /// + public long FlushedUntilAddress; + + /// + /// Persisted iterators + /// + public Dictionary Iterators; + + /// + /// Initialize + /// + public void Initialize() + { + BeginAddress = 0; + FlushedUntilAddress = 0; + } + + /// + /// Initialize from stream + /// + /// + public void Initialize(BinaryReader reader) + { + int version; + long checkSum; + try + { + version = reader.ReadInt32(); + checkSum = reader.ReadInt64(); + BeginAddress = reader.ReadInt64(); + FlushedUntilAddress = reader.ReadInt64(); + } + catch (Exception e) + { + throw new Exception("Unable to recover from previous commit. 
Inner exception: " + e.ToString()); + } + if (version != 0) + throw new Exception("Invalid version found during commit recovery"); + + if (checkSum != (BeginAddress ^ FlushedUntilAddress)) + throw new Exception("Invalid checksum found during commit recovery"); + + var count = 0; + try + { + count = reader.ReadInt32(); + } + catch { } + + if (count > 0) + { + Iterators = new Dictionary(); + for (int i = 0; i < count; i++) + { + Iterators.Add(reader.ReadString(), reader.ReadInt64()); + } + } + } + + /// + /// Recover info from token + /// + /// + /// + internal void Recover(ILogCommitManager logCommitManager) + { + var metadata = logCommitManager.GetCommitMetadata(); + if (metadata == null) + throw new Exception("Invalid log commit metadata during recovery"); + + Initialize(new BinaryReader(new MemoryStream(metadata))); + } + + /// + /// Reset + /// + public void Reset() + { + Initialize(); + } + + /// + /// Write info to byte array + /// + public byte[] ToByteArray() + { + using (var ms = new MemoryStream()) + { + using (var writer = new BinaryWriter(ms)) + { + writer.Write(0); // version + writer.Write(BeginAddress ^ FlushedUntilAddress); // checksum + writer.Write(BeginAddress); + writer.Write(FlushedUntilAddress); + if (Iterators?.Count > 0) + { + writer.Write(Iterators.Count); + foreach (var kvp in Iterators) + { + writer.Write(kvp.Key); + writer.Write(kvp.Value); + } + } + } + return ms.ToArray(); + } + } + + /// + /// Take snapshot of persisted iterators + /// + public void PopulateIterators() + { + if (FasterLogScanIterator.PersistedIterators.Count > 0) + { + Iterators = new Dictionary(); + + foreach (var kvp in FasterLogScanIterator.PersistedIterators) + { + Iterators.Add(kvp.Key, kvp.Value.CurrentAddress); + } + } + } + + /// + /// Print checkpoint info for debugging purposes + /// + public void DebugPrint() + { + Debug.WriteLine("******** Log Commit Info ********"); + + Debug.WriteLine("BeginAddress: {0}", BeginAddress); + Debug.WriteLine("FlushedUntilAddress: {0}", FlushedUntilAddress); + } + } +} diff --git a/cs/src/core/Index/FasterLog/FasterLogSettings.cs b/cs/src/core/Index/FasterLog/FasterLogSettings.cs new file mode 100644 index 000000000..8f02aade6 --- /dev/null +++ b/cs/src/core/Index/FasterLog/FasterLogSettings.cs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
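+//
+// FasterLogSettings collects the log's tuning knobs: page, in-memory buffer, and
+// on-disk segment sizes (all powers of two, specified in bits), the commit manager
+// or commit file used to persist commit metadata, an optional memory allocation
+// callback for reads, and the per-entry checksum mode.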
+ +#pragma warning disable 0162 + +using System; +using System.Diagnostics; +using System.IO; + +namespace FASTER.core +{ + /// + /// Delegate for getting memory from user + /// + /// Minimum length of returned byte array + /// + public delegate byte[] GetMemory(int minLength); + + /// + /// Type of checksum to add to log + /// + public enum LogChecksumType + { + /// + /// No checksums + /// + None, + /// + /// Checksum per entry + /// + PerEntry + } + + /// + /// FASTER Log Settings + /// + public class FasterLogSettings + { + /// + /// Device used for log + /// + public IDevice LogDevice = new NullDevice(); + + /// + /// Size of a page, in bits + /// + public int PageSizeBits = 22; + + /// + /// Total size of in-memory part of log, in bits + /// Should be at least one page long + /// Num pages = 2^(MemorySizeBits-PageSizeBits) + /// + public int MemorySizeBits = 23; + + /// + /// Size of a segment (group of pages), in bits + /// This is the granularity of files on disk + /// + public int SegmentSizeBits = 30; + + /// + /// Log commit manager + /// + public ILogCommitManager LogCommitManager = null; + + /// + /// Use specified directory for storing and retrieving checkpoints + /// This is a shortcut to providing the following: + /// FasterLogSettings.LogCommitManager = new LocalLogCommitManager(LogCommitFile) + /// + public string LogCommitFile = null; + + /// + /// User callback to allocate memory for read entries + /// + public GetMemory GetMemory = null; + + /// + /// Type of checksum to add to log + /// + public LogChecksumType LogChecksum = LogChecksumType.None; + + internal LogSettings GetLogSettings() + { + return new LogSettings + { + LogDevice = LogDevice, + PageSizeBits = PageSizeBits, + SegmentSizeBits = SegmentSizeBits, + MemorySizeBits = MemorySizeBits, + CopyReadsToTail = false, + MutableFraction = 0, + ObjectLogDevice = null, + ReadCacheSettings = null + }; + } + } +} diff --git a/cs/src/core/Index/FasterLog/ILogCommitManager.cs b/cs/src/core/Index/FasterLog/ILogCommitManager.cs new file mode 100644 index 000000000..f3282ede4 --- /dev/null +++ b/cs/src/core/Index/FasterLog/ILogCommitManager.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.IO; + +namespace FASTER.core +{ + /// + /// Log commit manager + /// + public interface ILogCommitManager + { + /// + /// Perform (synchronous) commit with specified metadata + /// + /// Committed begin address (for information only, not necessary to persist) + /// Address committed until (for information only, not necessary to persist) + /// Commit metadata - should be persisted + void Commit(long beginAddress, long untilAddress, byte[] commitMetadata); + + /// + /// Return prior commit metadata during recovery + /// + /// + byte[] GetCommitMetadata(); + } +} \ No newline at end of file diff --git a/cs/src/core/Index/FasterLog/IReadOnlySpanBatch.cs b/cs/src/core/Index/FasterLog/IReadOnlySpanBatch.cs new file mode 100644 index 000000000..15f61b13d --- /dev/null +++ b/cs/src/core/Index/FasterLog/IReadOnlySpanBatch.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
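+//
+// A batch is handed to FasterLog.TryEnqueue in a single call and is allocated
+// contiguously, so the entire batch must fit on one log page.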
+ +using System; + +namespace FASTER.core +{ + /// + /// Interface to provide a batch of ReadOnlySpan[byte] data to FASTER + /// + public interface IReadOnlySpanBatch + { + /// + /// Number of entries in provided batch + /// + /// Number of entries + int TotalEntries(); + + /// + /// Retrieve batch entry at specified index + /// + /// Index + /// + ReadOnlySpan Get(int index); + } +} diff --git a/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs new file mode 100644 index 000000000..f3cdc90a1 --- /dev/null +++ b/cs/src/core/Index/FasterLog/LocalLogCommitManager.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.IO; + +namespace FASTER.core +{ + /// + /// Implementation of checkpoint interface for local file storage + /// + public class LocalLogCommitManager : ILogCommitManager + { + private string CommitFile; + + /// + /// Create new instance of local checkpoint manager at given base directory + /// + /// + public LocalLogCommitManager(string CommitFile) + { + this.CommitFile = CommitFile; + } + + /// + /// Perform (synchronous) commit with specified metadata + /// + /// Committed begin address (for information only, not necessary to persist) + /// Address committed until (for information only, not necessary to persist) + /// Commit metadata + public void Commit(long beginAddress, long untilAddress, byte[] commitMetadata) + { + // Two phase to ensure we write metadata in single Write operation + using (var ms = new MemoryStream()) + { + using (var writer = new BinaryWriter(ms)) + { + writer.Write(commitMetadata.Length); + writer.Write(commitMetadata); + } + using (var writer = new BinaryWriter(new FileStream(CommitFile, FileMode.OpenOrCreate))) + { + writer.Write(ms.ToArray()); + writer.Flush(); + } + } + } + + /// + /// Retrieve commit metadata + /// + /// Metadata, or null if invalid + public byte[] GetCommitMetadata() + { + if (!File.Exists(CommitFile)) + return null; + + using (var reader = new BinaryReader(new FileStream(CommitFile, FileMode.Open))) + { + var len = reader.ReadInt32(); + return reader.ReadBytes(len); + } + } + } +} \ No newline at end of file diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index e3ffcffad..4ae0388ac 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -89,7 +89,7 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) recoveredHLCInfo.info.DebugPrint(); // Check if the two checkpoints are compatible for recovery - if(!IsCompatible(recoveredICInfo.info, recoveredHLCInfo.info)) + if (!IsCompatible(recoveredICInfo.info, recoveredHLCInfo.info)) { throw new Exception("Cannot recover from (" + indexToken.ToString() + "," + hybridLogToken.ToString() + ") checkpoint pair!\n"); } @@ -118,69 +118,15 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken) { RecoverHybridLogFromSnapshotFile(recoveredICInfo.info, recoveredHLCInfo.info); } - + // Read appropriate hybrid log pages into memory - RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress, recoveredHLCInfo.info.beginAddress); + hlog.RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress, recoveredHLCInfo.info.beginAddress); // Recover session information _recoveredSessions = recoveredHLCInfo.info.continueTokens; } - private void RestoreHybridLog(long untilAddress, long 
headAddress, long beginAddress) - { - Debug.Assert(beginAddress <= headAddress); - Debug.Assert(headAddress <= untilAddress); - - // Special cases: we do not load any records into memory - if ( - (beginAddress == untilAddress) || // Empty log - ((headAddress == untilAddress) && (hlog.GetOffsetInPage(headAddress) == 0)) // Empty in-memory page - ) - { - hlog.AllocatePage(hlog.GetPageIndexForAddress(headAddress)); - } - else - { - var tailPage = hlog.GetPage(untilAddress); - var headPage = hlog.GetPage(headAddress); - - var recoveryStatus = new RecoveryStatus(hlog.GetCapacityNumPages(), headPage, tailPage, untilAddress); - for (int i = 0; i < recoveryStatus.capacity; i++) - { - recoveryStatus.readStatus[i] = ReadStatus.Done; - } - - var numPages = 0; - for (var page = headPage; page <= tailPage; page++) - { - var pageIndex = hlog.GetPageIndexForPage(page); - recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; - numPages++; - } - - hlog.AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); - - var done = false; - while (!done) - { - done = true; - for (long page = headPage; page <= tailPage; page++) - { - int pageIndex = hlog.GetPageIndexForPage(page); - if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending) - { - done = false; - break; - } - } - } - } - - hlog.RecoveryReset(untilAddress, headAddress, beginAddress); - } - - private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, HybridLogRecoveryInfo recoveryInfo) { @@ -202,8 +148,8 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); // Issue request to read pages as much as possible - hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); - + hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus); + for (long page = startPage; page < endPage; page++) { // Ensure page has been read into memory @@ -227,7 +173,7 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo, { pageUntilAddress = hlog.GetOffsetInPage(untilAddress); } - + var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress); RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, recoveryInfo.version); @@ -292,7 +238,7 @@ private void RecoverHybridLogFromSnapshotFile( int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress, - AsyncReadPagesCallbackForRecovery, + hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus, recoveryStatus.recoveryDevicePageOffset, recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice); @@ -430,26 +376,6 @@ private void RecoverFromPage(long startRecoveryAddress, } } - private void AsyncReadPagesCallbackForRecovery(uint errorCode, uint numBytes, NativeOverlapped* overlap) - { - if (errorCode != 0) - { - Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); - } - - // Set the page status to flushed - var result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult; - - if (result.freeBuffer1 != null) - { - hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); - result.freeBuffer1.Return(); - } - int index = hlog.GetPageIndexForPage(result.page); - result.context.readStatus[index] = 
ReadStatus.Done; - Interlocked.MemoryBarrier(); - Overlapped.Free(overlap); - } private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, NativeOverlapped* overlap) { @@ -470,11 +396,11 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, Na long readPage = result.page + result.context.capacity; if (FoldOverSnapshot) { - hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, result.context); + hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, result.context); } else { - hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, AsyncReadPagesCallbackForRecovery, + hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, result.context, result.context.recoveryDevicePageOffset, result.context.recoveryDevice, result.context.objectLogRecoveryDevice); @@ -485,4 +411,90 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, Na Overlapped.Free(overlap); } } + + public unsafe abstract partial class AllocatorBase : IDisposable + where Key : new() + where Value : new() + { + /// + /// Restore log + /// + /// + /// + /// + public void RestoreHybridLog(long untilAddress, long headAddress, long beginAddress) + { + Debug.Assert(beginAddress <= headAddress); + Debug.Assert(headAddress <= untilAddress); + + // Special cases: we do not load any records into memory + if ( + (beginAddress == untilAddress) || // Empty log + ((headAddress == untilAddress) && (GetOffsetInPage(headAddress) == 0)) // Empty in-memory page + ) + { + if (!IsAllocated(GetPageIndexForAddress(headAddress))) + AllocatePage(GetPageIndexForAddress(headAddress)); + } + else + { + var tailPage = GetPage(untilAddress); + var headPage = GetPage(headAddress); + + var recoveryStatus = new RecoveryStatus(GetCapacityNumPages(), headPage, tailPage, untilAddress); + for (int i = 0; i < recoveryStatus.capacity; i++) + { + recoveryStatus.readStatus[i] = ReadStatus.Done; + } + + var numPages = 0; + for (var page = headPage; page <= tailPage; page++) + { + var pageIndex = GetPageIndexForPage(page); + recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; + numPages++; + } + + AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus); + + var done = false; + while (!done) + { + done = true; + for (long page = headPage; page <= tailPage; page++) + { + int pageIndex = GetPageIndexForPage(page); + if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending) + { + done = false; + break; + } + } + } + } + + RecoveryReset(untilAddress, headAddress, beginAddress); + } + + internal void AsyncReadPagesCallbackForRecovery(uint errorCode, uint numBytes, NativeOverlapped* overlap) + { + if (errorCode != 0) + { + Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); + } + + // Set the page status to flushed + var result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult; + + if (result.freeBuffer1 != null) + { + PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + result.freeBuffer1.Return(); + } + int index = GetPageIndexForPage(result.page); + result.context.readStatus[index] = ReadStatus.Done; + Interlocked.MemoryBarrier(); + Overlapped.Free(overlap); + } + } } diff --git a/cs/src/core/Utilities/PageAsyncResultTypes.cs 
b/cs/src/core/Utilities/PageAsyncResultTypes.cs index 5a8792ce7..eb349ad3f 100644 --- a/cs/src/core/Utilities/PageAsyncResultTypes.cs +++ b/cs/src/core/Utilities/PageAsyncResultTypes.cs @@ -22,6 +22,7 @@ public class PageAsyncReadResult : IAsyncResult internal IOCompletionCallback callback; internal IDevice objlogDevice; internal object frame; + internal CancellationTokenSource cts; /* Used for iteration */ internal long resumePtr; diff --git a/cs/src/core/Utilities/Utility.cs b/cs/src/core/Utilities/Utility.cs index 3909371d1..c250d1b81 100644 --- a/cs/src/core/Utilities/Utility.cs +++ b/cs/src/core/Utilities/Utility.cs @@ -121,7 +121,6 @@ public static long GetHashCode(long input) return (long)Rotr64((ulong)local_rand_hash, 45); } - /// /// Get 64-bit hash code for a byte array /// @@ -147,7 +146,42 @@ public static unsafe long HashBytes(byte* pbString, int len) return (long)Rotr64(magicno * hashState, 4); } - + + /// + /// Compute XOR of all provided bytes + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static unsafe ulong XorBytes(byte* src, int length) + { + ulong result = 0; + byte* curr = src; + byte* end = src + length; + while (curr + 4 * sizeof(ulong) <= end) + { + result ^= *(ulong*)curr; + result ^= *(1 + (ulong*)curr); + result ^= *(2 + (ulong*)curr); + result ^= *(3 + (ulong*)curr); + curr += 4 * sizeof(ulong); + } + while (curr + sizeof(ulong) <= end) + { + result ^= *(ulong*)curr; + curr += sizeof(ulong); + } + while (curr + 1 <= end) + { + result ^= *curr; + curr++; + } + + return result; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static ulong Rotr64(ulong x, int n) { diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj index de06d0e90..a1d1766eb 100644 --- a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj +++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj @@ -35,7 +35,7 @@ - + diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec index 85c52a7f8..4c13700fa 100644 --- a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec +++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec @@ -6,21 +6,21 @@ Azure storage device for FASTER Microsoft Microsoft - https://github.com/Microsoft/FASTER + https://github.com/microsoft/FASTER MIT true - This is a FASTER device implementation for Azure Storage (page blobs). FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data. - See the project website at https://github.com/Microsoft/FASTER for more details + This is a FASTER device implementation for Azure Storage (page blobs). FASTER is a fast concurrent key-value store and log for larger-than-memory data. + See the project website at https://github.com/microsoft/FASTER for more details © Microsoft Corporation. All rights reserved. en-US key-value store dictionary hashtable concurrent log persistent azure storage FASTER - + - + diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs new file mode 100644 index 000000000..e1a020269 --- /dev/null +++ b/cs/test/FasterLogTests.cs @@ -0,0 +1,210 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
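+//
+// Exercises the FasterLog surface: enqueue/commit round-trips with and without
+// per-entry checksums, pull-based iteration (WaitAsync/GetNext), truncation via
+// TruncateUntil, and entries sized to exactly fill a page.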
+ +using System; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using FASTER.core; +using NUnit.Framework; + +namespace FASTER.test +{ + + [TestFixture] + internal class FasterLogTests + { + const int entryLength = 100; + const int numEntries = 1000000; + private FasterLog log; + private IDevice device; + + [SetUp] + public void Setup() + { + if (File.Exists(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit")) + File.Delete(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit"); + device = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log", deleteOnClose: true); + } + + [TearDown] + public void TearDown() + { + device.Close(); + if (File.Exists(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit")) + File.Delete(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit"); + } + + [Test] + public void FasterLogTest1([Values]LogChecksumType logChecksum) + { + log = new FasterLog(new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum }); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + for (int i = 0; i < numEntries; i++) + { + log.Enqueue(entry); + } + log.Commit(true); + + using (var iter = log.Scan(0, long.MaxValue)) + { + int count = 0; + while (iter.GetNext(out byte[] result, out int length)) + { + count++; + Assert.IsTrue(result.SequenceEqual(entry)); + if (count % 100 == 0) + log.TruncateUntil(iter.CurrentAddress); + } + Assert.IsTrue(count == numEntries); + } + + log.Dispose(); + } + + [Test] + public async Task FasterLogTest2([Values]LogChecksumType logChecksum) + { + log = new FasterLog(new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum }); + byte[] data1 = new byte[10000]; + for (int i = 0; i < 10000; i++) data1[i] = (byte)i; + + using (var iter = log.Scan(0, long.MaxValue, scanBufferingMode: ScanBufferingMode.SinglePageBuffering)) + { + int i = 0; + while (i++ < 500) + { + var waitingReader = iter.WaitAsync(); + Assert.IsTrue(!waitingReader.IsCompleted); + + while (!log.TryEnqueue(data1, out _)) ; + Assert.IsFalse(waitingReader.IsCompleted); + + await log.CommitAsync(); + while (!waitingReader.IsCompleted) ; + Assert.IsTrue(waitingReader.IsCompleted); + + var curr = iter.GetNext(out byte[] result, out _); + Assert.IsTrue(curr); + Assert.IsTrue(result.SequenceEqual(data1)); + + var next = iter.GetNext(out _, out _); + Assert.IsFalse(next); + } + } + log.Dispose(); + } + + [Test] + public async Task FasterLogTest3([Values]LogChecksumType logChecksum) + { + log = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 14, LogChecksum = logChecksum }); + byte[] data1 = new byte[10000]; + for (int i = 0; i < 10000; i++) data1[i] = (byte)i; + + using (var iter = log.Scan(0, long.MaxValue, scanBufferingMode: ScanBufferingMode.SinglePageBuffering)) + { + var appendResult = log.TryEnqueue(data1, out _); + Assert.IsTrue(appendResult); + await log.CommitAsync(); + await iter.WaitAsync(); + var iterResult = iter.GetNext(out byte[] entry, out _); + Assert.IsTrue(iterResult); + + appendResult = log.TryEnqueue(data1, out _); + Assert.IsFalse(appendResult); + await iter.WaitAsync(); + + // Should read the "hole" and return false + iterResult = iter.GetNext(out entry, out _); + Assert.IsFalse(iterResult); + + // Should wait for next item + var task = iter.WaitAsync(); + Assert.IsFalse(task.IsCompleted); + + appendResult = log.TryEnqueue(data1, out _); + 
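+                // The failed enqueue above sealed the remainder of the page as a hole,
+                // so this retry starts on a fresh page and should succeed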
+                Assert.IsTrue(appendResult);
+                await log.CommitAsync();
+
+                await task;
+                iterResult = iter.GetNext(out entry, out _);
+                Assert.IsTrue(iterResult);
+            }
+            log.Dispose();
+        }
+
+        [Test]
+        public async Task FasterLogTest4([Values]LogChecksumType logChecksum)
+        {
+            log = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 14, LogChecksum = logChecksum });
+            byte[] data1 = new byte[100];
+            for (int i = 0; i < 100; i++) data1[i] = (byte)i;
+
+            for (int i = 0; i < 100; i++)
+            {
+                log.Enqueue(data1);
+            }
+
+            // Nothing is durable until the first commit
+            Assert.IsTrue(log.CommittedUntilAddress == log.BeginAddress);
+            await log.CommitAsync();
+
+            Assert.IsTrue(log.CommittedUntilAddress == log.TailAddress);
+            Assert.IsTrue(log.CommittedBeginAddress == log.BeginAddress);
+
+            using (var iter = log.Scan(0, long.MaxValue))
+            {
+                // Read the first entry, then truncate the log up to the iterator's next address
+                var iterResult = iter.GetNext(out byte[] entry, out _);
+                Assert.IsTrue(iterResult);
+                log.TruncateUntil(iter.NextAddress);
+
+                // Truncation takes effect immediately, but becomes durable only at the next commit
+                Assert.IsTrue(log.CommittedUntilAddress == log.TailAddress);
+                Assert.IsTrue(log.CommittedBeginAddress < log.BeginAddress);
+                Assert.IsTrue(iter.NextAddress == log.BeginAddress);
+
+                await log.CommitAsync();
+
+                Assert.IsTrue(log.CommittedUntilAddress == log.TailAddress);
+                Assert.IsTrue(log.CommittedBeginAddress == log.BeginAddress);
+            }
+            log.Dispose();
+        }
+
+        [Test]
+        public async Task FasterLogTest5([Values]LogChecksumType logChecksum)
+        {
+            log = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum });
+
+            int headerSize = logChecksum == LogChecksumType.None ? 4 : 12;
+            bool _disposed = false;
+            var commit = new Thread(() => { while (!_disposed) { log.Commit(true); Thread.Sleep(1); } });
+
+            commit.Start();
+
+            // First entry: 65536 (page size) minus entry header minus the 64-byte log header on the first page
+            await log.EnqueueAndWaitForCommitAsync(new byte[65536 - headerSize - 64]);
+
+            // Each subsequent entry: 65536 (page size) minus entry header, so it exactly fills one page
+            await log.EnqueueAndWaitForCommitAsync(new byte[65536 - headerSize]);
+            await log.EnqueueAndWaitForCommitAsync(new byte[65536 - headerSize]);
+            await log.EnqueueAndWaitForCommitAsync(new byte[65536 - headerSize]);
+            await log.EnqueueAndWaitForCommitAsync(new byte[65536 - headerSize]);
+
+            _disposed = true;
+
+            commit.Join();
+            log.Dispose();
+        }
+    }
+}
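
For reviewers, here is how the pieces introduced in this change fit together. The sketch below is a minimal quick-start, not part of the patch: it uses only types added here (FasterLog, FasterLogSettings, LogChecksumType, FasterLogScanIterator, and the LogCommitFile shortcut for LocalLogCommitManager), with placeholder file paths, and spells out defaults for clarity.

    using System;
    using FASTER.core;

    public static class FasterLogQuickStart
    {
        public static void Main()
        {
            // Local log device plus a commit file; paths are placeholders.
            // LogCommitFile is shorthand for LogCommitManager = new LocalLogCommitManager(path).
            var device = Devices.CreateLogDevice("hlog.log");
            var log = new FasterLog(new FasterLogSettings
            {
                LogDevice = device,
                PageSizeBits = 22,                      // 4 MB pages (the default)
                MemorySizeBits = 23,                    // 8 MB in-memory buffer = 2 pages
                LogChecksum = LogChecksumType.PerEntry, // checksum each entry
                LogCommitFile = "hlog.log.commit"
            });

            // Append entries. Enqueue blocks until space is available;
            // TryEnqueue instead returns false, allowing caller-side back-off.
            var entry = new byte[100];
            for (int i = 0; i < 1000; i++)
                log.Enqueue(entry);

            // Make the appended entries durable (true = spin until the commit completes)
            log.Commit(true);

            // Pull-based scan over everything committed so far, truncating as we go
            using (var iter = log.Scan(log.BeginAddress, long.MaxValue))
            {
                while (iter.GetNext(out byte[] result, out int length))
                {
                    Console.WriteLine($"Read entry of length {length}");
                    log.TruncateUntil(iter.CurrentAddress);
                }
            }

            log.Dispose();
            device.Close();
        }
    }

The commit metadata written through ILogCommitManager.Commit is the same byte array that FasterLogRecoveryInfo parses back (version, checksum, begin/flushed-until addresses, and any persisted iterators), so a process restarted against the same commit file resumes from its last committed state.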