Skip to content

Commit

Permalink
Support large number of sessions (not limited by epoch table size). S…
Browse files Browse the repository at this point in the history
…essions load their context into the thread-local context on every operation. Currently, we do not suspend sessions automatically after each operation.

Slightly improved async API - not wired in.
  • Loading branch information
badrishc committed Aug 1, 2019
1 parent 366a8cc commit 0353678
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 182 deletions.
35 changes: 24 additions & 11 deletions cs/src/core/Async/AsyncFasterKv.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ namespace FASTER.core.async
public class AsyncFasterKV<Key, Value, Input, Output, Functions>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Empty>
where Functions : IFunctions<Key, Value, Input, Output, TaskCompletionSource<Output>>
{
private readonly FasterKV<Key, Value, Input, Output, Empty, Functions> fht;
private readonly FasterKV<Key, Value, Input, Output, TaskCompletionSource<Output>, Functions> fht;

/// <summary>
///
Expand All @@ -32,7 +32,7 @@ public class AsyncFasterKV<Key, Value, Input, Output, Functions>
/// <param name="variableLengthStructSettings"></param>
public AsyncFasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings<Key, Value> serializerSettings = null, IFasterEqualityComparer<Key> comparer = null, VariableLengthStructSettings<Key, Value> variableLengthStructSettings = null)
{
fht = new FasterKV<Key, Value, Input, Output, Empty, Functions>(size, functions, logSettings, checkpointSettings, serializerSettings, comparer, variableLengthStructSettings);
fht = new FasterKV<Key, Value, Input, Output, TaskCompletionSource<Output>, Functions>(size, functions, logSettings, checkpointSettings, serializerSettings, comparer, variableLengthStructSettings);

}

Expand All @@ -44,13 +44,12 @@ public AsyncFasterKV(long size, Functions functions, LogSettings logSettings, Ch
/// <param name="output"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public async Task<Output> Read(Key key, Input input, Output output, long monotonicSerialNum)
public async ValueTask<Output> Read(Key key, Input input, Output output, long monotonicSerialNum)
{
var status = fht.Read(ref key, ref input, ref output, Empty.Default, monotonicSerialNum);
TaskCompletionSource<Output> tcs = null;
var status = fht.Read(ref key, ref input, ref output, ref tcs, monotonicSerialNum);
if (status != Status.PENDING)
return output;
var tcs = new TaskCompletionSource<Output>();

return await tcs.Task;
}

Expand All @@ -61,17 +60,31 @@ public async Task<Output> Read(Key key, Input input, Output output, long monoton
/// <param name="desiredValue"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public async Task Upsert(Key key, Value desiredValue, long monotonicSerialNum)
public async ValueTask Upsert(Key key, Value desiredValue, long monotonicSerialNum)
{
var status = fht.Upsert(ref key, ref desiredValue, Empty.Default, monotonicSerialNum);
TaskCompletionSource<Output> tcs = null;
var status = fht.Upsert(ref key, ref desiredValue, ref tcs, monotonicSerialNum);
if (status != Status.PENDING)
return;
var tcs = new TaskCompletionSource<Output>();
await tcs.Task;
}

/// <summary>
///
/// </summary>
/// <param name="wait"></param>
/// <returns></returns>
public ValueTask<bool> InternalCompletePending(bool wait = false)
{
return new ValueTask<bool>(fht.CompletePending(wait));
}

public async Task<Guid> TakeCheckpoint(long untilSN = -1)
/// <summary>
///
/// </summary>
/// <param name="untilSN"></param>
/// <returns></returns>
public ValueTask<Guid> TakeCheckpoint(long untilSN = -1)
{
throw new Exception();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,37 @@ namespace FASTER.core
/// <typeparam name="Output"></typeparam>
/// <typeparam name="Context"></typeparam>
/// <typeparam name="Functions"></typeparam>
public class Session<Key, Value, Input, Output, Context, Functions> : IDisposable
public class ClientSession<Key, Value, Input, Output, Context, Functions> : IDisposable
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
private FasterKV<Key, Value, Input, Output, Context, Functions> fht;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx;
private readonly int epochEntry;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx;

internal static Session<Key, Value, Input, Output, Context, Functions> Create(
Session<Key, Value, Input, Output, Context, Functions>[] _sessions,
FasterKV <Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
int epochEntry)
{
if (_sessions[epochEntry] == null)
{
_sessions[epochEntry] = new Session<Key, Value, Input, Output, Context, Functions>(fht, prevThreadCtx, threadCtx, epochEntry);
return _sessions[epochEntry];
}

var session = _sessions[epochEntry];
session.fht = fht;
session.prevThreadCtx = prevThreadCtx;
session.threadCtx = threadCtx;
return session;
}

private Session(
internal ClientSession(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
int epochEntry)
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx)
{
this.fht = fht;
this.prevThreadCtx = prevThreadCtx;
this.threadCtx = threadCtx;
this.epochEntry = epochEntry;
this.prevCtx = prevCtx;
this.ctx = ctx;
}

/// <summary>
/// Get session Guid
/// </summary>
public Guid ID { get { return threadCtx.guid; } }
public Guid ID { get { return ctx.guid; } }

/// <summary>
/// Dispose session
/// </summary>
public void Dispose()
{
Resume();
fht.CompletePending(true);
fht.StopSession();
}

Expand All @@ -77,16 +55,7 @@ public void Dispose()
/// </summary>
public void Resume()
{
fht.SetContext(prevThreadCtx, threadCtx, epochEntry);
}

/// <summary>
/// Return shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public void Return(bool completePending = false)
{
fht.SuspendSession(completePending);
fht.SetContext(prevCtx, ctx);
}

/// <summary>
Expand Down
74 changes: 74 additions & 0 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162

using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;

namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
/// <summary>
/// Start new shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientSession()
{
Guid guid = Guid.NewGuid();
var ctx = new FasterExecutionContext();
InitContext(ctx, guid);
var prevCtx = new FasterExecutionContext();
InitContext(prevCtx, guid);
return new ClientSession<Key, Value, Input, Output, Context, Functions>(this, prevCtx, ctx);
}


internal void SetContext(FasterExecutionContext prevThreadCtx, FasterExecutionContext threadCtx)
{
if (!epoch.ThreadEntry.IsInitializedForThread)
{
epoch.Acquire();
epoch.ThreadEntry.InitializeThread();
this.prevThreadCtx.InitializeThread();
this.threadCtx.InitializeThread();
}
this.prevThreadCtx.Value = prevThreadCtx;
this.threadCtx.Value = threadCtx;
Refresh();
}

/// <summary>
/// Suspend session with FASTER
/// </summary>
internal void SuspendSession(bool completePending = false)
{
if (completePending)
{
while (true)
{
bool done = true;
if (threadCtx.Value.retryRequests.Count != 0 || threadCtx.Value.ioPendingRequests.Count != 0)
done = false;

if (prevThreadCtx.Value != default(FasterExecutionContext))
{
if (prevThreadCtx.Value.retryRequests.Count != 0 || prevThreadCtx.Value.ioPendingRequests.Count != 0)
done = false;
}
if (threadCtx.Value.phase != Phase.REST)
done = false;

if (done) break;
Refresh();
}
}
epoch.Release();
}
}
}
13 changes: 13 additions & 0 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,19 @@ public void Release()
(*(tableAligned + entry)).threadId = 0;
}

/// <summary>
/// Thread suspends its epoch entry
/// </summary>
public void Suspend()
{
int entry = threadEntryIndex.Value;
if (kInvalidIndex == entry)
{
return;
}
(*(tableAligned + entry)).localCurrentEpoch = int.MaxValue;
}

internal FastThreadLocal<int> ThreadEntry => threadEntryIndex;

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/FASTER.core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@

<ItemGroup>
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.5.2" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" />
</ItemGroup>
</Project>
Loading

0 comments on commit 0353678

Please sign in to comment.