Skip to content

Commit

Permalink
Support async/await session model (#130)
Browse files Browse the repository at this point in the history
Introducing fundamental change to how users interact with FASTER. We now have the notion of a "client session" that can operate across threads, i.e., it is compatible with the async framework. All key-value operations on FASTER occur through a session. There can be multiple concurrent sessions to FASTER, but a given session is expected to not be used concurrently. Operations on a session have async variants, and include a flag to indicate if they return immediately or after the commit/checkpoint.
  • Loading branch information
badrishc authored Dec 20, 2019
1 parent 2987b6e commit a8e1f3c
Show file tree
Hide file tree
Showing 88 changed files with 3,855 additions and 2,048 deletions.
9 changes: 9 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorage
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterKVAsyncSample", "playground\FasterKVAsyncSample\FasterKVAsyncSample.csproj", "{859F76F4-93D8-4D60-BF9A-363E217FA247}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -166,6 +168,12 @@ Global
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.Build.0 = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Debug|Any CPU.ActiveCfg = Debug|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Debug|x64.ActiveCfg = Debug|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Debug|x64.Build.0 = Debug|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Release|Any CPU.ActiveCfg = Release|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Release|x64.ActiveCfg = Release|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -187,6 +195,7 @@ Global
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{859F76F4-93D8-4D60-BF9A-363E217FA247} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
25 changes: 16 additions & 9 deletions cs/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Introduction to FASTER C#
=========================

FASTER C# works in .NET Framework and .NET core, and can be used in both a single-threaded and concurrent setting. It has been tested to work on both Windows and Linux. It exposes an API that allows one to performs a mix of Reads, Blind Updates (Upserts), and Read-Modify-Write operations. It supports data larger than memory, and accepts an `IDevice` implementation for storing logs on storage. We have provided `IDevice` implementations for local file system, but one may create new devices, for example, to write to remote file systems. Alternatively, one may mount remote storage into the local file system. FASTER may be used as a high-performance replacement for traditional concurrent data structures such as the .NET ConcurrentDictionary, and additionally supports larger-than-memory data. It also supports checkpointing of the data structure - both incremental and non-incremental.
FASTER C# works in .NET Framework and .NET core, and can be used in both a single-threaded and concurrent setting. It has been tested to work on both Windows and Linux. It exposes an API that allows one to performs a mix of Reads, Blind Updates (Upserts), and Read-Modify-Write operations. It supports data larger than memory, and accepts an `IDevice` implementation for storing logs on storage. We have provided `IDevice` implementations for local file system and Azure Page Blobs, but one may create new devices as well. We also offer meta-devices that can group device instances into sharded and tiered configurations. FASTER may be used as a high-performance replacement for traditional concurrent data structures such as the .NET ConcurrentDictionary, and additionally supports larger-than-memory data. It also supports checkpointing of the data structure - both incremental and non-incremental. Operations on FASTER can be issued synchronously or asynchronously, i.e., using the C# `async` interface.

Table of Contents
-----------
Expand All @@ -14,7 +14,7 @@ Table of Contents
## Getting FASTER

### Building From Sources
Clone the Git repo, open cs/FASTER.sln in VS 2017, and build.
Clone the Git repo, open cs/FASTER.sln in VS 2019, and build.

### NuGet
You can install FASTER binaries using Nuget, from Nuget.org. Right-click on your project, manage NuGet packages, browse for FASTER. Here is a [direct link](https://www.nuget.org/packages/FASTER).
Expand All @@ -30,7 +30,7 @@ FASTER supports three basic operations:

### Constructor

Before instantiating FASTER, you need to create storage devices that FASTER will use. If you are using blittable types, you only need the hybrid log device. If you are also using objects, you need to create a separate object log device.
Before instantiating FASTER, you need to create storage devices that FASTER will use. If you are using value (blittable) types, you only need one log device. If you are also using objects, you need to create a separate object log device.

```Csharp
IDevice log = Devices.CreateLogDevice("C:\\Temp\\hybridlog_native.log");
Expand Down Expand Up @@ -59,7 +59,7 @@ The user provides an instance of a type that implements `IFunctions<>`. This typ

1. SingleReader and ConcurrentReader: These are used to read from the store values and copy them to Output. Single reader can assume there are no concurrent operations.
2. SingleWriter and ConcurrentWriter: These are used to write values to the store, from a source value.
3. Completion callbacks: Called when various operations complete.
3. Completion callbacks: Called by FASTER when various operations complete.
4. RMW Updaters: There are three updaters that the user specifies, InitialUpdater, InPlaceUpdater, and CopyUpdater. Together, they are used to implement the RMW operation.

### Constructor Parameters
Expand All @@ -76,15 +76,22 @@ The total in-memory footprint of FASTER is controlled by the following parameter

### Sessions (Threads)

Once FASTER is instantiated, threads may use FASTER by registering themselves via the concept of a Session, using the call
Once FASTER is instantiated, one issues operations to FASTER by creating logical sessions. A session represents a sequence of operations issued to FASTER. There is no concurrency within a session, but different sessions may execute concurrently. Sessions do not need to be affinitized to threads, but if they are, FASTER can leverage the same (covered later). You create a session as follows:

```fht.StartSession();```
```var session = fht.NewSession();```

At the end, the thread calls:
You can then perform a sequence of read, upsert, and RMW operations on the session. FASTER supports sync and async versions of operations. Examples:

```fht.StopSession();```
```cs
var status = session.Read(ref key, ref input, ref output, ref context);
await session.ReadAsync(key, input);
```

At the end, the session is disposed:

```session.Dispose();```

When all threads are done operating on FASTER, you finally dispose the FASTER instance:
When all sessions are done operating on FASTER, you finally dispose the FASTER instance:

```fht.Dispose();```

Expand Down
3 changes: 1 addition & 2 deletions cs/benchmark/ConcurrentDictionaryBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ public enum Op : ulong
Input[] input_;
Input* input_ptr;

ConcurrentDictionary<Key, Value> store;
readonly ConcurrentDictionary<Key, Value> store;

long total_ops_done = 0;

const string kKeyWorkload = "a";
readonly int threadCount;
readonly int numaStyle;
readonly string distribution;
Expand Down
52 changes: 28 additions & 24 deletions cs/benchmark/FasterYcsbBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ public enum Op : ulong
ReadModifyWrite = 2
}

const bool kUseSyntheticData = false;
#if DEBUG
const bool kUseSmallData = true;
const bool kUseSyntheticData = true;
#else
const bool kUseSmallData = false;
const bool kUseSyntheticData = false;
#endif
const long kInitCount = kUseSmallData ? 2500480 : 250000000;
const long kTxnCount = kUseSmallData ? 10000000 : 1000000000;
const int kMaxKey = kUseSmallData ? 1 << 22 : 1 << 28;
Expand All @@ -42,11 +47,10 @@ public enum Op : ulong
Input[] input_;
readonly IDevice device;

FasterKV<Key, Value, Input, Output, Empty, Functions> store;
readonly FasterKV<Key, Value, Input, Output, Empty, Functions> store;

long total_ops_done = 0;

const string kKeyWorkload = "a";
readonly int threadCount;
readonly int numaStyle;
readonly string distribution;
Expand Down Expand Up @@ -97,9 +101,9 @@ private void RunYcsb(int thread_idx)
sw.Start();


Value value = default(Value);
Input input = default(Input);
Output output = default(Output);
Value value = default;
Input input = default;
Output output = default;

long reads_done = 0;
long writes_done = 0;
Expand All @@ -111,7 +115,7 @@ private void RunYcsb(int thread_idx)
int count = 0;
#endif

store.StartSession();
var session = store.NewSession(null, true);

while (!done)
{
Expand All @@ -136,25 +140,25 @@ private void RunYcsb(int thread_idx)

if (idx % 256 == 0)
{
store.Refresh();
session.Refresh();

if (idx % 65536 == 0)
{
store.CompletePending(false);
session.CompletePending(false);
}
}

switch (op)
{
case Op.Upsert:
{
store.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
session.Upsert(ref txn_keys_[idx], ref value, Empty.Default, 1);
++writes_done;
break;
}
case Op.Read:
{
Status result = store.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
Status result = session.Read(ref txn_keys_[idx], ref input, ref output, Empty.Default, 1);
if (result == Status.OK)
{
++reads_done;
Expand All @@ -163,7 +167,7 @@ private void RunYcsb(int thread_idx)
}
case Op.ReadModifyWrite:
{
Status result = store.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
Status result = session.RMW(ref txn_keys_[idx], ref input_[idx & 0x7], Empty.Default, 1);
if (result == Status.OK)
{
++writes_done;
Expand Down Expand Up @@ -192,8 +196,9 @@ private void RunYcsb(int thread_idx)
#endif
}

store.CompletePending(true);
store.StopSession();
session.CompletePending(true);
session.Dispose();

sw.Stop();

Console.WriteLine("Thread " + thread_idx + " done; " + reads_done + " reads, " +
Expand Down Expand Up @@ -312,7 +317,7 @@ private void SetupYcsb(int thread_idx)
else
Native32.AffinitizeThreadShardedNuma((uint)thread_idx, 2); // assuming two NUMA sockets

store.StartSession();
var session = store.NewSession(null, true);

#if DASHBOARD
var tstart = Stopwatch.GetTimestamp();
Expand All @@ -321,7 +326,7 @@ private void SetupYcsb(int thread_idx)
int count = 0;
#endif

Value value = default(Value);
Value value = default;

for (long chunk_idx = Interlocked.Add(ref idx_, kChunkSize) - kChunkSize;
chunk_idx < kInitCount;
Expand All @@ -331,15 +336,15 @@ private void SetupYcsb(int thread_idx)
{
if (idx % 256 == 0)
{
store.Refresh();
session.Refresh();

if (idx % 65536 == 0)
{
store.CompletePending(false);
session.CompletePending(false);
}
}

store.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
session.Upsert(ref init_keys_[idx], ref value, Empty.Default, 1);
}
#if DASHBOARD
count += (int)kChunkSize;
Expand All @@ -357,9 +362,8 @@ private void SetupYcsb(int thread_idx)
#endif
}


store.CompletePending(true);
store.StopSession();
session.CompletePending(true);
session.Dispose();
}

#if DASHBOARD
Expand Down Expand Up @@ -433,7 +437,7 @@ void DoContinuousMeasurements()
}
#endif

#region Load Data
#region Load Data

private unsafe void LoadDataFromFile(string filePath)
{
Expand Down Expand Up @@ -575,7 +579,7 @@ private void LoadSyntheticData()
Console.WriteLine("loaded " + kTxnCount + " txns.");

}
#endregion
#endregion


}
Expand Down
5 changes: 3 additions & 2 deletions cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Runtime.CompilerServices;
using System.Diagnostics;
using FASTER.core;
using System.Collections.Generic;

namespace FASTER.benchmark
{
Expand All @@ -28,9 +29,9 @@ public void DeleteCompletionCallback(ref Key key, Empty ctx)
{
}

public void CheckpointCompletionCallback(Guid sessionId, long serialNum)
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
{
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, serialNum);
Debug.WriteLine("Session {0} reports persistence until {1}", sessionId, commitPoint.UntilSerialNo);
}

// Read functions
Expand Down
Loading

0 comments on commit a8e1f3c

Please sign in to comment.