diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index a9416de7b..082aa8be8 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -109,7 +109,10 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio #endif var path = "D:\\data\\FasterYcsbBenchmark\\"; - device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true); + device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: false); + + // Double the device throttle limit (max pending I/Os) for runs with more than 8 threads + if (threadCount > 8) device.ThrottleLimit *= 2; if (kSmallMemoryLog) store = new FasterKV @@ -303,6 +306,10 @@ public unsafe void Run() Console.WriteLine("Completed checkpoint"); } + // When kSmallMemoryLog is set, flush and evict the log from main memory + if (kSmallMemoryLog) + store.Log.FlushAndEvict(true); + // Uncomment below to dispose log from memory, use for 100% read workloads only // store.Log.DisposeFromMemory(); diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index ee557ca74..ca7b6b824 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -915,6 +915,11 @@ protected virtual void TruncateUntilAddress(long toAddress) device.TruncateUntilAddress(toAddress); } + internal virtual bool TryComplete() + { + return device.TryComplete(); + } + /// /// Seal: make sure there are no longer any threads writing to the page /// Flush: send page to secondary store @@ -1514,6 +1519,7 @@ public void AsyncGetFromDisk(long fromLogical, { while (device.Throttle()) { + device.TryComplete(); Thread.Yield(); epoch.ProtectAndDrain(); } diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 8373a70e0..3153657b8 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -172,6 +172,13 @@ public override (int, int) GetRecordSize(ref Key key,
ref Value value) return (recordSize, recordSize); } + internal override bool TryComplete() + { + var b1 = objectLogDevice.TryComplete(); + var b2 = base.TryComplete(); + return b1 || b2; + } + /// /// Dispose memory allocator /// diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs index ee91e5197..9d8dc9eb4 100644 --- a/cs/src/core/Device/Devices.cs +++ b/cs/src/core/Device/Devices.cs @@ -23,8 +23,9 @@ public static class Devices /// Delete files on close /// The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit /// Whether to recover device metadata from existing files + /// Whether to use an IO completion port with polling (forwarded to LocalStorageDevice) /// Device instance - public static IDevice CreateLogDevice(string logPath, bool preallocateFile = false, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false) + public static IDevice CreateLogDevice(string logPath, bool preallocateFile = false, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false, bool useIoCompletionPort = false) { IDevice logDevice; @@ -36,7 +37,7 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = fal else #endif { - logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice); + logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice, useIoCompletionPort); } return logDevice; } diff --git a/cs/src/core/Device/IDevice.cs b/cs/src/core/Device/IDevice.cs index aef82c7f5..2ea4df826 100644 --- a/cs/src/core/Device/IDevice.cs +++ b/cs/src/core/Device/IDevice.cs @@ -52,6 +52,12 @@ public interface IDevice : IDisposable /// int EndSegment { get; } + /// + /// Throttle limit (max number of pending I/Os) for this device instance. Device needs + /// to implement Throttle() in order to use this limit.
+ /// + int ThrottleLimit { get; set; } + /// /// Initialize device. This function is used to pass optional information that may only be known after /// FASTER initialization (whose constructor takes in IDevice upfront). Implementation are free to ignore @@ -66,7 +72,13 @@ public interface IDevice : IDisposable void Initialize(long segmentSize, LightEpoch epoch = null); /// - /// Whether device should be throttled + /// Try to complete a pending async I/O completion, for devices that poll for completions + /// + /// True if a completion was handled or the device has nothing to poll (as in StorageDeviceBase and LocalStorageDevice); false if no completion was available + bool TryComplete(); + + /// + /// Whether device should be throttled at this instant (i.e., caller should stop issuing new I/Os) /// /// bool Throttle(); diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 5acf93b77..05feaab63 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -24,11 +24,17 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// public static bool UsePrivileges = true; + /// + /// Number of dedicated IO completion threads started by each device instance at construction. Used only + /// if useIoCompletionPort is set to true.
+ /// + public static int NumCompletionThreads = 1; + private readonly bool preallocateFile; private readonly bool deleteOnClose; private readonly bool disableFileBuffering; private readonly SafeConcurrentDictionary logHandles; - + private readonly bool useIoCompletionPort; private readonly ConcurrentQueue results; private static uint sectorSize = 0; @@ -37,6 +43,7 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// private int numPending = 0; + private IntPtr ioCompletionPort; /// /// Constructor @@ -47,13 +54,14 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// /// The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit /// Whether to recover device metadata from existing files + /// Whether to associate log handles with a dedicated IO completion port (polled for completions) instead of binding them to the thread pool public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, long capacity = Devices.CAPACITY_UNSPECIFIED, - bool recoverDevice = false) - : this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, initialLogFileHandles: null) + bool recoverDevice = false, bool useIoCompletionPort = false) + : this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, null, useIoCompletionPort) { } @@ -66,7 +74,7 @@ void _callback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) } /// - public override bool Throttle() => numPending > 120; + public override bool Throttle() => numPending > ThrottleLimit; /// /// Constructor with more options for derived classes @@ -78,13 +86,15 @@ void _callback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) /// The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit /// Whether to recover device metadata from existing files /// Optional set of preloaded safe file handles, which can speed up hydration of
preexisting log file handles + /// Whether we use IO completion port with polling (defaults to false, matching the public constructor and Devices.CreateLogDevice) protected internal LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, long capacity = Devices.CAPACITY_UNSPECIFIED, bool recoverDevice = false, - IEnumerable> initialLogFileHandles = null) + IEnumerable> initialLogFileHandles = null, + bool useIoCompletionPort = false) : base(filename, GetSectorSize(filename), capacity) { #if NETSTANDARD @@ -93,6 +103,21 @@ protected internal LocalStorageDevice(string filename, throw new FasterException("Cannot use LocalStorageDevice from non-Windows OS platform, use ManagedLocalStorageDevice instead."); } #endif + ThrottleLimit = 120; + this.useIoCompletionPort = useIoCompletionPort; + if (useIoCompletionPort) + { + ThreadPool.GetMaxThreads(out int workerThreads, out _); + ioCompletionPort = Native32.CreateIoCompletionPort(new SafeFileHandle(new IntPtr(-1), false), IntPtr.Zero, UIntPtr.Zero, (uint)(workerThreads + NumCompletionThreads)); + for (int i = 0; i < NumCompletionThreads; i++) + { + var thread = new Thread(() => new LocalStorageDeviceCompletionWorker().Start(ioCompletionPort, _callback)) + { + IsBackground = true + }; + thread.Start(); + } + } if (UsePrivileges && preallocateFile) Native32.EnableProcessPrivileges(); @@ -307,16 +332,38 @@ public override void Dispose() foreach (var logHandle in logHandles.Values) logHandle.Dispose(); + if (useIoCompletionPort) + new SafeFileHandle(ioCompletionPort, true).Dispose(); + while (results.TryDequeue(out var entry)) { Overlapped.Free(entry.nativeOverlapped); } } + /// + public override bool TryComplete() + { + if (!useIoCompletionPort) return true; + + bool succeeded = Native32.GetQueuedCompletionStatus(ioCompletionPort, out uint num_bytes, out IntPtr completionKey, out NativeOverlapped* nativeOverlapped, 0); + + if (nativeOverlapped != null) + { + int errorCode = succeeded ?
0 : Marshal.GetLastWin32Error(); + _callback((uint)errorCode, num_bytes, nativeOverlapped); + return true; + } + else + { + return false; + } + } + /// /// Creates a SafeFileHandle for the specified segment. This can be used by derived classes to prepopulate logHandles in the constructor. /// - protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName) + protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName, IntPtr ioCompletionPort = default(IntPtr)) { uint fileAccess = Native32.GENERIC_READ | Native32.GENERIC_WRITE; uint fileShare = unchecked(((uint)FileShare.ReadWrite & ~(uint)FileShare.Inheritable)); @@ -352,13 +399,23 @@ protected internal static SafeFileHandle CreateHandle(int segmentId, bool disabl if (preallocateFile && segmentSize != -1) SetFileSize(fileName, logHandle, segmentSize); - try + if (ioCompletionPort != IntPtr.Zero) { - ThreadPool.BindHandle(logHandle); + ThreadPool.GetMaxThreads(out int workerThreads, out _); + // CreateIoCompletionPort returns NULL on failure; surface it like a BindHandle failure instead of ignoring it + if (Native32.CreateIoCompletionPort(logHandle, ioCompletionPort, (UIntPtr)(long)logHandle.DangerousGetHandle(), (uint)(workerThreads + NumCompletionThreads)) == IntPtr.Zero) + throw new FasterException("Win32 error " + Marshal.GetLastWin32Error() + " associating log handle for " + GetSegmentName(fileName, segmentId) + " with IO completion port"); } - catch (Exception e) + else { - throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString()); + try + { + ThreadPool.BindHandle(logHandle); + } + catch (Exception e) + { + throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString()); + } } return logHandle; } @@ -394,7 +451,7 @@ protected SafeFileHandle GetOrAddHandle(int _segmentId) } private SafeFileHandle CreateHandle(int segmentId) - => CreateHandle(segmentId, this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName); + => CreateHandle(segmentId,
this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName, this.ioCompletionPort); private static uint GetSectorSize(string filename) { @@ -459,4 +514,24 @@ unsafe sealed class SimpleAsyncResult : IAsyncResult public bool IsCompleted => throw new NotImplementedException(); } + + unsafe sealed class LocalStorageDeviceCompletionWorker // Drains an IO completion port, dispatching each dequeued completion to a callback + { + public void Start(IntPtr ioCompletionPort, IOCompletionCallback _callback) + { + while (true) + { + Thread.Yield(); + bool succeeded = Native32.GetQueuedCompletionStatus(ioCompletionPort, out uint num_bytes, out IntPtr completionKey, out NativeOverlapped* nativeOverlapped, uint.MaxValue); // uint.MaxValue is the Win32 INFINITE timeout + + if (nativeOverlapped != null) + { + int errorCode = succeeded ? 0 : Marshal.GetLastWin32Error(); + _callback((uint)errorCode, num_bytes, nativeOverlapped); + } + else + break; // no completion packet was dequeued: end the worker loop + } + } + } } diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 2d8f2bb38..1ba809171 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -38,6 +38,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, : base(filename, GetSectorSize(filename), capacity) { pool = new SectorAlignedBufferPool(1, 1); + ThrottleLimit = 120; string path = new FileInfo(filename).Directory.FullName; if (!Directory.Exists(path)) @@ -51,7 +52,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, } /// - public override bool Throttle() => numPending > 120; + public override bool Throttle() => numPending > ThrottleLimit; private void RecoverFiles() { diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index f14ddb798..37605da47 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -51,6 +51,11 @@ public abstract class StorageDeviceBase : IDevice private int segmentSizeBits;
private ulong segmentSizeMask; + /// + /// Throttle limit (max number of pending I/Os) for this device instance; defaults to int.MaxValue until a derived device assigns a lower limit + /// + public int ThrottleLimit { get; set; } = int.MaxValue; + /// /// Instance of the epoch protection framework in the current system. /// A device may have internal in-memory data structure that requires epoch protection under concurrent access. @@ -283,5 +288,11 @@ protected void HandleCapacity(int segment) TruncateUntilSegmentAsync(newStartSegment, r => { }, null); } } + + /// Base implementation: performs no completion work and always returns true + public virtual bool TryComplete() + { + return true; + } } } \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 448ac2540..2b40ec91e 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1707,6 +1707,7 @@ private void BlockAllocate( { while ((logicalAddress = hlog.TryAllocate(recordSize)) == 0) { + hlog.TryComplete(); InternalRefresh(ctx, fasterSession); Thread.Yield(); } diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index a471a03d1..0218caad5 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -283,6 +283,8 @@ internal void InternalCompletePendingRequests { + hlog.TryComplete(); + if (opCtx.readyResponses.Count == 0) return; while (opCtx.readyResponses.TryDequeue(out AsyncIOContext request)) diff --git a/cs/src/core/Utilities/Native32.cs b/cs/src/core/Utilities/Native32.cs index e8eaa3edc..aa96f4578 100644 --- a/cs/src/core/Utilities/Native32.cs +++ b/cs/src/core/Utilities/Native32.cs @@ -124,6 +124,20 @@ internal static extern bool WriteFile( [Out] out UInt32 lpNumberOfBytesWritten, [In] NativeOverlapped* lpOverlapped); + [DllImport("kernel32.dll", SetLastError = true)] + internal static extern IntPtr CreateIoCompletionPort( + [In] SafeFileHandle hFile, + IntPtr ExistingCompletionPort, + UIntPtr CompletionKey, + uint NumberOfConcurrentThreads); + +
[DllImport("kernel32.dll", SetLastError = true)] + internal static extern bool GetQueuedCompletionStatus( + [In] IntPtr hCompletionPort, + [Out] out UInt32 lpNumberOfBytesTransferred, + [Out] out IntPtr lpCompletionKey, + [Out] out NativeOverlapped* lpOverlapped, + [In] UInt32 dwMilliseconds); internal enum EMoveMethod : uint { diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs index ca03ba016..22afb81bc 100644 --- a/cs/test/SharedDirectoryTests.cs +++ b/cs/test/SharedDirectoryTests.cs @@ -132,7 +132,7 @@ public void Initialize(string checkpointDirectory, string logDirectory, bool pop for (int i = 0; i < segmentIds.Count; i++) { var segmentId = segmentIds[i]; - var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName); + var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName, ioCompletionPort: IntPtr.Zero); initialHandles[i] = new KeyValuePair(segmentId, handle); } }