Skip to content

Commit

Permalink
[C#] Support IOCP-based operations in LocalStorageDevice (#402)
Browse files Browse the repository at this point in the history
* [C#] Support IO Completion Port based file IO in LocalStorageDevice

Checkpoint/Recovery does not yet work as we no longer have separate threads polling for continuations.

* updated default in benchmark

* Fine tuned performance

* Updated throttle limit to apply per device instance
  • Loading branch information
badrishc authored Feb 20, 2021
1 parent 20fdc91 commit c8bf594
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 17 deletions.
9 changes: 8 additions & 1 deletion cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio
#endif

var path = "D:\\data\\FasterYcsbBenchmark\\";
device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true);
device = Devices.CreateLogDevice(path + "hlog", preallocateFile: true, useIoCompletionPort: false);

// Increase throttle limit for higher concurrency runs
if (threadCount > 8) device.ThrottleLimit *= 2;

if (kSmallMemoryLog)
store = new FasterKV<Key, Value>
Expand Down Expand Up @@ -303,6 +306,10 @@ public unsafe void Run()
Console.WriteLine("Completed checkpoint");
}

// Flush and evict log from main memory
if (kSmallMemoryLog)
store.Log.FlushAndEvict(true);

// Uncomment below to dispose log from memory, use for 100% read workloads only
// store.Log.DisposeFromMemory();

Expand Down
6 changes: 6 additions & 0 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,11 @@ protected virtual void TruncateUntilAddress(long toAddress)
device.TruncateUntilAddress(toAddress);
}

internal virtual bool TryComplete()
{
return device.TryComplete();
}

/// <summary>
/// Seal: make sure there are no longer any threads writing to the page
/// Flush: send page to secondary store
Expand Down Expand Up @@ -1514,6 +1519,7 @@ public void AsyncGetFromDisk(long fromLogical,
{
while (device.Throttle())
{
device.TryComplete();
Thread.Yield();
epoch.ProtectAndDrain();
}
Expand Down
7 changes: 7 additions & 0 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ public override (int, int) GetRecordSize(ref Key key, ref Value value)
return (recordSize, recordSize);
}

internal override bool TryComplete()
{
var b1 = objectLogDevice.TryComplete();
var b2 = base.TryComplete();
return b1 || b2;
}

/// <summary>
/// Dispose memory allocator
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions cs/src/core/Device/Devices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public static class Devices
/// <param name="deleteOnClose">Delete files on close</param>
/// <param name="capacity">The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit</param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
/// <param name="useIoCompletionPort">Whether we use IO completion port with polling</param>
/// <returns>Device instance</returns>
public static IDevice CreateLogDevice(string logPath, bool preallocateFile = false, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false)
public static IDevice CreateLogDevice(string logPath, bool preallocateFile = false, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false, bool useIoCompletionPort = false)
{
IDevice logDevice;

Expand All @@ -36,7 +37,7 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = fal
else
#endif
{
logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice);
logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice, useIoCompletionPort);
}
return logDevice;
}
Expand Down
14 changes: 13 additions & 1 deletion cs/src/core/Device/IDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface IDevice : IDisposable
/// </summary>
int EndSegment { get; }

/// <summary>
/// Throttle limit (max number of pending I/Os) for this device instance. Device needs
/// to implement Throttle() in order to use this limit.
/// </summary>
int ThrottleLimit { get; set; }

/// <summary>
/// Initialize device. This function is used to pass optional information that may only be known after
/// FASTER initialization (whose constructor takes in IDevice upfront). Implementation are free to ignore
Expand All @@ -66,7 +72,13 @@ public interface IDevice : IDisposable
void Initialize(long segmentSize, LightEpoch epoch = null);

/// <summary>
/// Whether device should be throttled
/// Try complete async IO completions
/// </summary>
/// <returns></returns>
bool TryComplete();

/// <summary>
/// Whether device should be throttled at this instant (i.e., caller should stop issuing new I/Os)
/// </summary>
/// <returns></returns>
bool Throttle();
Expand Down
97 changes: 86 additions & 11 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ public unsafe class LocalStorageDevice : StorageDeviceBase
/// </summary>
public static bool UsePrivileges = true;

/// <summary>
/// Number of IO completion threads dedicated to this instance. Used only
/// if useIoCompletionPort is set to true.
/// </summary>
public static int NumCompletionThreads = 1;

private readonly bool preallocateFile;
private readonly bool deleteOnClose;
private readonly bool disableFileBuffering;
private readonly SafeConcurrentDictionary<int, SafeFileHandle> logHandles;

private readonly bool useIoCompletionPort;
private readonly ConcurrentQueue<SimpleAsyncResult> results;
private static uint sectorSize = 0;

Expand All @@ -37,6 +43,7 @@ public unsafe class LocalStorageDevice : StorageDeviceBase
/// </summary>
private int numPending = 0;

private IntPtr ioCompletionPort;

/// <summary>
/// Constructor
Expand All @@ -47,13 +54,14 @@ public unsafe class LocalStorageDevice : StorageDeviceBase
/// <param name="disableFileBuffering"></param>
/// <param name="capacity">The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
/// <param name="useIoCompletionPort">Whether we use IO completion port with polling</param>
public LocalStorageDevice(string filename,
bool preallocateFile = false,
bool deleteOnClose = false,
bool disableFileBuffering = true,
long capacity = Devices.CAPACITY_UNSPECIFIED,
bool recoverDevice = false)
: this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, initialLogFileHandles: null)
bool recoverDevice = false, bool useIoCompletionPort = false)
: this(filename, preallocateFile, deleteOnClose, disableFileBuffering, capacity, recoverDevice, null, useIoCompletionPort)
{
}

Expand All @@ -66,7 +74,7 @@ void _callback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
}

/// <inheritdoc />
public override bool Throttle() => numPending > 120;
public override bool Throttle() => numPending > ThrottleLimit;

/// <summary>
/// Constructor with more options for derived classes
Expand All @@ -78,13 +86,15 @@ void _callback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
/// <param name="capacity">The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
/// <param name="initialLogFileHandles">Optional set of preloaded safe file handles, which can speed up hydration of preexisting log file handles</param>
/// <param name="useIoCompletionPort">Whether we use IO completion port with polling</param>
protected internal LocalStorageDevice(string filename,
bool preallocateFile = false,
bool deleteOnClose = false,
bool disableFileBuffering = true,
long capacity = Devices.CAPACITY_UNSPECIFIED,
bool recoverDevice = false,
IEnumerable<KeyValuePair<int, SafeFileHandle>> initialLogFileHandles = null)
IEnumerable<KeyValuePair<int, SafeFileHandle>> initialLogFileHandles = null,
bool useIoCompletionPort = true)
: base(filename, GetSectorSize(filename), capacity)
{
#if NETSTANDARD
Expand All @@ -93,6 +103,21 @@ protected internal LocalStorageDevice(string filename,
throw new FasterException("Cannot use LocalStorageDevice from non-Windows OS platform, use ManagedLocalStorageDevice instead.");
}
#endif
ThrottleLimit = 120;
this.useIoCompletionPort = useIoCompletionPort;
if (useIoCompletionPort)
{
ThreadPool.GetMaxThreads(out int workerThreads, out _);
ioCompletionPort = Native32.CreateIoCompletionPort(new SafeFileHandle(new IntPtr(-1), false), IntPtr.Zero, UIntPtr.Zero, (uint)(workerThreads + NumCompletionThreads));
for (int i = 0; i < NumCompletionThreads; i++)
{
var thread = new Thread(() => new LocalStorageDeviceCompletionWorker().Start(ioCompletionPort, _callback))
{
IsBackground = true
};
thread.Start();
}
}

if (UsePrivileges && preallocateFile)
Native32.EnableProcessPrivileges();
Expand Down Expand Up @@ -307,16 +332,38 @@ public override void Dispose()
foreach (var logHandle in logHandles.Values)
logHandle.Dispose();

if (useIoCompletionPort)
new SafeFileHandle(ioCompletionPort, true).Dispose();

while (results.TryDequeue(out var entry))
{
Overlapped.Free(entry.nativeOverlapped);
}
}

/// <inheritdoc/>
public override bool TryComplete()
{
if (!useIoCompletionPort) return true;

bool succeeded = Native32.GetQueuedCompletionStatus(ioCompletionPort, out uint num_bytes, out IntPtr completionKey, out NativeOverlapped* nativeOverlapped, 0);

if (nativeOverlapped != null)
{
int errorCode = succeeded ? 0 : Marshal.GetLastWin32Error();
_callback((uint)errorCode, num_bytes, nativeOverlapped);
return true;
}
else
{
return false;
}
}

/// <summary>
/// Creates a SafeFileHandle for the specified segment. This can be used by derived classes to prepopulate logHandles in the constructor.
/// </summary>
protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName)
protected internal static SafeFileHandle CreateHandle(int segmentId, bool disableFileBuffering, bool deleteOnClose, bool preallocateFile, long segmentSize, string fileName, IntPtr ioCompletionPort)
{
uint fileAccess = Native32.GENERIC_READ | Native32.GENERIC_WRITE;
uint fileShare = unchecked(((uint)FileShare.ReadWrite & ~(uint)FileShare.Inheritable));
Expand Down Expand Up @@ -352,13 +399,21 @@ protected internal static SafeFileHandle CreateHandle(int segmentId, bool disabl
if (preallocateFile && segmentSize != -1)
SetFileSize(fileName, logHandle, segmentSize);

try
if (ioCompletionPort != IntPtr.Zero)
{
ThreadPool.BindHandle(logHandle);
ThreadPool.GetMaxThreads(out int workerThreads, out _);
Native32.CreateIoCompletionPort(logHandle, ioCompletionPort, (UIntPtr)(long)logHandle.DangerousGetHandle(), (uint)(workerThreads + NumCompletionThreads));
}
catch (Exception e)
else
{
throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString());
try
{
ThreadPool.BindHandle(logHandle);
}
catch (Exception e)
{
throw new FasterException("Error binding log handle for " + GetSegmentName(fileName, segmentId) + ": " + e.ToString());
}
}
return logHandle;
}
Expand Down Expand Up @@ -394,7 +449,7 @@ protected SafeFileHandle GetOrAddHandle(int _segmentId)
}

private SafeFileHandle CreateHandle(int segmentId)
=> CreateHandle(segmentId, this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName);
=> CreateHandle(segmentId, this.disableFileBuffering, this.deleteOnClose, this.preallocateFile, this.segmentSize, this.FileName, this.ioCompletionPort);

private static uint GetSectorSize(string filename)
{
Expand Down Expand Up @@ -459,4 +514,24 @@ unsafe sealed class SimpleAsyncResult : IAsyncResult

public bool IsCompleted => throw new NotImplementedException();
}

unsafe sealed class LocalStorageDeviceCompletionWorker
{
public void Start(IntPtr ioCompletionPort, IOCompletionCallback _callback)
{
while (true)
{
Thread.Yield();
bool succeeded = Native32.GetQueuedCompletionStatus(ioCompletionPort, out uint num_bytes, out IntPtr completionKey, out NativeOverlapped* nativeOverlapped, uint.MaxValue);

if (nativeOverlapped != null)
{
int errorCode = succeeded ? 0 : Marshal.GetLastWin32Error();
_callback((uint)errorCode, num_bytes, nativeOverlapped);
}
else
break;
}
}
}
}
3 changes: 2 additions & 1 deletion cs/src/core/Device/ManagedLocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false,
: base(filename, GetSectorSize(filename), capacity)
{
pool = new SectorAlignedBufferPool(1, 1);
ThrottleLimit = 120;

string path = new FileInfo(filename).Directory.FullName;
if (!Directory.Exists(path))
Expand All @@ -51,7 +52,7 @@ public ManagedLocalStorageDevice(string filename, bool preallocateFile = false,
}

/// <inheritdoc />
public override bool Throttle() => numPending > 120;
public override bool Throttle() => numPending > ThrottleLimit;

private void RecoverFiles()
{
Expand Down
11 changes: 11 additions & 0 deletions cs/src/core/Device/StorageDeviceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public abstract class StorageDeviceBase : IDevice
private int segmentSizeBits;
private ulong segmentSizeMask;

/// <summary>
/// Throttle limit (max number of pending I/Os) for this device instance
/// </summary>
public int ThrottleLimit { get; set; } = int.MaxValue;

/// <summary>
/// Instance of the epoch protection framework in the current system.
/// A device may have internal in-memory data structure that requires epoch protection under concurrent access.
Expand Down Expand Up @@ -283,5 +288,11 @@ protected void HandleCapacity(int segment)
TruncateUntilSegmentAsync(newStartSegment, r => { }, null);
}
}

/// <inheritdoc/>
public virtual bool TryComplete()
{
return true;
}
}
}
1 change: 1 addition & 0 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1707,6 +1707,7 @@ private void BlockAllocate<Input, Output, Context, FasterSession>(
{
while ((logicalAddress = hlog.TryAllocate(recordSize)) == 0)
{
hlog.TryComplete();
InternalRefresh(ctx, fasterSession);
Thread.Yield();
}
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ internal void InternalCompletePendingRequests<Input, Output, Context, FasterSess
FasterSession fasterSession)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
hlog.TryComplete();

if (opCtx.readyResponses.Count == 0) return;

while (opCtx.readyResponses.TryDequeue(out AsyncIOContext<Key, Value> request))
Expand Down
14 changes: 14 additions & 0 deletions cs/src/core/Utilities/Native32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,20 @@ internal static extern bool WriteFile(
[Out] out UInt32 lpNumberOfBytesWritten,
[In] NativeOverlapped* lpOverlapped);

[DllImport("kernel32.dll", SetLastError = true)]
internal static extern IntPtr CreateIoCompletionPort(
[In] SafeFileHandle hFile,
IntPtr ExistingCompletionPort,
UIntPtr CompletionKey,
uint NumberOfConcurrentThreads);

[DllImport("kernel32.dll", SetLastError = true)]
internal static extern bool GetQueuedCompletionStatus(
[In] IntPtr hCompletionPort,
[Out] out UInt32 lpNumberOfBytesWritten,
[Out] out IntPtr lpCompletionKey,
[Out] out NativeOverlapped* lpOverlapped,
[In] UInt32 dwMilliseconds);

internal enum EMoveMethod : uint
{
Expand Down
2 changes: 1 addition & 1 deletion cs/test/SharedDirectoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void Initialize(string checkpointDirectory, string logDirectory, bool pop
for (int i = 0; i < segmentIds.Count; i++)
{
var segmentId = segmentIds[i];
var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName);
var handle = LocalStorageDevice.CreateHandle(segmentId, disableFileBuffering: false, deleteOnClose: true, preallocateFile: false, segmentSize: -1, fileName: deviceFileName, IntPtr.Zero);
initialHandles[i] = new KeyValuePair<int, SafeFileHandle>(segmentId, handle);
}
}
Expand Down

0 comments on commit c8bf594

Please sign in to comment.