Skip to content

Commit

Permalink
Initial checkin of async CompletePending - no tests yet.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 3, 2019
1 parent 0353678 commit 571333d
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 57 deletions.
4 changes: 2 additions & 2 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ private void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, NativeOverl
{
// The keys are same, so I/O is complete
// ctx.record = result.record;
ctx.callbackQueue.Add(ctx);
ctx.callbackQueue.Enqueue(ctx);
}
else
{
Expand All @@ -1508,7 +1508,7 @@ private void AsyncGetFromDiskCallback(uint errorCode, uint numBytes, NativeOverl
}
else
{
ctx.callbackQueue.Add(ctx);
ctx.callbackQueue.Enqueue(ctx);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/AsyncIOContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public unsafe struct AsyncIOContext<Key, Value>
/// <summary>
/// Callback queue
/// </summary>
public BlockingCollection<AsyncIOContext<Key, Value>> callbackQueue;
public AsyncQueue<AsyncIOContext<Key, Value>> callbackQueue;

/// <summary>
/// Dispose
Expand Down
4 changes: 3 additions & 1 deletion cs/src/core/Async/AsyncFasterKv.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
/*
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -90,3 +91,4 @@ public ValueTask<Guid> TakeCheckpoint(long untilSN = -1)
}
}
}
*/
2 changes: 1 addition & 1 deletion cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ internal class FasterExecutionContext : SerializedFasterExecutionContext
public long totalPending;
public Queue<PendingContext> retryRequests;
public Dictionary<long, PendingContext> ioPendingRequests;
public BlockingCollection<AsyncIOContext<Key, Value>> readyResponses;
public AsyncQueue<AsyncIOContext<Key, Value>> readyResponses;
}
}

Expand Down
41 changes: 11 additions & 30 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context>
public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
Expand Down Expand Up @@ -321,6 +322,15 @@ public bool CompletePending(bool wait = false)
return InternalCompletePending(wait);
}

/// <summary>
/// Complete outstanding pending operations
/// </summary>
/// <returns></returns>
public async ValueTask CompletePendingAsync()
{
await InternalCompletePendingAsync();
}

/// <summary>
/// Complete the ongoing checkpoint (if any)
/// </summary>
Expand Down Expand Up @@ -370,21 +380,6 @@ public bool CompleteCheckpoint(bool wait = false)
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long monotonicSerialNum)
{
return Read(ref key, ref input, ref output, ref userContext, monotonicSerialNum);
}

/// <summary>
/// Read
/// </summary>
/// <param name="key"></param>
/// <param name="input"></param>
/// <param name="output"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, ref Context userContext, long monotonicSerialNum)
{
var context = default(PendingContext);
var internalStatus = InternalRead(ref key, ref input, ref output, ref userContext, ref context);
Expand All @@ -411,20 +406,6 @@ public Status Read(ref Key key, ref Input input, ref Output output, ref Context
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long monotonicSerialNum)
{
return Upsert(ref key, ref desiredValue, ref userContext, monotonicSerialNum);
}

/// <summary>
/// Upsert
/// </summary>
/// <param name="key"></param>
/// <param name="desiredValue"></param>
/// <param name="userContext"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Upsert(ref Key key, ref Value desiredValue, ref Context userContext, long monotonicSerialNum)
{
var context = default(PendingContext);
var internalStatus = InternalUpsert(ref key, ref desiredValue, ref userContext, ref context);
Expand Down
77 changes: 74 additions & 3 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context>
public partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
Expand Down Expand Up @@ -132,7 +133,7 @@ internal void InitContext(FasterExecutionContext ctx, Guid token)
ctx.totalPending = 0;
ctx.guid = token;
ctx.retryRequests = new Queue<PendingContext>();
ctx.readyResponses = new BlockingCollection<AsyncIOContext<Key, Value>>();
ctx.readyResponses = new AsyncQueue<AsyncIOContext<Key, Value>>();
ctx.ioPendingRequests = new Dictionary<long, PendingContext>();
}

Expand Down Expand Up @@ -196,6 +197,48 @@ internal bool InternalCompletePending(bool wait = false)
return false;
}

internal async ValueTask InternalCompletePendingAsync()
{
do
{
bool done = true;

#region Previous pending requests
if (threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING)
{
await CompleteIOPendingRequestsAsync(prevThreadCtx.Value);
Debug.Assert(prevThreadCtx.Value.ioPendingRequests.Count == 0);

InternalRefresh();
CompleteRetryRequests(prevThreadCtx.Value);

done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
done &= (prevThreadCtx.Value.retryRequests.Count == 0);
}
#endregion

if (!(threadCtx.Value.phase == Phase.IN_PROGRESS
||
threadCtx.Value.phase == Phase.WAIT_PENDING))
{
await CompleteIOPendingRequestsAsync(threadCtx.Value);
Debug.Assert(threadCtx.Value.ioPendingRequests.Count == 0);
}
InternalRefresh();
CompleteRetryRequests(threadCtx.Value);

done &= (threadCtx.Value.ioPendingRequests.Count == 0);
done &= (threadCtx.Value.retryRequests.Count == 0);

if (done)
{
return;
}
} while (true);
}

internal void CompleteRetryRequests(FasterExecutionContext context)
{
int count = context.retryRequests.Count;
Expand All @@ -210,8 +253,36 @@ internal void CompleteIOPendingRequests(FasterExecutionContext context)
{
if (context.readyResponses.Count == 0) return;

while (context.readyResponses.TryTake(out AsyncIOContext<Key, Value> request))
while (context.readyResponses.TryDequeue(out AsyncIOContext<Key, Value> request))
{
InternalContinuePendingRequestAndCallback(context, request);
}
}

internal async ValueTask CompleteIOPendingRequestsAsync(FasterExecutionContext context, CancellationToken token = default(CancellationToken))
{
while (context.ioPendingRequests.Count > 0)
{
AsyncIOContext<Key, Value> request;

if (context.readyResponses.Count > 0)
{
context.readyResponses.TryDequeue(out request);
}
else
{
// Save context on continuation stack (from thread local)
var prevThreadCtxCopy = prevThreadCtx.Value;
var threadCtxCopy = threadCtx.Value;

// Suspend epoch
epoch.Suspend();

request = await context.readyResponses.DequeueAsync(token);
// Restore context
SetContext(prevThreadCtxCopy, threadCtxCopy);
}

InternalContinuePendingRequestAndCallback(context, request);
}
}
Expand Down
71 changes: 71 additions & 0 deletions cs/src/core/Utilities/AsyncQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

namespace FASTER.core
{
/// <summary>
/// Async queue
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncQueue<T>
{
private readonly SemaphoreSlim semaphore;
private readonly ConcurrentQueue<T> queue;

/// <summary>
/// Queue count
/// </summary>
public int Count => queue.Count;

/// <summary>
/// Constructor
/// </summary>
public AsyncQueue()
{
semaphore = new SemaphoreSlim(0);
queue = new ConcurrentQueue<T>();
}

/// <summary>
/// Enqueue item
/// </summary>
/// <param name="item"></param>
public void Enqueue(T item)
{
queue.Enqueue(item);
semaphore.Release();
}

/// <summary>
/// Async dequeue
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await semaphore.WaitAsync(cancellationToken);

if (queue.TryDequeue(out T item))
{
return item;
}
}
}

/// <summary>
/// Try dequeue (if item exists)
/// </summary>
/// <param name="item"></param>
/// <returns></returns>
public bool TryDequeue(out T item)
{
return queue.TryDequeue(out item);
}
}
}
24 changes: 14 additions & 10 deletions cs/src/core/Utilities/Native32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ internal static extern bool GetDiskFreeSpace(string lpRootPathName,

[DllImport("kernel32.dll", SetLastError = true)]
internal static extern bool DeleteFileW([MarshalAs(UnmanagedType.LPWStr)]string lpFileName);
#endregion
#endregion

#region Thread and NUMA functions
[DllImport("kernel32.dll")]
Expand Down Expand Up @@ -241,10 +241,19 @@ public static bool EnableProcessPrivileges()
if (!LookupPrivilegeValue(null, "SeManageVolumePrivilege",
ref token_privileges.Privileges.Luid)) return false;

if (!OpenProcessToken(GetCurrentProcess(), 0x20, out IntPtr token)) return false;
if (!OpenProcessToken(GetCurrentProcess(), 0x20, out IntPtr token))
return false;

if (!AdjustTokenPrivileges(token, 0, ref token_privileges, 0, 0, 0)) return false;
if (Marshal.GetLastWin32Error() != 0) return false;
if (!AdjustTokenPrivileges(token, 0, ref token_privileges, 0, 0, 0))
{
CloseHandle(token);
return false;
}
if (Marshal.GetLastWin32Error() != 0)
{
CloseHandle(token);
return false;
}
CloseHandle(token);
return true;
}
Expand Down Expand Up @@ -282,13 +291,8 @@ internal static bool EnableVolumePrivileges(string filename, SafeFileHandle hand
(void*)&mhi, sizeof(MARK_HANDLE_INFO), IntPtr.Zero,
0, ref bytes_returned, IntPtr.Zero);

if (!result)
{
return false;
}

volume_handle.Close();
return true;
return result;
}

/// <summary>
Expand Down
9 changes: 0 additions & 9 deletions cs/src/core/Utilities/Utility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,8 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.Security;
using System.IO;
using System.Runtime.CompilerServices;
using Microsoft.Win32.SafeHandles;
using System.Diagnostics;
using System.Threading;

namespace FASTER.core
{
Expand Down

0 comments on commit 571333d

Please sign in to comment.