From 37b19d1069f634a24063c269f6e90f460b8a5687 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 10 Feb 2021 19:39:04 -0800 Subject: [PATCH 1/4] [C#] Support IO Completion Port based file IO in LocalStorageDevice Checkpoint/Recovery does not yet work as we no longer have separate threads polling for continuations. --- cs/benchmark/FasterYcsbBenchmark.cs | 2 +- cs/src/core/Allocator/AllocatorBase.cs | 6 +++ cs/src/core/Allocator/GenericAllocator.cs | 7 +++ cs/src/core/Device/Devices.cs | 5 +- cs/src/core/Device/IDevice.cs | 6 +++ cs/src/core/Device/LocalStorageDevice.cs | 59 ++++++++++++++++++----- cs/src/core/Device/StorageDeviceBase.cs | 6 +++ cs/src/core/Index/FASTER/FASTERImpl.cs | 1 + cs/src/core/Index/FASTER/FASTERThread.cs | 2 + cs/src/core/Utilities/Native32.cs | 15 ++++++ cs/test/SharedDirectoryTests.cs | 2 +- 11 files changed, 95 insertions(+), 16 deletions(-) diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index a9416de7b..7ca4195f1 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -109,7 +109,7 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio #endif var path = "D:\\data\\FasterYcsbBenchmark\\"; - device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true); + device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: true); if (kSmallMemoryLog) store = new FasterKV diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index ee557ca74..ca7b6b824 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -915,6 +915,11 @@ protected virtual void TruncateUntilAddress(long toAddress) device.TruncateUntilAddress(toAddress); } + internal virtual bool TryComplete() + { + return device.TryComplete(); + } + /// /// Seal: make sure there are no longer any threads writing to the page /// Flush: send page to secondary store @@ -1514,6 +1519,7 @@ public void AsyncGetFromDisk(long fromLogical, { while (device.Throttle()) { + device.TryComplete(); Thread.Yield(); epoch.ProtectAndDrain(); } diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 8373a70e0..3153657b8 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -172,6 +172,13 @@ public override (int, int) GetRecordSize(ref Key key, ref Value value) return (recordSize, recordSize); } + internal override bool TryComplete() + { + var b1 = objectLogDevice.TryComplete(); + var b2 = base.TryComplete(); + return b1 || b2; + } + /// /// Dispose memory allocator /// diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs index ee91e5197..9d8dc9eb4 100644 --- a/cs/src/core/Device/Devices.cs +++ b/cs/src/core/Device/Devices.cs @@ -23,8 +23,9 @@ public static class Devices /// Delete files on close /// The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit /// Whether to recover device metadata from existing files + /// Whether we use IO completion port with polling /// Device instance - public static IDevice CreateLogDevice(string logPath, bool preallocateFile = false, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false) + public static IDevice CreateLogDevice(string logPath, bool preallocateFile = false, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false, bool useIoCompletionPort = false) { IDevice logDevice; @@ -36,7 +37,7 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = fal else #endif { - logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice); + logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice, useIoCompletionPort); } return logDevice; } diff --git a/cs/src/core/Device/IDevice.cs b/cs/src/core/Device/IDevice.cs index aef82c7f5..09c664704 100644 --- a/cs/src/core/Device/IDevice.cs +++ b/cs/src/core/Device/IDevice.cs @@ -65,6 +65,12 @@ public interface IDevice : IDisposable /// void Initialize(long segmentSize, LightEpoch epoch = null); + /// + /// Try complete async IO completions + /// + /// + bool TryComplete(); + /// /// Whether device should be throttled /// diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index 5acf93b77..bf01a34b8 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -28,7 +28,7 @@ public unsafe class LocalStorageDevice : StorageDeviceBase private readonly bool deleteOnClose; private readonly bool disableFileBuffering; private readonly SafeConcurrentDictionary logHandles; - + private readonly bool useIoCompletionPort; private readonly ConcurrentQueue results; private static uint sectorSize = 0; @@ -37,6 +37,7 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// private int numPending = 0; + private IntPtr ioCompletionPort; /// /// Constructor @@ -47,13 +48,14 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// /// The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit /// Whether to recover device metadata from existing files + /// Whether we use IO completion port with polling public LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, long capacity = Devices.CAPACITY_UNSPECIFIED, - bool recoverDevice = false) - : this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, initialLogFileHandles: null) + bool recoverDevice = false, bool useIoCompletionPort = false) + : this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, null, useIoCompletionPort) { } @@ -78,13 +80,15 @@ void _callback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) /// The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit /// Whether to recover device metadata from existing files /// Optional set of preloaded safe file handles, which can speed up hydration of preexisting log file handles + /// Whether we use IO completion port with polling protected internal LocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, bool disableFileBuffering = true, long capacity = Devices.CAPACITY_UNSPECIFIED, bool recoverDevice = false, - IEnumerable> initialLogFileHandles = null) + IEnumerable> initialLogFileHandles = null, + bool useIoCompletionPort = true) : base(filename, GetSectorSize(filename), capacity) { #if NETSTANDARD @@ -94,6 +98,10 @@ protected internal LocalStorageDevice(string filename, } #endif + this.useIoCompletionPort = useIoCompletionPort; + if (useIoCompletionPort) + ioCompletionPort = Native32.CreateIoCompletionPort(new SafeFileHandle(new IntPtr(-1), false), IntPtr.Zero, UIntPtr.Zero, 96); + if (UsePrivileges && preallocateFile) Native32.EnableProcessPrivileges(); @@ -307,16 +315,38 @@ public override void Dispose() foreach (var logHandle in logHandles.Values) logHandle.Dispose(); + if (useIoCompletionPort) + new SafeFileHandle(ioCompletionPort, true).Dispose(); + while (results.TryDequeue(out var entry)) { Overlapped.Free(entry.nativeOverlapped); } } + /// + public override bool TryComplete() + { + if (!useIoCompletionPort) return true; + + bool succeeded = Native32.GetQueuedCompletionStatus(ioCompletionPort, out uint num_bytes, out IntPtr completionKey, out NativeOverlapped* nativeOverlapped, 0); + + if (nativeOverlapped != null) + { + int errorCode = succeeded ? 0 : 4; + _callback((uint)errorCode, num_bytes, nativeOverlapped); + return true; + } + else + { + return false; + } + } + /// /// Creates a SafeFileHandle for the specified segment. This can be used by derived classes to prepopulate logHandles in the constructor. /// - protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName) + protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName, IntPtr ioCompletionPort) { uint fileAccess = Native32.GENERIC_READ | Native32.GENERIC_WRITE; uint fileShare = unchecked(((uint)FileShare.ReadWrite & ~(uint)FileShare.Inheritable)); @@ -352,13 +382,18 @@ protected internal static SafeFileHandle CreateHandle(int segmentId, bool disabl if (preallocateFile && segmentSize != -1) SetFileSize(fileName, logHandle, segmentSize); - try - { - ThreadPool.BindHandle(logHandle); - } - catch (Exception e) + if (ioCompletionPort != IntPtr.Zero) + Native32.CreateIoCompletionPort(logHandle, ioCompletionPort, (UIntPtr)(long)logHandle.DangerousGetHandle(), 96); + else { - throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString()); + try + { + ThreadPool.BindHandle(logHandle); + } + catch (Exception e) + { + throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString()); + } } return logHandle; } @@ -394,7 +429,7 @@ protected SafeFileHandle GetOrAddHandle(int _segmentId) } private SafeFileHandle CreateHandle(int segmentId) - => CreateHandle(segmentId, this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName); + => CreateHandle(segmentId, this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName, this.ioCompletionPort); private static uint GetSectorSize(string filename) { diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index f14ddb798..348eb028e 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -283,5 +283,11 @@ protected void HandleCapacity(int segment) TruncateUntilSegmentAsync(newStartSegment, r => { }, null); } } + + /// + public virtual bool TryComplete() + { + return true; + } } } \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 448ac2540..2b40ec91e 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -1707,6 +1707,7 @@ private void BlockAllocate( { while ((logicalAddress = hlog.TryAllocate(recordSize)) == 0) { + hlog.TryComplete(); InternalRefresh(ctx, fasterSession); Thread.Yield(); } diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index a471a03d1..0218caad5 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -283,6 +283,8 @@ internal void InternalCompletePendingRequests { + hlog.TryComplete(); + if (opCtx.readyResponses.Count == 0) return; while (opCtx.readyResponses.TryDequeue(out AsyncIOContext request)) diff --git a/cs/src/core/Utilities/Native32.cs b/cs/src/core/Utilities/Native32.cs index e8eaa3edc..86ef47d43 100644 --- a/cs/src/core/Utilities/Native32.cs +++ b/cs/src/core/Utilities/Native32.cs @@ -124,6 +124,21 @@ internal static extern bool WriteFile( [Out] out UInt32 lpNumberOfBytesWritten, [In] NativeOverlapped* lpOverlapped); + [DllImport("kernel32.dll", SetLastError = true)] + internal static extern IntPtr CreateIoCompletionPort( + [In] SafeFileHandle hFile, + IntPtr ExistingCompletionPort, + UIntPtr CompletionKey, + uint NumberOfConcurrentThreads); + + [DllImport("kernel32.dll", SetLastError = true)] + [return: MarshalAs(UnmanagedType.Bool)] + internal static extern unsafe bool GetQueuedCompletionStatus( + [In] IntPtr hCompletionPort, + [Out] out UInt32 lpNumberOfBytesWritten, + [Out] out IntPtr lpCompletionKey, + [Out] out NativeOverlapped* lpOverlapped, + [In] int dwMilliseconds); internal enum EMoveMethod : uint { diff --git a/cs/test/SharedDirectoryTests.cs b/cs/test/SharedDirectoryTests.cs index ca03ba016..22afb81bc 100644 --- a/cs/test/SharedDirectoryTests.cs +++ b/cs/test/SharedDirectoryTests.cs @@ -132,7 +132,7 @@ public void Initialize(string checkpointDirectory, string logDirectory, bool pop for (int i = 0; i < segmentIds.Count; i++) { var segmentId = segmentIds[i]; - var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName); + var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName, IntPtr.Zero); initialHandles[i] = new KeyValuePair(segmentId, handle); } } From 4dc2005246c7347d149b94d4a00b7090e7e66197 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 10 Feb 2021 19:40:04 -0800 Subject: [PATCH 2/4] updated default in benchmark --- cs/benchmark/FasterYcsbBenchmark.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 7ca4195f1..fbc75cb98 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -109,7 +109,7 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio #endif var path = "D:\\data\\FasterYcsbBenchmark\\"; - device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: true); + device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: false); if (kSmallMemoryLog) store = new FasterKV From 80413fe09fec8907cb922d04b0d50df3e7edc9a1 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 11 Feb 2021 13:17:56 -0800 Subject: [PATCH 3/4] Fine tuned performance --- cs/benchmark/FasterYcsbBenchmark.cs | 7 ++++ cs/src/core/Device/LocalStorageDevice.cs | 53 ++++++++++++++++++++++-- cs/src/core/Utilities/Native32.cs | 5 +-- 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index fbc75cb98..ce0f4acf4 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -108,6 +108,9 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio freq = Stopwatch.Frequency; #endif + // Increase throttle limit for higher concurrency runs + if (threadCount > 8) LocalStorageDevice.ThrottleLimit *= 2; + var path = "D:\\data\\FasterYcsbBenchmark\\"; device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: false); @@ -303,6 +306,10 @@ public unsafe void Run() Console.WriteLine("Completed checkpoint"); } + // Flush and evict log from main memory + if (kSmallMemoryLog) + store.Log.FlushAndEvict(true); + // Uncomment below to dispose log from memory, use for 100% read workloads only // store.Log.DisposeFromMemory(); diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index bf01a34b8..b722adaa9 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -23,6 +23,17 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// are concurrently created. /// public static bool UsePrivileges = true; + + /// + /// Number of IO completion threads dedicated to this instance. Used only + /// if useIoCompletionPort is set to true. + /// + public static int NumCompletionThreads = 1; + + /// + /// Throttle I/O when this limit is reached + /// + public static int ThrottleLimit = 120; private readonly bool preallocateFile; private readonly bool deleteOnClose; @@ -68,7 +79,7 @@ void _callback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) } /// - public override bool Throttle() => numPending > 120; + public override bool Throttle() => numPending > ThrottleLimit; /// /// Constructor with more options for derived classes @@ -100,7 +111,18 @@ protected internal LocalStorageDevice(string filename, this.useIoCompletionPort = useIoCompletionPort; if (useIoCompletionPort) - ioCompletionPort = Native32.CreateIoCompletionPort(new SafeFileHandle(new IntPtr(-1), false), IntPtr.Zero, UIntPtr.Zero, 96); + { + ThreadPool.GetMaxThreads(out int workerThreads, out _); + ioCompletionPort = Native32.CreateIoCompletionPort(new SafeFileHandle(new IntPtr(-1), false), IntPtr.Zero, UIntPtr.Zero, (uint)(workerThreads + NumCompletionThreads)); + for (int i = 0; i < NumCompletionThreads; i++) + { + var thread = new Thread(() => new LocalStorageDeviceCompletionWorker().Start(ioCompletionPort, _callback)) + { + IsBackground = true + }; + thread.Start(); + } + } if (UsePrivileges && preallocateFile) Native32.EnableProcessPrivileges(); @@ -333,7 +355,7 @@ public override bool TryComplete() if (nativeOverlapped != null) { - int errorCode = succeeded ? 0 : 4; + int errorCode = succeeded ? 0 : Marshal.GetLastWin32Error(); _callback((uint)errorCode, num_bytes, nativeOverlapped); return true; } @@ -383,7 +405,10 @@ protected internal static SafeFileHandle CreateHandle(int segmentId, bool disabl SetFileSize(fileName, logHandle, segmentSize); if (ioCompletionPort != IntPtr.Zero) - Native32.CreateIoCompletionPort(logHandle, ioCompletionPort, (UIntPtr)(long)logHandle.DangerousGetHandle(), 96); + { + ThreadPool.GetMaxThreads(out int workerThreads, out _); + Native32.CreateIoCompletionPort(logHandle, ioCompletionPort, (UIntPtr)(long)logHandle.DangerousGetHandle(), (uint)(workerThreads + NumCompletionThreads)); + } else { try @@ -494,4 +519,24 @@ unsafe sealed class SimpleAsyncResult : IAsyncResult public bool IsCompleted => throw new NotImplementedException(); } + + unsafe sealed class LocalStorageDeviceCompletionWorker + { + public void Start(IntPtr ioCompletionPort, IOCompletionCallback _callback) + { + while (true) + { + Thread.Yield(); + bool succeeded = Native32.GetQueuedCompletionStatus(ioCompletionPort, out uint num_bytes, out IntPtr completionKey, out NativeOverlapped* nativeOverlapped, uint.MaxValue); + + if (nativeOverlapped != null) + { + int errorCode = succeeded ? 0 : Marshal.GetLastWin32Error(); + _callback((uint)errorCode, num_bytes, nativeOverlapped); + } + else + break; + } + } + } } diff --git a/cs/src/core/Utilities/Native32.cs b/cs/src/core/Utilities/Native32.cs index 86ef47d43..aa96f4578 100644 --- a/cs/src/core/Utilities/Native32.cs +++ b/cs/src/core/Utilities/Native32.cs @@ -132,13 +132,12 @@ internal static extern IntPtr CreateIoCompletionPort( uint NumberOfConcurrentThreads); [DllImport("kernel32.dll", SetLastError = true)] - [return: MarshalAs(UnmanagedType.Bool)] - internal static extern unsafe bool GetQueuedCompletionStatus( + internal static extern bool GetQueuedCompletionStatus( [In] IntPtr hCompletionPort, [Out] out UInt32 lpNumberOfBytesWritten, [Out] out IntPtr lpCompletionKey, [Out] out NativeOverlapped* lpOverlapped, - [In] int dwMilliseconds); + [In] UInt32 dwMilliseconds); internal enum EMoveMethod : uint { From 35760031a0d88b856ee6b7c9a8da60c1602137cc Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 17 Feb 2021 18:34:59 -0800 Subject: [PATCH 4/4] Updated throttle limit to apply per device instance --- cs/benchmark/FasterYcsbBenchmark.cs | 6 +++--- cs/src/core/Device/IDevice.cs | 8 +++++++- cs/src/core/Device/LocalStorageDevice.cs | 9 ++------- cs/src/core/Device/ManagedLocalStorageDevice.cs | 3 ++- cs/src/core/Device/StorageDeviceBase.cs | 5 +++++ 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index ce0f4acf4..082aa8be8 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -108,12 +108,12 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio freq = Stopwatch.Frequency; #endif - // Increase throttle limit for higher concurrency runs - if (threadCount > 8) LocalStorageDevice.ThrottleLimit *= 2; - var path = "D:\\data\\FasterYcsbBenchmark\\"; device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: false); + // Increase throttle limit for higher concurrency runs + if (threadCount > 8) device.ThrottleLimit *= 2; + if (kSmallMemoryLog) store = new FasterKV (kMaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true, PageSizeBits = 22, SegmentSizeBits = 26, MemorySizeBits = 26 }, new CheckpointSettings { CheckPointType = CheckpointType.FoldOver, CheckpointDir = path }); diff --git a/cs/src/core/Device/IDevice.cs b/cs/src/core/Device/IDevice.cs index 09c664704..2ea4df826 100644 --- a/cs/src/core/Device/IDevice.cs +++ b/cs/src/core/Device/IDevice.cs @@ -52,6 +52,12 @@ public interface IDevice : IDisposable /// int EndSegment { get; } + /// + /// Throttle limit (max number of pending I/Os) for this device instance. Device needs + /// to implement Throttle() in order to use this limit. + /// + int ThrottleLimit { get; set; } + /// /// Initialize device. This function is used to pass optional information that may only be known after /// FASTER initialization (whose constructor takes in IDevice upfront). Implementation are free to ignore @@ -72,7 +78,7 @@ public interface IDevice : IDisposable bool TryComplete(); /// - /// Whether device should be throttled + /// Whether device should be throttled at this instant (i.e., caller should stop issuing new I/Os) /// /// bool Throttle(); diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs index b722adaa9..05feaab63 100644 --- a/cs/src/core/Device/LocalStorageDevice.cs +++ b/cs/src/core/Device/LocalStorageDevice.cs @@ -23,18 +23,13 @@ public unsafe class LocalStorageDevice : StorageDeviceBase /// are concurrently created. /// public static bool UsePrivileges = true; - + /// /// Number of IO completion threads dedicated to this instance. Used only /// if useIoCompletionPort is set to true. /// public static int NumCompletionThreads = 1; - /// - /// Throttle I/O when this limit is reached - /// - public static int ThrottleLimit = 120; - private readonly bool preallocateFile; private readonly bool deleteOnClose; private readonly bool disableFileBuffering; @@ -108,7 +103,7 @@ protected internal LocalStorageDevice(string filename, throw new FasterException("Cannot use LocalStorageDevice from non-Windows OS platform, use ManagedLocalStorageDevice instead."); } #endif - + ThrottleLimit = 120; this.useIoCompletionPort = useIoCompletionPort; if (useIoCompletionPort) { diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs index 2d8f2bb38..1ba809171 100644 --- a/cs/src/core/Device/ManagedLocalStorageDevice.cs +++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs @@ -38,6 +38,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, : base(filename, GetSectorSize(filename), capacity) { pool = new SectorAlignedBufferPool(1, 1); + ThrottleLimit = 120; string path = new FileInfo(filename).Directory.FullName; if (!Directory.Exists(path)) @@ -51,7 +52,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, } /// - public override bool Throttle() => numPending > 120; + public override bool Throttle() => numPending > ThrottleLimit; private void RecoverFiles() { diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs index 348eb028e..37605da47 100644 --- a/cs/src/core/Device/StorageDeviceBase.cs +++ b/cs/src/core/Device/StorageDeviceBase.cs @@ -51,6 +51,11 @@ public abstract class StorageDeviceBase : IDevice private int segmentSizeBits; private ulong segmentSizeMask; + /// + /// Throttle limit (max number of pending I/Os) for this device instance + /// + public int ThrottleLimit { get; set; } = int.MaxValue; + /// /// Instance of the epoch protection framework in the current system. /// A device may have internal in-memory data structure that requires epoch protection under concurrent access.