From a328fcb98f40390c50ef3901b029079f6024046f Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 22 Apr 2021 14:38:27 -0700 Subject: [PATCH] [C#] Expand async session-free API sample in playground (#452) * added sample * cleaned up async stress * Fix memory leak + add serialized wrapper sample * added testcases for concurrency within session in async. * clean up LMD --- cs/playground/AsyncStress/AsyncStress.csproj | 1 + cs/playground/AsyncStress/FasterWrapper.cs | 71 +++--- cs/playground/AsyncStress/IFasterWrapper.cs | 20 ++ cs/playground/AsyncStress/Program.cs | 149 +++++------ .../AsyncStress/SerializedFasterWrapper.cs | 236 ++++++++++++++++++ cs/src/core/ClientSession/FASTERAsync.cs | 21 +- cs/src/core/Device/LocalMemoryDevice.cs | 2 +- cs/src/core/Index/FASTER/FASTERImpl.cs | 16 +- cs/src/core/Utilities/BufferPool.cs | 16 ++ cs/test/LowMemAsyncTests.cs | 155 ++++++++++++ 10 files changed, 555 insertions(+), 132 deletions(-) create mode 100644 cs/playground/AsyncStress/IFasterWrapper.cs create mode 100644 cs/playground/AsyncStress/SerializedFasterWrapper.cs create mode 100644 cs/test/LowMemAsyncTests.cs diff --git a/cs/playground/AsyncStress/AsyncStress.csproj b/cs/playground/AsyncStress/AsyncStress.csproj index 746767112..45846211f 100644 --- a/cs/playground/AsyncStress/AsyncStress.csproj +++ b/cs/playground/AsyncStress/AsyncStress.csproj @@ -8,6 +8,7 @@ + diff --git a/cs/playground/AsyncStress/FasterWrapper.cs b/cs/playground/AsyncStress/FasterWrapper.cs index 12a01db8c..8df6e940c 100644 --- a/cs/playground/AsyncStress/FasterWrapper.cs +++ b/cs/playground/AsyncStress/FasterWrapper.cs @@ -1,50 +1,51 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. +// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using FASTER.core; -using Xunit; using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using Xunit; +using FASTER.core; namespace AsyncStress { - public class FasterWrapper + public class FasterWrapper : IFasterWrapper { readonly FasterKV _store; readonly AsyncPool>> _sessionPool; - - // OS Buffering is safe to use in this app because Reads are done after all updates - internal static bool useOsReadBuffering = false; + readonly bool useOsReadBuffering; + int upsertPendingCount = 0; - public FasterWrapper() + // This can be used to verify the same amount data is loaded. + public long TailAddress => _store.Log.TailAddress; + + // Indicates how many upsert operations went pending + public int UpsertPendingCount { get => upsertPendingCount; set => upsertPendingCount = value; } + // Whether OS Read buffering is enabled + public bool UseOsReadBuffering => useOsReadBuffering; + + public FasterWrapper(bool useOsReadBuffering = false) { - var logDirectory ="d:/FasterLogs"; + var logDirectory = "d:/AsyncStress"; var logFileName = Guid.NewGuid().ToString(); var logSettings = new LogSettings { LogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering), - ObjectLogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering), + ObjectLogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.obj.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering), PageSizeBits = 12, MemorySizeBits = 13 }; Console.WriteLine($" Using {logSettings.LogDevice.GetType()}"); + this.useOsReadBuffering = useOsReadBuffering; _store = new FasterKV(1L << 20, logSettings); _sessionPool = new AsyncPool>>( logSettings.LogDevice.ThrottleLimit, () => _store.For(new SimpleFunctions()).NewSession>()); } - // This can be used to verify the same amount data is loaded. - public long TailAddress => _store.Log.TailAddress; - - // Indicates how many operations went pending - public int UpsertPendingCount = 0; - public int ReadPendingCount = 0; - public async ValueTask UpsertAsync(Key key, Value value) { if (!_sessionPool.TryGet(out var session)) @@ -52,7 +53,7 @@ public async ValueTask UpsertAsync(Key key, Value value) var r = await session.UpsertAsync(key, value); while (r.Status == Status.PENDING) { - Interlocked.Increment(ref UpsertPendingCount); + Interlocked.Increment(ref upsertPendingCount); r = await r.CompleteAsync(); } _sessionPool.Return(session); @@ -63,26 +64,21 @@ public void Upsert(Key key, Value value) if (!_sessionPool.TryGet(out var session)) session = _sessionPool.GetAsync().GetAwaiter().GetResult(); var status = session.Upsert(key, value); - if (status == Status.PENDING) - { - // This should not happen for sync Upsert(). - Interlocked.Increment(ref UpsertPendingCount); - session.CompletePending(); - } + Assert.True(status != Status.PENDING); _sessionPool.Return(session); } - public async ValueTask UpsertChunkAsync((Key, Value)[] chunk) + public async ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count) { if (!_sessionPool.TryGet(out var session)) session = _sessionPool.GetAsync().GetAwaiter().GetResult(); - for (var ii = 0; ii < chunk.Length; ++ii) + for (var i = 0; i < count; ++i) { - var r = await session.UpsertAsync(chunk[ii].Item1, chunk[ii].Item2); + var r = await session.UpsertAsync(chunk[offset + i].Item1, chunk[offset + i].Item2); while (r.Status == Status.PENDING) { - Interlocked.Increment(ref UpsertPendingCount); + Interlocked.Increment(ref upsertPendingCount); r = await r.CompleteAsync(); } } @@ -105,7 +101,6 @@ public async ValueTask UpsertChunkAsync((Key, Value)[] chunk) var result = session.Read(key); if (result.status == Status.PENDING) { - Interlocked.Increment(ref ReadPendingCount); session.CompletePendingWithOutputs(out var completedOutputs, wait: true); int count = 0; for (; completedOutputs.Next(); ++count) @@ -120,26 +115,16 @@ public async ValueTask UpsertChunkAsync((Key, Value)[] chunk) return new ValueTask<(Status, Value)>(result); } - public async ValueTask ReadChunkAsync(Key[] chunk, ValueTask<(Status, Value)>[] results, int offset) + public async ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk, int offset, int count) { if (!_sessionPool.TryGet(out var session)) session = _sessionPool.GetAsync().GetAwaiter().GetResult(); // Reads in chunk are performed serially - for (var ii = 0; ii < chunk.Length; ++ii) - results[offset + ii] = new ValueTask<(Status, Value)>((await session.ReadAsync(chunk[ii])).Complete()); - _sessionPool.Return(session); - } + (Status, Value)[] result = new (Status, Value)[count]; + for (var i = 0; i < count; ++i) + result[i] = (await session.ReadAsync(chunk[offset + i]).ConfigureAwait(false)).Complete(); - public async ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk) - { - if (!_sessionPool.TryGet(out var session)) - session = _sessionPool.GetAsync().GetAwaiter().GetResult(); - - // Reads in chunk are performed serially - (Status, Value)[] result = new (Status, Value)[chunk.Length]; - for (var ii = 0; ii < chunk.Length; ++ii) - result[ii] = (await session.ReadAsync(chunk[ii]).ConfigureAwait(false)).Complete(); _sessionPool.Return(session); return result; } diff --git a/cs/playground/AsyncStress/IFasterWrapper.cs b/cs/playground/AsyncStress/IFasterWrapper.cs new file mode 100644 index 000000000..677277114 --- /dev/null +++ b/cs/playground/AsyncStress/IFasterWrapper.cs @@ -0,0 +1,20 @@ +using FASTER.core; +using System.Threading.Tasks; + +namespace AsyncStress +{ + public interface IFasterWrapper + { + long TailAddress { get; } + int UpsertPendingCount { get; set; } + bool UseOsReadBuffering { get; } + + void Dispose(); + ValueTask<(Status, Value)> Read(Key key); + ValueTask<(Status, Value)> ReadAsync(Key key); + ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk, int offset, int count); + void Upsert(Key key, Value value); + ValueTask UpsertAsync(Key key, Value value); + ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count); + } +} \ No newline at end of file diff --git a/cs/playground/AsyncStress/Program.cs b/cs/playground/AsyncStress/Program.cs index d3b3066fc..260505c19 100644 --- a/cs/playground/AsyncStress/Program.cs +++ b/cs/playground/AsyncStress/Program.cs @@ -6,7 +6,6 @@ using System.Threading.Tasks; using Xunit; using FASTER.core; -using System.Linq; namespace AsyncStress { @@ -17,35 +16,35 @@ enum ThreadingMode { None, Single, - ParallelAsync, - ParallelSync, + ParallelFor, Chunks } - static ThreadingMode upsertThreadingMode = ThreadingMode.ParallelAsync; - static ThreadingMode readThreadingMode = ThreadingMode.ParallelAsync; - static int numTasks = 4; + static ThreadingMode upsertThreadingMode = ThreadingMode.ParallelFor; + static ThreadingMode readThreadingMode = ThreadingMode.ParallelFor; + static int numChunks = 10; static int numOperations = 1_000_000; static void Usage() { Console.WriteLine($"Options:"); - Console.WriteLine($" -u : Upsert threading mode (listed below); default is {ThreadingMode.ParallelAsync}"); - Console.WriteLine($" -r : Read threading mode (listed below); default is {ThreadingMode.ParallelAsync}"); - Console.WriteLine($" -t #: Number of tasks for {ThreadingMode.ParallelSync} and {ThreadingMode.Chunks}; default is {numTasks}"); + Console.WriteLine($" -u : Upsert threading mode (listed below); default is {ThreadingMode.ParallelFor}"); + Console.WriteLine($" -r : Read threading mode (listed below); default is {ThreadingMode.ParallelFor}"); + Console.WriteLine($" -c #: Number of chunks for {ThreadingMode.Chunks}; default is {numChunks}"); Console.WriteLine($" -n #: Number of operations; default is {numOperations}"); - Console.WriteLine($" -b #: Use OS buffering for reads; default is {FasterWrapper.useOsReadBuffering}"); + Console.WriteLine($" -b #: Use OS buffering for reads; default is false"); Console.WriteLine($" -?, /?, --help: Show this screen"); Console.WriteLine(); Console.WriteLine($"Threading Modes:"); Console.WriteLine($" None: Do not run this operation"); Console.WriteLine($" Single: Run this operation single-threaded"); - Console.WriteLine($" ParallelAsync: Run this operation using Parallel.For with an Async lambda"); - Console.WriteLine($" ParallelSync: Run this operation using Parallel.For with an Sync lambda and parallelism limited to numTasks"); - Console.WriteLine($" Chunks: Run this operation using a set number of async tasks to operate on partitioned chunks"); + Console.WriteLine($" ParallelFor: Run this operation using Parallel.For with an Async lambda"); + Console.WriteLine($" Chunks: Run this operation using a set number of data chunks as async tasks, each chunk runs operations serially"); } public static async Task Main(string[] args) { + bool useOsReadBuffering = false; + if (args.Length > 0) { for (var ii = 0; ii < args.Length; ++ii) @@ -63,12 +62,12 @@ string nextArg() upsertThreadingMode = Enum.Parse(nextArg(), ignoreCase: true); else if (arg == "-r") readThreadingMode = Enum.Parse(nextArg(), ignoreCase: true); - else if (arg == "-t") - numTasks = int.Parse(nextArg()); + else if (arg == "-c") + numChunks = int.Parse(nextArg()); else if (arg == "-n") numOperations = int.Parse(nextArg()); else if (arg == "-b") - FasterWrapper.useOsReadBuffering = true; + useOsReadBuffering = true; else if (arg == "-?" || arg == "/?" || arg == "--help") { Usage(); @@ -78,22 +77,42 @@ string nextArg() throw new ApplicationException($"Unknown switch: {arg}"); } } - await ProfileStore(new FasterWrapper()); + + // Store with value types, no object log + // await ProfileStore(new FasterWrapper(useOsReadBuffering), e => (long)e, e => (long)e); + + // Store with reference types, using object log + // await ProfileStore(new FasterWrapper(useOsReadBuffering), e => $"key {e}", e => $"value {e}"); + + // Store with reference or value types, no object log (store serialized bytes) + await ProfileStore(new SerializedFasterWrapper(useOsReadBuffering), e => $"key {e}", e => $"value {e}"); } - private static async Task ProfileStore(FasterWrapper store) + private static async Task ProfileStore(TStore store, Func keyGen, Func valueGen) + where TStore : IFasterWrapper { static string threadingModeString(ThreadingMode threadingMode) => threadingMode switch { ThreadingMode.Single => "Single threading", - ThreadingMode.ParallelAsync => "Parallel.For using async lambda", - ThreadingMode.ParallelSync => $"Parallel.For using sync lambda and {numTasks} tasks", - ThreadingMode.Chunks => $"Chunks partitioned across {numTasks} tasks", + ThreadingMode.ParallelFor => "Parallel.For issuing async operations", + ThreadingMode.Chunks => $"Chunks partitioned across {numChunks} tasks", _ => throw new ApplicationException("Unknown threading mode") }; - int chunkSize = numOperations / numTasks; + Console.WriteLine(" Creating database"); + (TKey, TValue)[] database = new (TKey, TValue)[numOperations]; + TKey[] keys = new TKey[numOperations]; + for (int i = 0; i < numOperations; i++) + { + database[i] = (keyGen(i), valueGen(i)); + keys[i] = database[i].Item1; + } + Console.WriteLine(" Creation complete"); + + Assert.True(numOperations % numChunks == 0, $"Number of operations {numOperations} should be a multiple of number of chunks {numChunks}"); + + int chunkSize = numOperations / numChunks; // Insert if (upsertThreadingMode == ThreadingMode.None) @@ -108,34 +127,26 @@ static string threadingModeString(ThreadingMode threadingMode) if (upsertThreadingMode == ThreadingMode.Single) { for (int i = 0; i < numOperations; i++) - await store.UpsertAsync(i, i); + await store.UpsertAsync(database[i].Item1, database[i].Item2); } - else if (upsertThreadingMode == ThreadingMode.ParallelAsync) + else if (upsertThreadingMode == ThreadingMode.ParallelFor) { var writeTasks = new ValueTask[numOperations]; - Parallel.For(0, numOperations, key => writeTasks[key] = store.UpsertAsync(key, key)); + Parallel.For(0, numOperations, i => writeTasks[i] = store.UpsertAsync(database[i].Item1, database[i].Item2)); foreach (var task in writeTasks) await task; } - else if (upsertThreadingMode == ThreadingMode.ParallelSync) - { - // Without throttling parallelism, this ends up very slow with many threads waiting on FlushTask. - var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = numTasks }; - Parallel.For(0, numOperations, parallelOptions, key => store.Upsert(key, key)); - } - else + else if (upsertThreadingMode == ThreadingMode.Chunks) { - Debug.Assert(upsertThreadingMode == ThreadingMode.Chunks); - var chunkTasks = new ValueTask[numTasks]; - for (int ii = 0; ii < numTasks; ii++) - { - var chunk = new (int, int)[chunkSize]; - for (int i = 0; i < chunkSize; i++) chunk[i] = (ii * chunkSize + i, ii * chunkSize + i); - chunkTasks[ii] = store.UpsertChunkAsync(chunk); - } + var chunkTasks = new ValueTask[numChunks]; + for (int i = 0; i < numChunks; i++) + chunkTasks[i] = store.UpsertChunkAsync(database, i * chunkSize, chunkSize); foreach (var chunkTask in chunkTasks) await chunkTask; - } + } + else + throw new InvalidOperationException($"Invalid threading mode {upsertThreadingMode}"); + sw.Stop(); Console.WriteLine($" Insertion complete in {sw.ElapsedMilliseconds} ms; TailAddress = {store.TailAddress}, Pending = {store.UpsertPendingCount}"); } @@ -148,56 +159,48 @@ static string threadingModeString(ThreadingMode threadingMode) } else { - Console.WriteLine($" Reading {numOperations} records with {threadingModeString(readThreadingMode)} (OS buffering: {FasterWrapper.useOsReadBuffering}) ..."); - var readTasks = new ValueTask<(Status, int)>[numOperations]; - var readPendingString = string.Empty; + Console.WriteLine($" Reading {numOperations} records with {threadingModeString(readThreadingMode)} (OS buffering: {store.UseOsReadBuffering}) ..."); + (Status, TValue)[] results = new (Status, TValue)[numOperations]; var sw = Stopwatch.StartNew(); if (readThreadingMode == ThreadingMode.Single) { - for (int ii = 0; ii < numOperations; ii++) + for (int i = 0; i < numOperations; i++) { - readTasks[ii] = store.ReadAsync(ii); - await readTasks[ii]; + var result = await store.ReadAsync(database[i].Item1); + results[i] = result; } } - else if (readThreadingMode == ThreadingMode.ParallelAsync) + else if (readThreadingMode == ThreadingMode.ParallelFor) { - Parallel.For(0, numOperations, key => readTasks[key] = store.ReadAsync(key)); - foreach (var task in readTasks) - await task; - } - else if (readThreadingMode == ThreadingMode.ParallelSync) - { - // Without throttling parallelism, this ends up very slow with many threads waiting on completion. - var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = numTasks }; - Parallel.For(0, numOperations, parallelOptions, key => readTasks[key] = store.Read(key)); - foreach (var task in readTasks) - await task; - readPendingString = $"; Pending = {store.ReadPendingCount}"; + var readTasks = new ValueTask<(Status, TValue)>[numOperations]; + Parallel.For(0, numOperations, i => readTasks[i] = store.ReadAsync(database[i].Item1)); + for (int i = 0; i < numOperations; i++) + results[i] = await readTasks[i]; } - else + else if(readThreadingMode == ThreadingMode.Chunks) { - var chunkTasks = Enumerable.Range(0, numTasks).Select(ii => + var chunkTasks = new ValueTask<(Status, TValue)[]>[numChunks]; + for (int i = 0; i < numChunks; i++) + chunkTasks[i] = store.ReadChunkAsync(keys, i * chunkSize, chunkSize); + for (int i = 0; i < numChunks; i++) { - var chunk = new int[chunkSize]; - for (int i = 0; i < chunkSize; i++) chunk[i] = ii * chunkSize + i; - return store.ReadChunkAsync(chunk, readTasks, ii * chunkSize); - }).ToArray(); - foreach (var chunkTask in chunkTasks) - await chunkTask; + var result = await chunkTasks[i]; + Array.Copy(result, 0, results, i * chunkSize, chunkSize); + } } + else + throw new InvalidOperationException($"Invalid threading mode {readThreadingMode}"); sw.Stop(); - Console.WriteLine($" Reads complete in {sw.ElapsedMilliseconds} ms{readPendingString}"); + Console.WriteLine($" Reads complete in {sw.ElapsedMilliseconds} ms"); // Verify Console.WriteLine(" Verifying read results ..."); - Parallel.For(0, numOperations, key => + Parallel.For(0, numOperations, i => { - (Status status, int? result) = readTasks[key].Result; - Assert.Equal(Status.OK, status); - Assert.Equal(key, result); + Assert.Equal(Status.OK, results[i].Item1); + Assert.Equal(database[i].Item2, results[i].Item2); }); Console.WriteLine(" Results verified"); diff --git a/cs/playground/AsyncStress/SerializedFasterWrapper.cs b/cs/playground/AsyncStress/SerializedFasterWrapper.cs new file mode 100644 index 000000000..184ba5f6e --- /dev/null +++ b/cs/playground/AsyncStress/SerializedFasterWrapper.cs @@ -0,0 +1,236 @@ +using Xunit; +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using System.Buffers; + +using FASTER.core; +using MessagePack; + +namespace AsyncStress +{ + public class SerializedFasterWrapper : IFasterWrapper + { + readonly FasterKV _store; + readonly AsyncPool> _sessionPool; + readonly bool useOsReadBuffering; + int upsertPendingCount = 0; + + // This can be used to verify the same amount data is loaded. + public long TailAddress => _store.Log.TailAddress; + + // Indicates how many operations went pending + public int UpsertPendingCount { get => upsertPendingCount; set => upsertPendingCount = value; } + // Whether OS Read buffering is enabled + public bool UseOsReadBuffering => useOsReadBuffering; + + public SerializedFasterWrapper(bool useOsReadBuffering = false) + { + var logDirectory = "d:/FasterLogs"; + var logFileName = Guid.NewGuid().ToString(); + var logSettings = new LogSettings + { + LogDevice = new ManagedLocalStorageDevice(Path.Combine(logDirectory, $"{logFileName}.log"), deleteOnClose: true, osReadBuffering: useOsReadBuffering), + PageSizeBits = 12, + MemorySizeBits = 13 + }; + + Console.WriteLine($" Using {logSettings.LogDevice.GetType()}"); + + this.useOsReadBuffering = useOsReadBuffering; + _store = new FasterKV(1L << 20, logSettings); + _sessionPool = new AsyncPool>( + logSettings.LogDevice.ThrottleLimit, + () => _store.For(new SpanByteFunctions()).NewSession()); + } + + public async ValueTask UpsertAsync(Key key, Value value) + { + if (!_sessionPool.TryGet(out var session)) + session = await _sessionPool.GetAsync(); + + byte[] keyBytes = MessagePackSerializer.Serialize(key); + byte[] valueBytes = MessagePackSerializer.Serialize(value); + ValueTask.UpsertAsyncResult> task; + + unsafe + { + fixed (byte* kb = keyBytes) + { + fixed (byte* vb = valueBytes) + { + var keySpanByte = SpanByte.FromPointer(kb, keyBytes.Length); + var valueSpanByte = SpanByte.FromPointer(vb, valueBytes.Length); + task = session.UpsertAsync(ref keySpanByte, ref valueSpanByte); + } + } + } + var r = await task; + while (r.Status == Status.PENDING) + { + Interlocked.Increment(ref upsertPendingCount); + r = await r.CompleteAsync(); + } + _sessionPool.Return(session); + } + + public void Upsert(Key key, Value value) + { + if (!_sessionPool.TryGet(out var session)) + session = _sessionPool.GetAsync().GetAwaiter().GetResult(); + + byte[] keyBytes = MessagePackSerializer.Serialize(key); + byte[] valueBytes = MessagePackSerializer.Serialize(value); + Status status; + + unsafe + { + fixed (byte* kb = keyBytes) + { + fixed (byte* vb = valueBytes) + { + var keySpanByte = SpanByte.FromPointer(kb, keyBytes.Length); + var valueSpanByte = SpanByte.FromPointer(vb, valueBytes.Length); + status = session.Upsert(ref keySpanByte, ref valueSpanByte); + } + } + } + + Assert.True(status != Status.PENDING); + _sessionPool.Return(session); + } + + public async ValueTask UpsertChunkAsync((Key, Value)[] chunk, int offset, int count) + { + if (!_sessionPool.TryGet(out var session)) + session = _sessionPool.GetAsync().GetAwaiter().GetResult(); + + for (var i = 0; i < count; ++i) + { + byte[] keyBytes = MessagePackSerializer.Serialize(chunk[offset + i].Item1); + byte[] valueBytes = MessagePackSerializer.Serialize(chunk[offset + i].Item2); + ValueTask.UpsertAsyncResult> task; + + unsafe + { + fixed (byte* kb = keyBytes) + { + fixed (byte* vb = valueBytes) + { + var keySpanByte = SpanByte.FromPointer(kb, keyBytes.Length); + var valueSpanByte = SpanByte.FromPointer(vb, valueBytes.Length); + task = session.UpsertAsync(ref keySpanByte, ref valueSpanByte); + } + } + } + + var r = await task; + while (r.Status == Status.PENDING) + { + Interlocked.Increment(ref upsertPendingCount); + r = await r.CompleteAsync(); + } + } + _sessionPool.Return(session); + } + + public async ValueTask<(Status, Value)> ReadAsync(Key key) + { + if (!_sessionPool.TryGet(out var session)) + session = await _sessionPool.GetAsync(); + + byte[] keyBytes = MessagePackSerializer.Serialize(key); + ValueTask.ReadAsyncResult> task; + + unsafe + { + fixed (byte* kb = keyBytes) + { + var keySpanByte = SpanByte.FromPointer(kb, keyBytes.Length); + task = session.ReadAsync(ref keySpanByte); + } + } + + var (status, output) = (await task.ConfigureAwait(false)).Complete(); + _sessionPool.Return(session); + + using IMemoryOwner memoryOwner = output.Memory; + + return (status, status != Status.OK ? default : MessagePackSerializer.Deserialize(memoryOwner.Memory)); + } + + public ValueTask<(Status, Value)> Read(Key key) + { + if (!_sessionPool.TryGet(out var session)) + session = _sessionPool.GetAsync().GetAwaiter().GetResult(); + + byte[] keyBytes = MessagePackSerializer.Serialize(key); + (Status, SpanByteAndMemory) result; + (Status, Value) userResult = default; + + unsafe + { + fixed (byte* kb = keyBytes) + { + var keySpanByte = SpanByte.FromPointer(kb, keyBytes.Length); + result = session.Read(keySpanByte); + } + } + + if (result.Item1 == Status.PENDING) + { + session.CompletePendingWithOutputs(out var completedOutputs, wait: true); + int count = 0; + for (; completedOutputs.Next(); ++count) + { + using IMemoryOwner memoryOwner = completedOutputs.Current.Output.Memory; + userResult = (completedOutputs.Current.Status, completedOutputs.Current.Status != Status.OK ? default : MessagePackSerializer.Deserialize(memoryOwner.Memory)); + } + completedOutputs.Dispose(); + Assert.Equal(1, count); + } + else + { + using IMemoryOwner memoryOwner = result.Item2.Memory; + userResult = (result.Item1, result.Item1 != Status.OK ? default : MessagePackSerializer.Deserialize(memoryOwner.Memory)); + } + _sessionPool.Return(session); + return new ValueTask<(Status, Value)>(userResult); + } + + public async ValueTask<(Status, Value)[]> ReadChunkAsync(Key[] chunk, int offset, int count) + { + if (!_sessionPool.TryGet(out var session)) + session = _sessionPool.GetAsync().GetAwaiter().GetResult(); + + // Reads in chunk are performed serially + (Status, Value)[] result = new (Status, Value)[count]; + for (var i = 0; i < count; ++i) + result[i] = await ReadAsync(chunk[offset + i]).ConfigureAwait(false); + + _sessionPool.Return(session); + return result; + } + + + public void Dispose() + { + _sessionPool.Dispose(); + _store.Dispose(); + } + } + + public class SpanByteFunctions : SpanByteFunctions + { + public unsafe override void SingleReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory dst) + { + value.CopyTo(ref dst, MemoryPool.Shared); + } + + public unsafe override void ConcurrentReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory dst) + { + value.CopyTo(ref dst, MemoryPool.Shared); + } + } +} diff --git a/cs/src/core/ClientSession/FASTERAsync.cs b/cs/src/core/ClientSession/FASTERAsync.cs index 1bf1be9ce..c94ddd01c 100644 --- a/cs/src/core/ClientSession/FASTERAsync.cs +++ b/cs/src/core/ClientSession/FASTERAsync.cs @@ -315,7 +315,7 @@ internal interface IUpdelAsyncOperation { TAsyncResult CreateResult(OperationStatus internalStatus); - OperationStatus DoFastOperation(FasterKV fasterKV, PendingContext pendingContext, IFasterSession fasterSession, + OperationStatus DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession, FasterExecutionContext currentCtx); ValueTask DoSlowOperation(FasterKV fasterKV, IFasterSession fasterSession, FasterExecutionContext currentCtx, PendingContext pendingContext, @@ -333,7 +333,7 @@ internal struct UpsertAsyncOperation : IUpdelAsyncOperat { public UpsertAsyncResult CreateResult(OperationStatus internalStatus) => new UpsertAsyncResult(internalStatus); - public OperationStatus DoFastOperation(FasterKV fasterKV, PendingContext pendingContext, IFasterSession fasterSession, + public OperationStatus DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession, FasterExecutionContext currentCtx) => fasterKV.InternalUpsert(ref pendingContext.key.Get(), ref pendingContext.value.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum); @@ -346,7 +346,7 @@ internal struct DeleteAsyncOperation : IUpdelAsyncOperat { public DeleteAsyncResult CreateResult(OperationStatus internalStatus) => new DeleteAsyncResult(internalStatus); - public OperationStatus DoFastOperation(FasterKV fasterKV, PendingContext pendingContext, IFasterSession fasterSession, + public OperationStatus DoFastOperation(FasterKV fasterKV, ref PendingContext pendingContext, IFasterSession fasterSession, FasterExecutionContext currentCtx) => fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum); @@ -404,11 +404,14 @@ internal async ValueTask CompleteAsync(CancellationToken token = d do { _flushTask = _fasterKV.hlog.FlushTask; - internalStatus = asyncOperation.DoFastOperation(_fasterKV, _pendingContext, _fasterSession, _currentCtx); + internalStatus = asyncOperation.DoFastOperation(_fasterKV, ref _pendingContext, _fasterSession, _currentCtx); } while (internalStatus == OperationStatus.RETRY_NOW); - _pendingContext.Dispose(); + if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) + { + _pendingContext.Dispose(); return asyncOperation.CreateResult(internalStatus); + } Debug.Assert(internalStatus == OperationStatus.ALLOCATE_FAILED); } finally @@ -700,16 +703,20 @@ internal async ValueTask> CompleteAsync(C if (_flushTask is { }) { await _flushTask.WithCancellationAsync(token); - return await _fasterKV.RmwAsync(_fasterSession, _currentCtx, ref _pendingContext.key.Get(), ref _pendingContext.input.Get(), _pendingContext.userContext, _pendingContext.serialNum, token); + var task = _fasterKV.RmwAsync(_fasterSession, _currentCtx, ref _pendingContext.key.Get(), ref _pendingContext.input.Get(), _pendingContext.userContext, _pendingContext.serialNum, token); + _pendingContext.Dispose(); + return await task; } _fasterSession.UnsafeResumeThread(); try { var status = _fasterKV.InternalCompletePendingRequestFromContext(_currentCtx, _currentCtx, _fasterSession, _diskRequest, ref _pendingContext, true, out newDiskRequest); - _pendingContext.Dispose(); if (status != Status.PENDING) + { + _pendingContext.Dispose(); return new RmwAsyncResult(status, default); + } } finally { diff --git a/cs/src/core/Device/LocalMemoryDevice.cs b/cs/src/core/Device/LocalMemoryDevice.cs index e67a4b7d3..17473d0a8 100644 --- a/cs/src/core/Device/LocalMemoryDevice.cs +++ b/cs/src/core/Device/LocalMemoryDevice.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. +// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. using System; diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 8e3faab97..213e30c7f 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -245,9 +245,9 @@ internal OperationStatus InternalRead( CreatePendingContext: { pendingContext.type = OperationType.READ; - if (!pendingContext.NoKey) // If this is true, we don't have a valid key + if (!pendingContext.NoKey && pendingContext.key == default) // If this is true, we don't have a valid key pendingContext.key = hlog.GetKeyContainer(ref key); - pendingContext.input = fasterSession.GetHeapContainer(ref input); + if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input); pendingContext.output = output; if (pendingContext.output is IHeapConvertible heapConvertible) @@ -413,8 +413,8 @@ internal OperationStatus InternalUpsert( Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"Upsert CreatePendingContext encountered latchDest == {latchDestination}"); { pendingContext.type = OperationType.UPSERT; - pendingContext.key = hlog.GetKeyContainer(ref key); - pendingContext.value = hlog.GetValueContainer(ref value); + if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key); + if (pendingContext.value == default) pendingContext.value = hlog.GetValueContainer(ref value); pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; @@ -763,8 +763,8 @@ internal OperationStatus InternalRMW( Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"RMW CreatePendingContext encountered latchDest == {latchDestination}"); { pendingContext.type = OperationType.RMW; - pendingContext.key = hlog.GetKeyContainer(ref key); - pendingContext.input = fasterSession.GetHeapContainer(ref input); + if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key); + if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input); pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; @@ -1187,7 +1187,7 @@ internal OperationStatus InternalDelete( CreatePendingContext: { pendingContext.type = OperationType.DELETE; - pendingContext.key = hlog.GetKeyContainer(ref key); + if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key); pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; @@ -1328,7 +1328,7 @@ internal OperationStatus InternalContinuePendingRead[] queue; +#if CHECK_FOR_LEAKS + static int totalGets, totalReturns; +#endif /// /// Constructor @@ -149,6 +154,10 @@ public SectorAlignedBufferPool(int recordSize, int sectorSize) [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Return(SectorAlignedMemory page) { +#if CHECK_FOR_LEAKS + Interlocked.Increment(ref totalReturns); +#endif + Debug.Assert(queue[page.level] != null); page.available_bytes = 0; page.required_bytes = 0; @@ -187,6 +196,10 @@ private static int Position(int v) [MethodImpl(MethodImplOptions.AggressiveInlining)] public unsafe SectorAlignedMemory Get(int numRecords) { +#if CHECK_FOR_LEAKS + Interlocked.Increment(ref totalGets); +#endif + int requiredSize = sectorSize + (((numRecords) * recordSize + (sectorSize - 1)) & ~(sectorSize - 1)); int index = Position(requiredSize / sectorSize); if (queue[index] == null) @@ -218,6 +231,9 @@ public unsafe SectorAlignedMemory Get(int numRecords) [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Free() { +#if CHECK_FOR_LEAKS + Debug.Assert(totalGets == totalReturns); +#endif for (int i = 0; i < levels; i++) { if (queue[i] == null) continue; diff --git a/cs/test/LowMemAsyncTests.cs b/cs/test/LowMemAsyncTests.cs new file mode 100644 index 000000000..b2a224333 --- /dev/null +++ b/cs/test/LowMemAsyncTests.cs @@ -0,0 +1,155 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using System.IO; +using NUnit.Framework; +using FASTER.test.recovery.sumstore; +using System.Threading.Tasks; +using System.Threading; + +namespace FASTER.test.async +{ + + [TestFixture] + public class LowMemAsyncTests + { + IDevice log; + FasterKV fht1; + const int numOps = 5000; + string path; + + [SetUp] + public void Setup() + { + path = TestContext.CurrentContext.TestDirectory + "/SimpleAsyncTests/"; + log = new LocalMemoryDevice(1L << 30, 1L << 25, 1, latencyMs: 20); + // log = Devices.CreateLogDevice(path + "Async.log", deleteOnClose: true); + Directory.CreateDirectory(path); + fht1 = new FasterKV + (1L << 10, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 12 }, + checkpointSettings: new CheckpointSettings { CheckpointDir = path } + ); + } + + [TearDown] + public void TearDown() + { + fht1.Dispose(); + log.Dispose(); + new DirectoryInfo(path).Delete(true); + } + + [Test] + [Category("FasterKV")] + [Ignore("Concurrency after await being addressed")] + public async Task ConcurrentUpsertReadAsyncTest() + { + await Task.Yield(); + using var s1 = fht1.NewSession(new SimpleFunctions((a, b) => a + b)); + + // First Upsert all keys + var tasks = new ValueTask.UpsertAsyncResult>[numOps]; + for (long key = 0; key < numOps; key++) + { + tasks[key] = s1.UpsertAsync(ref key, ref key); + } + + bool done = false; + while (!done) + { + done = true; + for (long key = 0; key < numOps; key++) + { + var result = await tasks[key].ConfigureAwait(false); + if (result.Status == Status.PENDING) + { + done = false; + tasks[key] = result.CompleteAsync(); + + } + } + } + + // Then Read all keys + var readtasks = new ValueTask.ReadAsyncResult>[numOps]; + for (long key = 0; key < numOps; key++) + { + readtasks[key] = s1.ReadAsync(ref key, ref key); + } + + for (long key = 0; key < numOps; key++) + { + var result = (await readtasks[key].ConfigureAwait(false)).Complete(); + Assert.IsTrue(result.status == Status.OK && result.output == key); + } + } + + [Test] + [Category("FasterKV")] + [Ignore("Concurrency after await being addressed")] + public async Task ConcurrentUpsertRMWReadAsyncTest() + { + await Task.Yield(); + using var s1 = fht1.NewSession(new SimpleFunctions((a, b) => a + b)); + + // First upsert all keys + var tasks = new ValueTask.UpsertAsyncResult>[numOps]; + for (long key = 0; key < numOps; key++) + { + tasks[key] = s1.UpsertAsync(ref key, ref key); + } + + bool done = false; + while (!done) + { + done = true; + for (long key = 0; key < numOps; key++) + { + var result = await tasks[key].ConfigureAwait(false); + if (result.Status == Status.PENDING) + { + done = false; + tasks[key] = result.CompleteAsync(); + } + } + } + + // Then RMW all keys + var rmwtasks = new ValueTask.RmwAsyncResult>[numOps]; + for (long key = 0; key < numOps; key++) + { + rmwtasks[key] = s1.RMWAsync(ref key, ref key); + } + + done = false; + while (!done) + { + done = true; + for (long key = 0; key < numOps; key++) + { + var result = await rmwtasks[key].ConfigureAwait(false); + if (result.Status == Status.PENDING) + { + done = false; + rmwtasks[key] = result.CompleteAsync(); + } + } + } + + // Then Read all keys + var readtasks = new ValueTask.ReadAsyncResult>[numOps]; + for (long key = 0; key < numOps; key++) + { + readtasks[key] = s1.ReadAsync(ref key, ref key); + } + + for (long key = 0; key < numOps; key++) + { + var result = (await readtasks[key].ConfigureAwait(false)).Complete(); + Assert.IsTrue(result.status == Status.OK && result.output == key + key); + } + } + } +}