From 16edd83aa895951640135e77b3d3402ace6fdfdd Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Tue, 17 Sep 2019 10:32:16 -0700 Subject: [PATCH] Initial checkin --- cs/FASTER.sln | 9 + cs/playground/FasterLogSample/App.config | 6 + .../FasterLogSample/FasterLogSample.csproj | 38 +++ cs/playground/FasterLogSample/Program.cs | 99 ++++++++ .../Properties/AssemblyInfo.cs | 22 ++ cs/src/core/Device/Devices.cs | 9 +- cs/src/core/Device/LocalStorageDevice.cs | 21 +- .../core/Device/ManagedLocalStorageDevice.cs | 23 +- cs/src/core/Device/StorageDeviceBase.cs | 2 +- cs/src/core/Epochs/FastThreadLocal.cs | 50 ++-- cs/src/core/Epochs/LightEpoch.cs | 61 +++-- cs/src/core/FASTER.core.csproj | 1 + cs/src/core/Index/FasterLog/FasterLog.cs | 197 +++++++++++++++ .../core/Index/FasterLog/FasterLogIterator.cs | 224 ++++++++++++++++++ 14 files changed, 708 insertions(+), 54 deletions(-) create mode 100644 cs/playground/FasterLogSample/App.config create mode 100644 cs/playground/FasterLogSample/FasterLogSample.csproj create mode 100644 cs/playground/FasterLogSample/Program.cs create mode 100644 cs/playground/FasterLogSample/Properties/AssemblyInfo.cs create mode 100644 cs/src/core/Index/FasterLog/FasterLog.cs create mode 100644 cs/src/core/Index/FasterLog/FasterLogIterator.cs diff --git a/cs/FASTER.sln b/cs/FASTER.sln index af8a268e5..c1f9b7a42 100644 --- a/cs/FASTER.sln +++ b/cs/FASTER.sln @@ -44,6 +44,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B1 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -156,6 +158,12 @@ Global {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64 {E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|Any CPU.ActiveCfg = Debug|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.ActiveCfg = Debug|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.Build.0 = Debug|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.ActiveCfg = Release|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64 + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -176,6 +184,7 @@ Global {7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE} {A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496} {E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE} + {25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC} diff --git a/cs/playground/FasterLogSample/App.config b/cs/playground/FasterLogSample/App.config new file mode 100644 index 000000000..d69a9b153 --- /dev/null +++ b/cs/playground/FasterLogSample/App.config @@ -0,0 +1,6 @@ + + + + + + diff --git a/cs/playground/FasterLogSample/FasterLogSample.csproj b/cs/playground/FasterLogSample/FasterLogSample.csproj new file mode 100644 index 000000000..7b8c2eee1 --- /dev/null +++ b/cs/playground/FasterLogSample/FasterLogSample.csproj @@ -0,0 +1,38 @@ + + + + net46 + x64 + win7-x64 + + + + Exe + true + StructSample + prompt + PackageReference + true + + + + TRACE;DEBUG + full + true + bin\x64\Debug\ + + + TRACE + pdbonly + true + bin\x64\Release\ + + + + + + + + + + \ No newline at end of file diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs new file mode 100644 index 000000000..1b4dd542c --- /dev/null +++ b/cs/playground/FasterLogSample/Program.cs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using FASTER.core.log; +using System; +using System.Diagnostics; +using System.Diagnostics.Eventing.Reader; +using System.IO; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace FasterLogSample +{ + public class Program + { + const int entryLength = 100; + static FasterLog log; + + static void ReportThread() + { + long lastTime = 0; + long lastValue = log.TailAddress; + Stopwatch sw = new Stopwatch(); + sw.Start(); + + while (true) + { + Thread.Sleep(10000); + var nowTime = sw.ElapsedMilliseconds; + var nowValue = log.TailAddress; + + Console.WriteLine("Throughput: {0} MB/sec", + (nowValue - lastValue) / (1000*(nowTime - lastTime))); + lastTime = nowTime; + lastValue = nowValue; + } + } + + static void AppendThread() + { + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + while (true) + { + log.Append(entry); + } + } + + static void ScanThread() + { + Thread.Sleep(5000); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + var entrySpan = new Span(entry); + + + long lastAddress = 0; + Span result; + using (var iter = log.Scan(0, long.MaxValue)) + { + while (true) + { + while (!iter.GetNext(out result)) + Thread.Sleep(1000); + if (!result.SequenceEqual(entrySpan)) + { + throw new Exception("Invalid entry found"); + } + + if (iter.CurrentAddress - lastAddress > 500000000) + { + log.TruncateUntil(iter.CurrentAddress); + lastAddress = iter.CurrentAddress; + } + } + } + } + + static void Main(string[] args) + { + var device = Devices.CreateLogDevice("E:\\logs\\hlog.log"); + log = new FasterLog(new FasterLogSettings { LogDevice = device, MemorySizeBits = 26 }); + + new Thread(new ThreadStart(AppendThread)).Start(); + //new Thread(new ThreadStart(AppendThread)).Start(); + //new Thread(new ThreadStart(AppendThread)).Start(); + //new Thread(new ThreadStart(AppendThread)).Start(); + // new Thread(new ThreadStart(ScanThread)).Start(); + new Thread(new ThreadStart(ReportThread)).Start(); + + Thread.Sleep(500*1000); + } + } +} diff --git a/cs/playground/FasterLogSample/Properties/AssemblyInfo.cs b/cs/playground/FasterLogSample/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..5e08438c2 --- /dev/null +++ b/cs/playground/FasterLogSample/Properties/AssemblyInfo.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyDescription("")] +[assembly: AssemblyCopyright("Copyright © 2017")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")] diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs index 14a975445..4bfa0a14f 100644 --- a/cs/src/core/Device/Devices.cs +++ b/cs/src/core/Device/Devices.cs @@ -26,9 +26,10 @@ public static class Devices /// Path to file that will store the log (empty for null device) /// Whether we try to preallocate the file on creation /// Delete files on close - /// + /// The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit + /// Whether to recover device metadata from existing files /// Device instance - public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED) + public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false) { if (string.IsNullOrWhiteSpace(logPath)) return new NullDevice(); @@ -38,12 +39,12 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = tru #if DOTNETCORE if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity); + logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity, recoverDevice); } else #endif { - logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity: capacity); + logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice); } return logDevice; } diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index c034390fa..aa6d0aa9d 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -4,6 +4,7 @@ using Microsoft.Win32.SafeHandles; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Runtime.InteropServices; @@ -24,16 +25,18 @@ public class LocalStorageDevice : StorageDeviceBase /// /// Constructor /// - /// + /// File name (or prefix) with path /// /// /// /// The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit + /// Whether to recover device metadata from existing files public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, - long capacity = Devices.CAPACITY_UNSPECIFIED) + long capacity = Devices.CAPACITY_UNSPECIFIED, + bool recoverDevice = false) : base(filename, GetSectorSize(filename), capacity) { @@ -42,7 +45,8 @@ public LocalStorageDevice(string filename, this.deleteOnClose = deleteOnClose; this.disableFileBuffering = disableFileBuffering; logHandles = new SafeConcurrentDictionary(); - RecoverFiles(); + if (recoverDevice) + RecoverFiles(); } private void RecoverFiles() @@ -53,14 +57,19 @@ private void RecoverFiles() string bareName = fi.Name; - int prevSegmentId = -1; + List segids = new List(); foreach (FileInfo item in di.GetFiles(bareName + "*")) { - int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")); + segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", ""))); + } + segids.Sort(); + + int prevSegmentId = -1; + foreach (int segmentId in segids) + { if (segmentId != prevSegmentId + 1) { startSegment = segmentId; - } else { diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 35ffebeb9..255cd8132 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -4,6 +4,7 @@ using Microsoft.Win32.SafeHandles; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Runtime.InteropServices; @@ -24,11 +25,12 @@ public class ManagedLocalStorageDevice : StorageDeviceBase /// /// /// - /// + /// File name (or prefix) with path /// /// - /// The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit - public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED) + /// The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit + /// Whether to recover device metadata from existing files + public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED, bool recoverDevice = false) : base(filename, GetSectorSize(filename), capacity) { pool = new SectorAlignedBufferPool(1, 1); @@ -36,7 +38,8 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, this.preallocateFile = preallocateFile; this.deleteOnClose = deleteOnClose; logHandles = new ConcurrentDictionary(); - RecoverFiles(); + if (recoverDevice) + RecoverFiles(); } @@ -48,14 +51,19 @@ private void RecoverFiles() string bareName = fi.Name; - int prevSegmentId = -1; + List segids = new List(); foreach (FileInfo item in di.GetFiles(bareName + "*")) { - int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")); + segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", ""))); + } + segids.Sort(); + + int prevSegmentId = -1; + foreach (int segmentId in segids) + { if (segmentId != prevSegmentId + 1) { startSegment = segmentId; - } else { @@ -68,6 +76,7 @@ private void RecoverFiles() + class ReadCallbackWrapper { readonly IOCompletionCallback callback; diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index 1c84d708c..7ab14ab82 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -233,7 +233,7 @@ public void TruncateUntilSegment(int toSegment) public virtual void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result) { // Truncate only up to segment boundary if address is not aligned - TruncateUntilSegmentAsync((int)toAddress >> segmentSizeBits, callback, result); + TruncateUntilSegmentAsync((int)(toAddress >> segmentSizeBits), callback, result); } /// diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs index e9f53656f..7bc2e2e30 100644 --- a/cs/src/core/Epochs/FastThreadLocal.cs +++ b/cs/src/core/Epochs/FastThreadLocal.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Net; using System.Threading; namespace FASTER.core @@ -16,18 +17,25 @@ internal class FastThreadLocal private const int kMaxInstances = 128; [ThreadStatic] - private static T[] values; + private static T[] tl_values; + [ThreadStatic] + private static int[] tl_iid; + + private readonly int offset; + private readonly int iid; - private readonly int id; private static readonly int[] instances = new int[kMaxInstances]; + private static int instanceId = 0; public FastThreadLocal() { + iid = Interlocked.Increment(ref instanceId); + for (int i = 0; i < kMaxInstances; i++) { - if (0 == Interlocked.CompareExchange(ref instances[i], 1, 0)) + if (0 == Interlocked.CompareExchange(ref instances[i], iid, 0)) { - id = i; + offset = i; return; } } @@ -36,22 +44,22 @@ public FastThreadLocal() public void InitializeThread() { - if (values == null) - values = new T[kMaxInstances]; + if (tl_values == null) + { + tl_values = new T[kMaxInstances]; + tl_iid = new int[kMaxInstances]; + } + if (tl_iid[offset] != iid) + { + tl_iid[offset] = iid; + tl_values[offset] = default(T); + } } public void DisposeThread() { - Value = default(T); - - // Dispose values only if there are no other - // instances active for this thread - for (int i = 0; i < kMaxInstances; i++) - { - if ((instances[i] == 1) && (i != id)) - return; - } - values = null; + tl_values[offset] = default(T); + tl_iid[offset] = 0; } /// @@ -59,15 +67,15 @@ public void DisposeThread() /// public void Dispose() { - instances[id] = 0; + instances[offset] = 0; } public T Value { - get => values[id]; - set => values[id] = value; + get => tl_values[offset]; + set => tl_values[offset] = value; } - public bool IsInitializedForThread => values != null; + public bool IsInitializedForThread => (tl_values != null) && (iid == tl_iid[offset]); } -} +} \ No newline at end of file diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index 2cd7f6232..cd47f0aaf 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -52,7 +52,14 @@ public unsafe class LightEpoch /// /// A thread's entry in the epoch table. /// - private FastThreadLocal threadEntryIndex; + [ThreadStatic] + private static int threadEntryIndex; + + /// + /// Number of instances using this entry + /// + [ThreadStatic] + private static int threadEntryIndexCount; /// /// Global current epoch value @@ -79,7 +86,7 @@ public LightEpoch(int size = kTableSize) /// unsafe void Initialize(int size) { - threadEntryIndex = new FastThreadLocal(); + // threadEntryIndex = new FastThreadLocal(); numEntries = size; // Over-allocate to do cache-line alignment @@ -112,7 +119,7 @@ public void Dispose() CurrentEpoch = 1; SafeToReclaimEpoch = 0; - threadEntryIndex.Dispose(); + // threadEntryIndex.Dispose(); } /// @@ -121,7 +128,7 @@ public void Dispose() /// Result of the check public bool IsProtected() { - return threadEntryIndex.IsInitializedForThread && kInvalidIndex != threadEntryIndex.Value; + return kInvalidIndex != threadEntryIndex; } /// @@ -131,7 +138,7 @@ public bool IsProtected() [MethodImpl(MethodImplOptions.AggressiveInlining)] public int ProtectAndDrain() { - int entry = threadEntryIndex.Value; + int entry = threadEntryIndex; (*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch; @@ -175,8 +182,9 @@ private void Drain(int nextEpoch) /// public void Acquire() { - threadEntryIndex.InitializeThread(); - threadEntryIndex.Value = ReserveEntryForThread(); + if (threadEntryIndex == kInvalidIndex) + threadEntryIndex = ReserveEntryForThread(); + threadEntryIndexCount++; } @@ -185,16 +193,39 @@ public void Acquire() /// public void Release() { - int entry = threadEntryIndex.Value; + int entry = threadEntryIndex; if (kInvalidIndex == entry) { return; } - threadEntryIndex.Value = kInvalidIndex; - threadEntryIndex.DisposeThread(); - (*(tableAligned + entry)).localCurrentEpoch = 0; - (*(tableAligned + entry)).threadId = 0; + threadEntryIndexCount--; + if (threadEntryIndexCount == 0) + { + threadEntryIndex = kInvalidIndex; + (*(tableAligned + entry)).localCurrentEpoch = 0; + (*(tableAligned + entry)).threadId = 0; + } + } + + /// + /// Thread suspends its epoch entry + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Suspend() + { + (*(tableAligned + threadEntryIndex)).localCurrentEpoch = int.MaxValue; + } + + /// + /// Thread resumes its epoch entry + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Resume() + { + if (threadEntryIndex == kInvalidIndex) + Acquire(); + ProtectAndDrain(); } /// @@ -388,7 +419,7 @@ private struct EpochActionPair [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool MarkAndCheckIsComplete(int markerIdx, int version) { - int entry = threadEntryIndex.Value; + int entry = threadEntryIndex; if (kInvalidIndex == entry) { Debug.WriteLine("New Thread entered during CPR"); @@ -404,7 +435,7 @@ public bool MarkAndCheckIsComplete(int markerIdx, int version) int fc_version = (*(tableAligned + index)).markers[markerIdx]; if (0 != entry_epoch) { - if (fc_version != version) + if (fc_version != version && entry_epoch < int.MaxValue) { return false; } @@ -413,4 +444,4 @@ public bool MarkAndCheckIsComplete(int markerIdx, int version) return true; } } -} +} \ No newline at end of file diff --git a/cs/src/core/FASTER.core.csproj b/cs/src/core/FASTER.core.csproj index 1d68a5425..dca9cd6a5 100644 --- a/cs/src/core/FASTER.core.csproj +++ b/cs/src/core/FASTER.core.csproj @@ -35,6 +35,7 @@ + \ No newline at end of file diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs new file mode 100644 index 000000000..a34376ea1 --- /dev/null +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -0,0 +1,197 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +#pragma warning disable 0162 + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; +using FASTER.core; + +namespace FASTER.core.log +{ + public class FasterLogSettings + { + /// + /// Device used for log + /// + public IDevice LogDevice = new NullDevice(); + + /// + /// Size of a segment (group of pages), in bits + /// + public int PageSizeBits = 22; + + /// + /// Size of a segment (group of pages), in bits + /// + public int SegmentSizeBits = 30; + + /// + /// Total size of in-memory part of log, in bits + /// + public int MemorySizeBits = 34; + + internal LogSettings GetLogSettings() + { + return new LogSettings + { + LogDevice = LogDevice, + PageSizeBits = PageSizeBits, + SegmentSizeBits = SegmentSizeBits, + MemorySizeBits = MemorySizeBits, + CopyReadsToTail = false, + MutableFraction = 0, + ObjectLogDevice = null, + ReadCacheSettings = null + }; + } + } + + /// + /// FASTER log + /// + public class FasterLog + { + private readonly BlittableAllocator allocator; + private readonly LightEpoch epoch; + + /// + /// Beginning address of log + /// + public long BeginAddress => allocator.BeginAddress; + + /// + /// Tail address of log + /// + public long TailAddress => allocator.GetTailAddress(); + + /// + /// Flushed until address + /// + public long FlushedUntilAddress => allocator.FlushedUntilAddress; + + /// + /// Create new log instance + /// + /// + public FasterLog(FasterLogSettings logSettings) + { + this.epoch = new LightEpoch(); + allocator = new BlittableAllocator(logSettings.GetLogSettings(), null, null, epoch); + allocator.Initialize(); + } + + /// + /// Append entry to log + /// + /// + /// Logical address of added entry + public unsafe long Append(Span entry) + { + epoch.Resume(); + var length = entry.Length; + BlockAllocate(4 + length, out long logicalAddress); + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + *(int*)physicalAddress = length; + fixed (byte* bp = &entry.GetPinnableReference()) + Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + epoch.Suspend(); + return logicalAddress; + } + + /// + /// Append entry to log + /// + /// + /// Logical address of added entry + public unsafe long Append(byte[] entry) + { + + epoch.Resume(); + var length = entry.Length; + BlockAllocate(4 + length, out long logicalAddress); + var physicalAddress = allocator.GetPhysicalAddress(logicalAddress); + *(int*)physicalAddress = length; + fixed (byte* bp = entry) + Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length); + epoch.Suspend(); + return logicalAddress; + } + + /// + /// Flush the log until tail + /// + public long Flush(bool spinWait = false) + { + epoch.Resume(); + allocator.ShiftReadOnlyToTail(out long tailAddress); + epoch.Suspend(); + if (spinWait) + { + while (allocator.FlushedUntilAddress < tailAddress) + Thread.Yield(); + } + return tailAddress; + } + + /// + /// Truncate the log until, but not including, untilAddress + /// + /// + public void TruncateUntil(long untilAddress) + { + epoch.Resume(); + allocator.ShiftBeginAddress(untilAddress); + epoch.Suspend(); + } + + /// + /// Iterator interface for scanning FASTER log + /// + /// + /// + /// + /// + public FasterLogScanIterator Scan(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode = ScanBufferingMode.DoublePageBuffering) + { + return new FasterLogScanIterator(allocator, beginAddress, endAddress, scanBufferingMode, epoch); + } + + /// + /// Dispose this thread's epoch entry. Use when you manage your own + /// threads and want to recycle a thread-local epoch entry. + /// + public void DisposeThread() + { + epoch.Release(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void BlockAllocate(int recordSize, out long logicalAddress) + { + logicalAddress = allocator.Allocate(recordSize); + if (logicalAddress >= 0) return; + + while (logicalAddress < 0 && -logicalAddress >= allocator.ReadOnlyAddress) + { + epoch.ProtectAndDrain(); + allocator.CheckForAllocateComplete(ref logicalAddress); + if (logicalAddress < 0) + { + Thread.Sleep(10); + } + } + + logicalAddress = logicalAddress < 0 ? -logicalAddress : logicalAddress; + + if (logicalAddress < allocator.ReadOnlyAddress) + { + Debug.WriteLine("Allocated address is read-only, retrying"); + BlockAllocate(recordSize, out logicalAddress); + } + } + } +} diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs new file mode 100644 index 000000000..1fccba3ea --- /dev/null +++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs @@ -0,0 +1,224 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Threading; +using System.Diagnostics; + +namespace FASTER.core.log +{ + /// + /// Scan iterator for hybrid log + /// + public class FasterLogScanIterator : IDisposable + { + private readonly int frameSize; + private readonly BlittableAllocator allocator; + private readonly long beginAddress, endAddress; + private readonly BlittableFrame frame; + private readonly CountdownEvent[] loaded; + + private bool first = true; + private long currentAddress, nextAddress; + private long currentPhysicalAddress; + private LightEpoch epoch; + + /// + /// Current address + /// + public long CurrentAddress => currentAddress; + + /// + /// Constructor + /// + /// + /// + /// + /// + public unsafe FasterLogScanIterator(BlittableAllocator hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch) + { + this.allocator = hlog; + this.epoch = epoch; + + if (beginAddress == 0) + beginAddress = hlog.GetFirstValidLogicalAddress(0); + + this.beginAddress = beginAddress; + this.endAddress = endAddress; + currentAddress = -1; + nextAddress = beginAddress; + + if (scanBufferingMode == ScanBufferingMode.SinglePageBuffering) + frameSize = 1; + else if (scanBufferingMode == ScanBufferingMode.DoublePageBuffering) + frameSize = 2; + else if (scanBufferingMode == ScanBufferingMode.NoBuffering) + { + frameSize = 0; + return; + } + + frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize()); + loaded = new CountdownEvent[frameSize]; + + // Only load addresses flushed to disk + if (nextAddress < hlog.HeadAddress) + { + var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize; + hlog.AsyncReadPagesFromDeviceToFrame + (nextAddress >> hlog.LogPageSizeBits, + 1, endAddress, AsyncReadPagesCallback, Empty.Default, + frame, out loaded[frameNumber]); + } + } + + /// + /// Get next record in iterator + /// + /// + /// + public unsafe bool GetNext(out Span entry) + { + currentAddress = nextAddress; + while (true) + { + // Check for boundary conditions + if ((currentAddress >= endAddress) || (currentAddress >= allocator.ReadOnlyAddress)) + { + entry = default(Span); + return false; + } + + if (currentAddress < allocator.BeginAddress) + { + throw new Exception("Iterator address is less than log BeginAddress " + allocator.BeginAddress); + } + + if (frameSize == 0 && currentAddress < allocator.HeadAddress) + { + throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode"); + } + + var currentPage = currentAddress >> allocator.LogPageSizeBits; + var offset = currentAddress & allocator.PageSizeMask; + + var headAddress = allocator.HeadAddress; + var physicalAddress = default(long); + + if (currentAddress < headAddress) + { + BufferAndLoad(currentAddress, currentPage, currentPage % frameSize); + physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset); + } + else + { + epoch.Resume(); + headAddress = allocator.HeadAddress; + if (currentAddress < headAddress) // rare case + { + epoch.Suspend(); + continue; + } + + physicalAddress = allocator.GetPhysicalAddress(currentAddress); + } + + // Check if record fits on page, if not skip to next page + int length = *(int*)physicalAddress; + int recordSize = 4; + if (length > 0) + recordSize += length; + if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize) + { + if (currentAddress >= headAddress) + epoch.Suspend(); + currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits; + continue; + } + + if (length == 0) + { + if (currentAddress >= headAddress) + epoch.Suspend(); + currentAddress += recordSize; + continue; + } + + entry = new Span((void*)(physicalAddress + 4), length); + if (currentAddress >= headAddress) + { + // Have to copy out bytes within epoch protection in + // this case because this is a shared buffer + var _entry = new byte[length]; + entry.CopyTo(_entry); + entry = _entry; + epoch.Suspend(); + } + currentPhysicalAddress = physicalAddress; + nextAddress = currentAddress + recordSize; + return true; + } + } + + /// + /// Dispose the iterator + /// + public void Dispose() + { + frame?.Dispose(); + } + + private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame) + { + if (first || (currentAddress & allocator.PageSizeMask) == 0) + { + // Prefetch pages based on buffering mode + if (frameSize == 1) + { + if (!first) + { + allocator.AsyncReadPagesFromDeviceToFrame(currentAddress >> allocator.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]); + } + } + else + { + var endPage = endAddress >> allocator.LogPageSizeBits; + if ((endPage > currentPage) && + ((endPage > currentPage + 1) || ((endAddress & allocator.PageSizeMask) != 0))) + { + allocator.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> allocator.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]); + } + } + first = false; + } + loaded[currentFrame].Wait(); + } + + private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap) + { + if (errorCode != 0) + { + Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode); + } + + var result = (PageAsyncReadResult)Overlapped.Unpack(overlap).AsyncResult; + + if (result.freeBuffer1 != null) + { + allocator.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page); + result.freeBuffer1.Return(); + result.freeBuffer1 = null; + } + + if (result.handle != null) + { + result.handle.Signal(); + } + + Interlocked.MemoryBarrier(); + Overlapped.Free(overlap); + } + } +} + +