Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async/await session model #130

Merged
merged 118 commits into from
Dec 20, 2019
Merged
Changes from 1 commit
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
484c674
Initial checkin
badrishc May 16, 2019
fc89978
Merge branch 'master' into async-support
badrishc May 21, 2019
63d1276
Merge branch 'master' into async-support
badrishc May 21, 2019
0c7bddd
Merge branch 'master' into async-support
badrishc Jul 10, 2019
3d17a2d
Merge branch 'master' into async-support
badrishc Jul 10, 2019
2b7e24f
Rough sketch of session without touching currrent thread-local framew…
badrishc Jul 11, 2019
00dc474
Improved support, added session pooling option, added session testcases.
badrishc Jul 12, 2019
533652b
Updates to session support
badrishc Jul 13, 2019
2fff5cb
Merge branch 'master' into async-support
badrishc Jul 24, 2019
6c5420a
Merge branch 'master' into async-support
badrishc Jul 26, 2019
4412113
Merge branch 'master' of https://github.com/Microsoft/FASTER into asy…
badrishc Jul 26, 2019
366a8cc
Merge branch 'async-support' of https://github.com/Microsoft/FASTER i…
badrishc Jul 26, 2019
0353678
Support large number of sessions (not limited by epoch table size). S…
badrishc Aug 1, 2019
7a487e3
Update FASTER.core.csproj
badrishc Aug 1, 2019
571333d
Initial checkin of async CompletePending - no tests yet.
badrishc Aug 3, 2019
d63603c
Merge branch 'async-support' of https://github.com/Microsoft/FASTER i…
badrishc Aug 3, 2019
c41855e
Update azure-pipelines.yml
badrishc Aug 3, 2019
252ef54
Update azure-pipelines.yml
badrishc Aug 3, 2019
2a2d7ac
Update azure-pipelines.yml
badrishc Aug 3, 2019
c67d882
Update azure-pipelines.yml
badrishc Aug 3, 2019
bcb75e7
Update azure-pipelines.yml
badrishc Aug 3, 2019
2eafe9b
Support suspend/resume sessions
badrishc Aug 6, 2019
0bd752e
Support true async CompleteCheckpoint
badrishc Aug 7, 2019
dbe13fd
Improved async sessions interface, working now. Does not persist doma…
badrishc Aug 8, 2019
1754b68
Persist dormant sessions correctly.
badrishc Aug 8, 2019
a486834
Update testcase
badrishc Aug 8, 2019
d72c92a
Correct handling of thread switching with async
badrishc Aug 9, 2019
869a88e
Merge branch 'master' into async-support
badrishc Aug 9, 2019
e5a4859
Merge branch 'async-support' of https://github.com/Microsoft/FASTER i…
badrishc Aug 9, 2019
82a01a5
Update
badrishc Aug 9, 2019
26be7f6
update
badrishc Aug 9, 2019
733e4de
update
badrishc Aug 9, 2019
30adeca
Updates and bugfixes
badrishc Aug 12, 2019
9e84d63
Removing sample test code.
badrishc Aug 12, 2019
9c60144
Update
badrishc Aug 13, 2019
d7064e8
Updated fast threadlocal
badrishc Aug 13, 2019
ea64fdf
Revert "Update"
badrishc Aug 13, 2019
363508c
Merge remote-tracking branch 'origin/async-support-test2' into async-…
badrishc Aug 13, 2019
789d38e
update
badrishc Aug 13, 2019
572b8bd
revert max threads in pool
badrishc Aug 13, 2019
4b75494
test
badrishc Aug 14, 2019
026912c
test
badrishc Aug 14, 2019
03c4d8e
Update azure-pipelines.yml for Azure Pipelines
badrishc Aug 14, 2019
e925af3
Update azure-pipelines.yml for Azure Pipelines
badrishc Aug 14, 2019
1c33b49
Update azure-pipelines.yml
badrishc Aug 14, 2019
4dc6d27
Update azure-pipelines.yml
badrishc Aug 14, 2019
5cda2f9
more tests, phew
badrishc Aug 15, 2019
68df34d
trying to reduce async context - incomplete
badrishc Aug 17, 2019
7e80b07
Merge branch 'master' into async-support
badrishc Aug 20, 2019
81a4a82
Merging from master
badrishc Aug 29, 2019
75b9cb2
Update
badrishc Sep 3, 2019
b16212f
Updates
badrishc Sep 4, 2019
b2479ed
Merge branch 'async-support-test' of https://github.com/Microsoft/FAS…
badrishc Sep 4, 2019
d03bcea
Updates
badrishc Sep 4, 2019
00fadc5
Merge branch 'master' into async-support-test
badrishc Sep 4, 2019
27b677e
Fixes
badrishc Sep 4, 2019
94b901e
Merge branch 'master' into async-support-test
badrishc Sep 4, 2019
9afc5fb
Wired in excluded serial nos for commit points.
badrishc Sep 5, 2019
62cecc8
Cleaning up interface and comments.
badrishc Sep 5, 2019
6cab32a
Fixing thread local init, cleanup
badrishc Sep 5, 2019
990d789
Merging from work branch
badrishc Sep 5, 2019
b9404b6
Fixing merge
badrishc Sep 5, 2019
78c2649
Updates to support sessions natively without thread-local
badrishc Sep 12, 2019
e170463
updates
badrishc Sep 12, 2019
38a26ff
Added tests
badrishc Sep 13, 2019
0740f87
Merge branch 'async-support' into async-support-test
badrishc Sep 13, 2019
669881d
Cleanup of warnings
badrishc Sep 13, 2019
29c64e1
Merge branch 'master' into async-support-test
badrishc Sep 13, 2019
16edd83
Initial checkin
badrishc Sep 17, 2019
3077f52
Updates.
badrishc Sep 17, 2019
853b3ea
Updates
badrishc Sep 17, 2019
6315a14
Cleaned up epochs, improved fine grain scalability.
badrishc Sep 18, 2019
19d5d82
Fixing test change
badrishc Sep 18, 2019
4ef03bb
Merge branch 'fasterlog' into async-support
badrishc Sep 18, 2019
ce31917
Fixing break after merge.
badrishc Sep 18, 2019
88d7269
Added commit and recovery support.
badrishc Sep 19, 2019
ddcc338
Added TryAppend so users can implement log throttling.
badrishc Sep 19, 2019
2cd85e3
Fasterlog lowmem (#178)
badrishc Sep 26, 2019
ec2a3b5
Fasterlog TryAppend (#179)
badrishc Sep 30, 2019
bb4e357
minor fix
badrishc Sep 30, 2019
4504937
merge
badrishc Sep 30, 2019
b06d112
Fasterlog async (#180)
badrishc Oct 3, 2019
002b993
Merge branch 'master' into fasterlog
badrishc Oct 4, 2019
944504b
Added tailing iterator WaitAsync to wait for iteration to proceed.
badrishc Oct 4, 2019
540d1a5
Merge branch 'fasterlog' of https://github.com/Microsoft/FASTER into …
badrishc Oct 4, 2019
80a2aeb
Convert Span to ReadOnlySpan for appends
badrishc Oct 4, 2019
0050694
Added MemoryPool/IMemoryOwner variant of iterator
badrishc Oct 4, 2019
127e908
Updates
badrishc Oct 4, 2019
6dc7af6
Updated way to pin pooled memory
badrishc Oct 4, 2019
ff27448
Update azure-pipelines.yml
badrishc Oct 4, 2019
72cdfdb
Merging from FasterLog branch for epoch and allocator goodness.
badrishc Oct 5, 2019
0c0bf58
Updates to merge
badrishc Oct 5, 2019
7565a21
Merging
badrishc Oct 5, 2019
8e42a74
Support minimum buffer size of just 1 page!
badrishc Oct 5, 2019
915f01e
Merge branch 'fasterlog' into async-support
badrishc Oct 6, 2019
089d545
Cleanup and updates.
badrishc Oct 7, 2019
c55de3f
Actually checking in support for 1 page in memory, added initial draf…
badrishc Oct 7, 2019
db68ae0
Added a test
badrishc Oct 7, 2019
5caea66
Improved sample, changed GetMemory to use byte[] instead of Span<byte>
badrishc Oct 7, 2019
e0f745f
Merging from fasterlog
badrishc Oct 7, 2019
fc47dbd
Merging goodness from master.
badrishc Oct 31, 2019
c4fba23
Merge branch 'master' into async-support
badrishc Oct 31, 2019
ca15128
Merge branch 'master' into async-support
badrishc Nov 14, 2019
0f65fbb
Merge branch 'master' into async-support
badrishc Nov 20, 2019
72ecc4f
Fixed break
badrishc Nov 20, 2019
a1b87a5
Merge branch 'master' into async-support
badrishc Dec 3, 2019
6b03887
Merge branch 'master' into async-support
badrishc Dec 3, 2019
876c0d3
Merging from master
badrishc Dec 4, 2019
b3cccd0
Fix merge
badrishc Dec 4, 2019
1eca340
Major API cleanup and porting to session-based interface
badrishc Dec 9, 2019
0237a0c
Added async API to sessions, added first draft of async sample.
badrishc Dec 10, 2019
255c000
Added tests
badrishc Dec 12, 2019
a16c5eb
Update README.md
badrishc Dec 12, 2019
95c0e58
Merge branch 'master' into async-support
badrishc Dec 14, 2019
0624fd5
Updates based on review.
badrishc Dec 19, 2019
b31631b
Merge branch 'async-support' of https://github.com/microsoft/FASTER i…
badrishc Dec 19, 2019
ecbf696
Minor fixes.
badrishc Dec 19, 2019
49866bf
fixed call to newsession
badrishc Dec 20, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Support large number of sessions (not limited by epoch table size). S…
…essions load their context into the thread-local context on every operation. Currently, we do not suspend sessions automatically after each operation.

Slightly improved async API - not wired in.
  • Loading branch information
badrishc committed Aug 1, 2019
commit 0353678ecf60c81a5018987024e206de081f549e
35 changes: 24 additions & 11 deletions cs/src/core/Async/AsyncFasterKv.cs
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@ namespace FASTER.core.async
public class AsyncFasterKV<Key, Value, Input, Output, Functions>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Empty>
where Functions : IFunctions<Key, Value, Input, Output, TaskCompletionSource<Output>>
{
private readonly FasterKV<Key, Value, Input, Output, Empty, Functions> fht;
private readonly FasterKV<Key, Value, Input, Output, TaskCompletionSource<Output>, Functions> fht;

/// <summary>
///
@@ -32,7 +32,7 @@ public class AsyncFasterKV<Key, Value, Input, Output, Functions>
/// <param name="variableLengthStructSettings"></param>
public AsyncFasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings<Key, Value> serializerSettings = null, IFasterEqualityComparer<Key> comparer = null, VariableLengthStructSettings<Key, Value> variableLengthStructSettings = null)
{
fht = new FasterKV<Key, Value, Input, Output, Empty, Functions>(size, functions, logSettings, checkpointSettings, serializerSettings, comparer, variableLengthStructSettings);
fht = new FasterKV<Key, Value, Input, Output, TaskCompletionSource<Output>, Functions>(size, functions, logSettings, checkpointSettings, serializerSettings, comparer, variableLengthStructSettings);

}

@@ -44,13 +44,12 @@ public AsyncFasterKV(long size, Functions functions, LogSettings logSettings, Ch
/// <param name="output"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public async Task<Output> Read(Key key, Input input, Output output, long monotonicSerialNum)
public async ValueTask<Output> Read(Key key, Input input, Output output, long monotonicSerialNum)
{
var status = fht.Read(ref key, ref input, ref output, Empty.Default, monotonicSerialNum);
TaskCompletionSource<Output> tcs = null;
var status = fht.Read(ref key, ref input, ref output, ref tcs, monotonicSerialNum);
if (status != Status.PENDING)
return output;
var tcs = new TaskCompletionSource<Output>();

return await tcs.Task;
}

@@ -61,17 +60,31 @@ public async Task<Output> Read(Key key, Input input, Output output, long monoton
/// <param name="desiredValue"></param>
/// <param name="monotonicSerialNum"></param>
/// <returns></returns>
public async Task Upsert(Key key, Value desiredValue, long monotonicSerialNum)
public async ValueTask Upsert(Key key, Value desiredValue, long monotonicSerialNum)
{
var status = fht.Upsert(ref key, ref desiredValue, Empty.Default, monotonicSerialNum);
TaskCompletionSource<Output> tcs = null;
var status = fht.Upsert(ref key, ref desiredValue, ref tcs, monotonicSerialNum);
if (status != Status.PENDING)
return;
var tcs = new TaskCompletionSource<Output>();
await tcs.Task;
}

/// <summary>
///
/// </summary>
/// <param name="wait"></param>
/// <returns></returns>
public ValueTask<bool> InternalCompletePending(bool wait = false)
{
return new ValueTask<bool>(fht.CompletePending(wait));
}

public async Task<Guid> TakeCheckpoint(long untilSN = -1)
/// <summary>
///
/// </summary>
/// <param name="untilSN"></param>
/// <returns></returns>
public ValueTask<Guid> TakeCheckpoint(long untilSN = -1)
{
throw new Exception();
}
Original file line number Diff line number Diff line change
@@ -16,59 +16,37 @@ namespace FASTER.core
/// <typeparam name="Output"></typeparam>
/// <typeparam name="Context"></typeparam>
/// <typeparam name="Functions"></typeparam>
public class Session<Key, Value, Input, Output, Context, Functions> : IDisposable
public class ClientSession<Key, Value, Input, Output, Context, Functions> : IDisposable
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
private FasterKV<Key, Value, Input, Output, Context, Functions> fht;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx;
private readonly int epochEntry;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx;
private FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx;

internal static Session<Key, Value, Input, Output, Context, Functions> Create(
Session<Key, Value, Input, Output, Context, Functions>[] _sessions,
FasterKV <Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
int epochEntry)
{
if (_sessions[epochEntry] == null)
{
_sessions[epochEntry] = new Session<Key, Value, Input, Output, Context, Functions>(fht, prevThreadCtx, threadCtx, epochEntry);
return _sessions[epochEntry];
}

var session = _sessions[epochEntry];
session.fht = fht;
session.prevThreadCtx = prevThreadCtx;
session.threadCtx = threadCtx;
return session;
}

private Session(
internal ClientSession(
FasterKV<Key, Value, Input, Output, Context, Functions> fht,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevThreadCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext threadCtx,
int epochEntry)
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext prevCtx,
FasterKV<Key, Value, Input, Output, Context, Functions>.FasterExecutionContext ctx)
{
this.fht = fht;
this.prevThreadCtx = prevThreadCtx;
this.threadCtx = threadCtx;
this.epochEntry = epochEntry;
this.prevCtx = prevCtx;
this.ctx = ctx;
}

/// <summary>
/// Get session Guid
/// </summary>
public Guid ID { get { return threadCtx.guid; } }
public Guid ID { get { return ctx.guid; } }

/// <summary>
/// Dispose session
/// </summary>
public void Dispose()
{
Resume();
fht.CompletePending(true);
fht.StopSession();
}

@@ -77,16 +55,7 @@ public void Dispose()
/// </summary>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to say how it is unsafe here

public void Resume()
{
fht.SetContext(prevThreadCtx, threadCtx, epochEntry);
}

/// <summary>
/// Return shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public void Return(bool completePending = false)
{
fht.SuspendSession(completePending);
fht.SetContext(prevCtx, ctx);
}

/// <summary>
74 changes: 74 additions & 0 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma warning disable 0162

using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;

namespace FASTER.core
{
public unsafe partial class FasterKV<Key, Value, Input, Output, Context, Functions> : FasterBase, IFasterKV<Key, Value, Input, Output, Context>
where Key : new()
where Value : new()
where Functions : IFunctions<Key, Value, Input, Output, Context>
{
/// <summary>
/// Start new shared (not thread-specific) session with FASTER
/// </summary>
/// <returns></returns>
public ClientSession<Key, Value, Input, Output, Context, Functions> StartClientSession()
{
Guid guid = Guid.NewGuid();
var ctx = new FasterExecutionContext();
InitContext(ctx, guid);
var prevCtx = new FasterExecutionContext();
InitContext(prevCtx, guid);
return new ClientSession<Key, Value, Input, Output, Context, Functions>(this, prevCtx, ctx);
}


internal void SetContext(FasterExecutionContext prevThreadCtx, FasterExecutionContext threadCtx)
{
if (!epoch.ThreadEntry.IsInitializedForThread)
{
epoch.Acquire();
epoch.ThreadEntry.InitializeThread();
this.prevThreadCtx.InitializeThread();
this.threadCtx.InitializeThread();
}
this.prevThreadCtx.Value = prevThreadCtx;
this.threadCtx.Value = threadCtx;
Refresh();
}

/// <summary>
/// Suspend session with FASTER
/// </summary>
internal void SuspendSession(bool completePending = false)
{
if (completePending)
{
while (true)
{
bool done = true;
if (threadCtx.Value.retryRequests.Count != 0 || threadCtx.Value.ioPendingRequests.Count != 0)
done = false;

if (prevThreadCtx.Value != default(FasterExecutionContext))
{
if (prevThreadCtx.Value.retryRequests.Count != 0 || prevThreadCtx.Value.ioPendingRequests.Count != 0)
done = false;
}
if (threadCtx.Value.phase != Phase.REST)
done = false;

if (done) break;
Refresh();
}
}
epoch.Release();
}
}
}
13 changes: 13 additions & 0 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
@@ -197,6 +197,19 @@ public void Release()
(*(tableAligned + entry)).threadId = 0;
}

/// <summary>
/// Thread suspends its epoch entry
/// </summary>
public void Suspend()
{
int entry = threadEntryIndex.Value;
if (kInvalidIndex == entry)
{
return;
}
(*(tableAligned + entry)).localCurrentEpoch = int.MaxValue;
}

internal FastThreadLocal<int> ThreadEntry => threadEntryIndex;

/// <summary>
1 change: 1 addition & 0 deletions cs/src/core/FASTER.core.csproj
Original file line number Diff line number Diff line change
@@ -36,5 +36,6 @@

<ItemGroup>
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.5.2" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" />
</ItemGroup>
</Project>
Loading