[C#] Trying out Faster as a highly concurrent object cache #403
Answering your questions in order:

1. No, this is not needed if you are not concerned with persistence.
2. Generally, yes. Our default `SimpleFunctions` handle the common cases.
3. No.
4. Yes, it should be thread-safe.
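For reference, a minimal sketch of point 2, using the default `SimpleFunctions` when you only need whole-object reads and writes; the store settings and file name here are illustrative, not from this thread:

```csharp
using FASTER.core;

// SimpleFunctions<Key, Value> supplies pass-through implementations of the
// IFunctions members (ConcurrentReader, ConcurrentWriter, etc.), so no custom
// callbacks are needed for whole-object operations.
var store = new FasterKV<long, long>(
    size: 1L << 20,
    logSettings: new LogSettings { LogDevice = Devices.CreateLogDevice("hlog.log") });

using var session = store.For(new SimpleFunctions<long, long>())
                         .NewSession<SimpleFunctions<long, long>>();

session.Upsert(1L, 42L);
(Status status, long value) = session.Read(1L);
```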
Also, I see you are trying to pool sessions, which is reasonable if you don't want to associate sessions with caller instances/threads a priori. It might be worth comparing performance with a ConcurrentQueue as an alternative data structure for this. There might be contention on a single queue with a large number of threads, so you could partition the queue by creating an array of, say, 8 queues, and choosing a random queue to dequeue from (enqueuing back into the same queue). The other option is to keep the session as a [ThreadStatic] (thread-local variable), while also keeping a global list of sessions for disposal at the end. That way you only allocate a session when the thread-local value is null, which avoids touching a concurrent data structure on the critical path.
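A minimal sketch of the partitioned-pool idea; the type and member names are illustrative rather than FASTER API, and `Random.Shared` assumes .NET 6+:

```csharp
using System;
using System.Collections.Concurrent;

// A pool of sessions split across several queues to reduce contention on one queue.
public class PartitionedSessionPool<TSession> where TSession : class
{
    private const int NumPartitions = 8;
    private readonly ConcurrentQueue<TSession>[] _queues = new ConcurrentQueue<TSession>[NumPartitions];
    private readonly Func<TSession> _createSession;

    public PartitionedSessionPool(Func<TSession> createSession)
    {
        _createSession = createSession;
        for (int i = 0; i < NumPartitions; i++)
        {
            _queues[i] = new ConcurrentQueue<TSession>();
        }
    }

    // Dequeue from a randomly chosen partition, creating a session if that queue is empty.
    public (TSession Session, int Partition) Rent()
    {
        int partition = Random.Shared.Next(NumPartitions);
        return _queues[partition].TryDequeue(out TSession? session)
            ? (session, partition)
            : (_createSession(), partition);
    }

    // Enqueue back into the same partition the session was taken from.
    public void Return(TSession session, int partition) => _queues[partition].Enqueue(session);
}
```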
That is indeed an accurate summary!
Thank you for the detailed response! The session-per-thread suggestion makes a lot of sense; I've incorporated it and further generalized my wrapper. Unfortunately, I've run into an intermittent `IndexOutOfRangeException`. For quick reference, the wrapper:

```csharp
using FASTER.core;
using MessagePack;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
namespace Repro.Faster.IndexOutOfRangeException
{
public class MixedStorageDictionary<TKey, TValue> : IMixedStorageDictionary<TKey, TValue>
{
// Serialization
private static readonly MessagePackSerializerOptions _messagePackOptions = MessagePackSerializerOptions.
Standard.
WithCompression(MessagePackCompression.Lz4BlockArray);
private static readonly SimpleFunctions<TKey, TValue> _simpleFunctions = new();
// Sessions (1 per thread)
[ThreadStatic]
private static ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>>? _session;
// Disposal
private static readonly ConcurrentBag<ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>>> _sessions = new();
private readonly FasterKV<TKey, TValue> _store;
private bool _disposed;
public MixedStorageDictionary(string? logFileDirectory = null,
string? logFileName = null,
int pageSizeBits = 12, // 4 KB pages
int inMemorySpaceBits = 13, // 2 pages
long indexNumBuckets = 1L << 20) // 64 MB index
{
var serializerSettings = new SerializerSettings<TKey, TValue>
{
valueSerializer = () => new Serializer()
};
logFileDirectory ??= Path.Combine(Path.GetTempPath(), "FasterLogs");
logFileName ??= Guid.NewGuid().ToString();
IDevice log = Devices.CreateLogDevice(Path.Combine(logFileDirectory, $"{logFileName}.log"));
IDevice objlog = Devices.CreateLogDevice(Path.Combine(logFileDirectory, $"{logFileName}.obj.log"));
var logSettings = new LogSettings
{
LogDevice = log,
ObjectLogDevice = objlog,
PageSizeBits = pageSizeBits,
MemorySizeBits = inMemorySpaceBits
};
_store = new(size: indexNumBuckets,
logSettings: logSettings,
serializerSettings: serializerSettings);
}
public void Upsert(TKey key, TValue obj)
{
(_session ?? CreateSession()).Upsert(key, obj);
}
public async ValueTask<(Status, TValue)> ReadAsync(TKey key)
{
return (await (_session ?? CreateSession()).ReadAsync(key).ConfigureAwait(false)).Complete();
}
private ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>> CreateSession()
{
_session = _store.For(_simpleFunctions).NewSession<SimpleFunctions<TKey, TValue>>();
_sessions.Add(_session);
return _session;
}
private class Serializer : BinaryObjectSerializer<TValue>
{
public override void Deserialize(out TValue obj)
{
obj = MessagePackSerializer.Deserialize<TValue>(reader.BaseStream, _messagePackOptions);
}
public override void Serialize(ref TValue obj)
{
MessagePackSerializer.Serialize(writer.BaseStream, obj, _messagePackOptions);
}
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_store.Dispose();
foreach (ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>> session in _sessions)
{
session.Dispose();
}
}
_disposed = true;
}
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
```

Repro program:

```csharp
public static async Task Main()
{
var mixedStorageDictionary = new MixedStorageDictionary<int, string>();
int numRecords = 1000;
string dummyRecordValue = "dummyString";
// Concurrent upserts
var upsertTasks = new Task[numRecords];
for (int i = 0; i < numRecords; i++)
{
int key = i;
upsertTasks[i] = Task.Run(() => mixedStorageDictionary.Upsert(key, dummyRecordValue));
}
await Task.WhenAll(upsertTasks).ConfigureAwait(false);
// Concurrent reads
var readTasks = new Task<(Status, string)>[numRecords];
for (int i = 0; i < numRecords; i++)
{
readTasks[i] = mixedStorageDictionary.ReadAsync(i).AsTask();
}
await Task.WhenAll(readTasks).ConfigureAwait(false);
// Assert
for (int i = 0; i < numRecords; i++)
{
(Status status, string result) = readTasks[i].Result;
Assert.Equal(Status.OK, status);
Assert.Equal(dummyRecordValue, result);
}
Console.WriteLine("Success!");
}
```

Stack trace: …

The exception occurs at random. Increasing …

Edit: The earlier, naïve "…"
At a quick glance, I think the thread-local idea doesn't work because the continuation of the async operation will continue to use the same session (on the continuation thread). At the same time, a different task assigned to the original thread will start using the session from the thread-local. This breaks the invariant that a session is used at most once at the same time by some thread ("mono-threadedness").
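To spell out the interleaving described above, a timeline rather than runnable code:

```csharp
// Thread A: reads the [ThreadStatic] _session, calls session.ReadAsync(...);
//           the read misses memory, goes async, and the await yields thread A.
// Thread A: the scheduler runs a different task on thread A, which sees the
//           same thread-local _session and issues another operation on it.
// Thread B: meanwhile, the first read's continuation resumes on thread B and
//           keeps using that same session.
// Result:   two in-flight operations share one session, violating the
//           "one session used at most once at a time" invariant.
```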
Do you expect most reads to happen synchronously, or will they go to disk most of the time? Nice, clean sample, by the way; it would be nice to eventually convert it into an official sample showing the best way to manage sessions internally to a non-session-based KV API.
Here is a version that combines the thread-local idea with a shared pool used only when ops go async (I found a queue to be slightly faster than a bag in my experiments, for the shared pool):

```csharp
public class MixedStorageDictionary<TKey, TValue> : IMixedStorageDictionary<TKey, TValue>
{
// Serialization
private static readonly MessagePackSerializerOptions _messagePackOptions = MessagePackSerializerOptions.
Standard.
WithCompression(MessagePackCompression.Lz4BlockArray);
private static readonly SimpleFunctions<TKey, TValue> _simpleFunctions = new();
// Sessions (1 per thread)
[ThreadStatic]
private static ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>>? _session;
// Shared pool
private static readonly ConcurrentQueue<ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>>> _sessions = new();
// Track all created sessions for dispose
private static readonly ConcurrentBag<ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>>> _allSessions = new();
private readonly FasterKV<TKey, TValue> _store;
private bool _disposed;
public MixedStorageDictionary(string? logFileDirectory = null,
string? logFileName = null,
int pageSizeBits = 12, // 4 KB pages
int inMemorySpaceBits = 13, // 2 pages
long indexNumBuckets = 1L << 20) // 64 MB index
{
var serializerSettings = new SerializerSettings<TKey, TValue>
{
valueSerializer = () => new Serializer()
};
logFileDirectory ??= Path.Combine(Path.GetTempPath(), "FasterLogs");
logFileName ??= Guid.NewGuid().ToString();
IDevice log = Devices.CreateLogDevice(Path.Combine(logFileDirectory, $"{logFileName}.log"));
IDevice objlog = Devices.CreateLogDevice(Path.Combine(logFileDirectory, $"{logFileName}.obj.log"));
var logSettings = new LogSettings
{
LogDevice = log,
ObjectLogDevice = objlog,
PageSizeBits = pageSizeBits,
MemorySizeBits = inMemorySpaceBits
};
_store = new(size: indexNumBuckets,
logSettings: logSettings,
serializerSettings: serializerSettings);
}
public void Upsert(TKey key, TValue obj)
{
(_session ?? GetSession()).Upsert(key, obj);
}
public async ValueTask<(Status, TValue)> ReadAsync(TKey key)
{
var session = _session ?? GetSession();
var t = session.ReadAsync(key);
// Retain thread-local in sync path
if (t.IsCompleted)
return t.Result.Complete();
// Going async - remove session from thread-local
_session = null;
var result = (await t.ConfigureAwait(false)).Complete();
// Return session to shared pool on async thread
_sessions.Enqueue(session);
return result;
}
private ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>> GetSession()
{
if (_sessions.TryDequeue(out ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>>? result))
{
_session = result;
return result;
}
var session = _store.For(_simpleFunctions).NewSession<SimpleFunctions<TKey, TValue>>();
_allSessions.Add(session);
_session = session;
return session;
}
private class Serializer : BinaryObjectSerializer<TValue>
{
public override void Deserialize(out TValue obj)
{
obj = MessagePackSerializer.Deserialize<TValue>(reader.BaseStream, _messagePackOptions);
}
public override void Serialize(ref TValue obj)
{
MessagePackSerializer.Serialize(writer.BaseStream, obj, _messagePackOptions);
}
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_store.Dispose();
foreach (ClientSession<TKey, TValue, TValue, TValue, Empty, SimpleFunctions<TKey, TValue>> session in _allSessions)
{
session.Dispose();
}
}
_disposed = true;
}
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
```
Note: the above will only help if reads tend to return synchronously (data is in memory) most of the time. Upserts benefit as well, since they return synchronously. Otherwise, your earlier solution is fine, and even preferable for its cleanliness.
The other thing with your example is that read concurrency is limited, because you are not issuing requests in parallel (only the disk I/O completions get parallelized). You can get better throughput and disk utilization by parallelizing the issuing of requests, as the following example shows (vary `issueParallel` to increase read-issue parallelism):

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

public class Program
{
public static void Main()
{
var mixedStorageDictionary = new MixedStorageDictionary<int, int>(indexNumBuckets: 1L << 22);
int numRecords = 20_000_000;
for (int i = 0; i < numRecords; i++)
mixedStorageDictionary.Upsert(i, 45);
Console.WriteLine("upsert done");
int issueParallel = 4; // vary this to increase read issue parallelism
Task[] tasks = new Task[issueParallel];
Random[] r = new Random[issueParallel];
for (int i = 0; i < issueParallel; i++)
r[i] = new Random(i);
int cnt = 0;
Stopwatch sw = new Stopwatch();
sw.Start();
while (true)
{
for (int i = 0; i < issueParallel; i++)
tasks[i] = ReadIssuer(r[i]);
Task.WaitAll(tasks);
}
async Task ReadIssuer(Random random) // parameter renamed: a local function parameter can't shadow the enclosing local r
{
await Task.Yield();
int batchSize = 1000;
var readTasks = new ValueTask<(Status, int)>[batchSize];
for (int i = 0; i < batchSize; i++)
{
readTasks[i] = mixedStorageDictionary.ReadAsync(random.Next(numRecords));
}
for (int i = 0; i < batchSize; i++)
await readTasks[i].ConfigureAwait(false);
if (Interlocked.Increment(ref cnt) % 1000 == 0)
{
sw.Stop();
Console.WriteLine($"Time for {1000 * batchSize} ops = {sw.ElapsedMilliseconds}ms");
sw.Restart();
}
}
}
}
```
Diagnosis makes sense. I've gone with your suggested solution of a fast path for the synchronous case and a shared pool for operations that go async.
Thanks. I'm finding Faster really useful: it makes it possible to use SSDs as "extended RAM", which allows for fewer round trips to the DB, etc. I suspect simplified APIs could widen the user base significantly. I've cleaned up the repro with your suggestions and a couple of syntactic changes for consistency with my other code; I'll paste the full thing at the end of this thread for reference.
Yeah, you're right, the logic up until waiting for I/O is sequential in the example program. All that said, I've run into an issue with log compaction. I've updated the repro to reproduce it. Some snippets for quick reference:

```csharp
public class MixedStorageDictionary<TKey, TValue> : IMixedStorageDictionary<TKey, TValue>
{
...
// Log compaction
private readonly LogAccessor<TKey, TValue> _logAccessor;
private readonly int _operationsBetweenLogCompactions;
private int _operationsSinceLastLogCompaction = 0; // We're using interlocked so this doesn't need to be volatile
...
public MixedStorageDictionary(string? logFileDirectory = null,
string? logFileName = null,
int pageSizeBits = 12, // 4 KB pages
int inMemorySpaceBits = 13, // 2 pages
long indexNumBuckets = 1L << 20, // 64 MB index
int operationsBetweenLogCompactions = 1000) // Number of upserts and deletes between compaction attempts
{
...
_operationsBetweenLogCompactions = operationsBetweenLogCompactions;
}
public void Upsert(TKey key, TValue obj)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(MixedStorageDictionary<TKey, TValue>));
}
GetSession().Upsert(key, obj);
CompactLogIfRequired();
}
public Status Delete(TKey key)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(MixedStorageDictionary<TKey, TValue>));
}
Status result = GetSession().Delete(key);
CompactLogIfRequired();
return result;
}
...
private void CompactLogIfRequired()
{
// Faster log addresses:
//
// (oldest entries here) BeginAddress <= HeadAddress (where the in-memory region begins) <= SafeReadOnlyAddress (entries between here and tail updated in-place) < TailAddress (entries added here)
//
// If _operationsSinceLastLogCompaction < _operationsBetweenLogCompactions, it's not yet time to compact.
// If _operationsSinceLastLogCompaction > _operationsBetweenLogCompactions, we're attempting to compact, do nothing.
if (Increment(ref _operationsSinceLastLogCompaction) != _operationsBetweenLogCompactions)
{
return;
}
if (_logAccessor.BeginAddress == _logAccessor.SafeReadOnlyAddress) // All records fit within update-in-place region, nothing to compact
{
Exchange(ref _operationsSinceLastLogCompaction, 0);
return;
}
// TODO: throws
CompactLog();
// Run asynchronously
// Task.Run(CompactLog);
}
private void CompactLog()
{
long compactUntilAddress = (long)(_logAccessor.BeginAddress + 0.2 * (_logAccessor.SafeReadOnlyAddress - _logAccessor.BeginAddress));
Console.WriteLine("compacting");
// TODO throws
_store.For(_simpleFunctions).NewSession<SimpleFunctions<TKey, TValue>>().Compact(compactUntilAddress, shiftBeginAddress: true);
// GetSession().Compact(compactUntilAddress, shiftBeginAddress: true);
Exchange(ref _operationsSinceLastLogCompaction, 0);
}
...
}
```

I've also slightly tweaked …
```csharp
int _compact = 0;
private void CompactLog()
{
if (CompareExchange(ref _compact, 1, 0) == 0)
{
long compactUntilAddress = (long)(_logAccessor.BeginAddress + 0.2 * (_logAccessor.SafeReadOnlyAddress - _logAccessor.BeginAddress));
Console.WriteLine("compacting");
// TODO throws
_store.For(_simpleFunctions).NewSession<SimpleFunctions<TKey, TValue>>().Compact(compactUntilAddress, shiftBeginAddress: true);
// GetSession().Compact(compactUntilAddress, shiftBeginAddress: true);
Exchange(ref _operationsSinceLastLogCompaction, 0);
_compact = 0;
}
}
```
Also, it would be better to have a separate compaction task that wakes periodically, checks whether the log on disk has grown too much (by comparing Log.BeginAddress and Log.SafeReadOnlyAddress), and, if it is larger than a threshold, does the compaction and goes back to sleep; a sketch follows. Checking for this on every Upsert/Delete is rather expensive. The workload should also benefit from log compaction, i.e., during compaction there should be sufficient dead records that compaction is useful. Otherwise, if all records are still live, compaction will simply waste cycles and write the records back to the tail of the log.
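A minimal sketch of such a periodic compaction task, assuming the `_store`, `_simpleFunctions`, and session types from the snippets above; the interval and threshold values are illustrative:

```csharp
private readonly CancellationTokenSource _compactionCts = new();

private async Task PeriodicallyCompactLogAsync()
{
    const long compactionThresholdBytes = 1L << 28; // compact once ~256 MB sits between Begin and SafeReadOnly
    try
    {
        while (!_compactionCts.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(60), _compactionCts.Token).ConfigureAwait(false);
            LogAccessor<TKey, TValue> log = _store.Log;
            long safeLogBytes = log.SafeReadOnlyAddress - log.BeginAddress;
            if (safeLogBytes < compactionThresholdBytes)
            {
                continue; // the log hasn't grown enough for compaction to pay off
            }
            // Compact the oldest fifth of the safe region, as in the snippets above.
            long compactUntilAddress = log.BeginAddress + (long)(0.2 * safeLogBytes);
            _store.For(_simpleFunctions)
                .NewSession<SimpleFunctions<TKey, TValue>>()
                .Compact(compactUntilAddress, shiftBeginAddress: true);
        }
    }
    catch (OperationCanceledException)
    {
        // Dispose cancelled the token; exit quietly.
    }
}
```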
Thanks for the quick fix! I considered a threshold initially: compact when (SafeReadOnly - Begin)/RecordSize > threshold. What bothered me about it was what happens when … I figured updates and deletes (once pushed outside the read-only region) pretty much guarantee gains from compaction, hence the counter approach. The insert case bothered me too: I understand why it's pointless to "compact" a sequence of inserts, but upserts can't be replaced with separate update and insert operations, and most of my inserts occur close to application startup. That said, points taken on the overhead of checking every upsert/delete and on how badly the counter approach does when it's all inserts. I ended up using a periodic task, as you suggested. I've extracted my wrapper to a standalone repo for reuse across projects; will share here after I've neatened it up.
Great. I think some version of a session-free API should make it into this repo as well, as a sample at first but eventually supported natively. Feel free to send a pull request!
Jering.KeyValueStore: a working Faster wrapper that handles the concerns discussed in this thread (session management, serialization, and log compaction). Doesn't support persistence. Usage:

```csharp
var mixedStorageKVStore = new MixedStorageKVStore<int, string>();
// Insert
mixedStorageKVStore.Upsert(0, "dummyString1");
// Read
(Status status, string? result) = await mixedStorageKVStore.ReadAsync(0).ConfigureAwait(false);
Assert.Equal(Status.OK, status);
Assert.Equal("dummyString1", result);
// Update
mixedStorageKVStore.Upsert(0, "dummyString2");
// Verify updated
(status, result) = await mixedStorageKVStore.ReadAsync(0).ConfigureAwait(false);
Assert.Equal(Status.OK, status);
Assert.Equal("dummyString2", result);
// Delete
mixedStorageKVStore.Delete(0);
// Verify deleted
(status, result) = await mixedStorageKVStore.ReadAsync(0).ConfigureAwait(false);
Assert.Equal(Status.NOTFOUND, status);
Assert.Null(result);
```
Great work! As an application developer, this is a great starting point; way easier to use than the core lib.
Hi! I'm trying out Faster as a highly concurrent object cache that spans memory and disk. It's working great!

It feels like there are a lot of knobs to turn and nuances to grasp, though, so I'm worried I might have done things wrong. I'd greatly appreciate comments on my wrapper class below. Some use-case details:

…

Faster wrapper class:

…

Questions:
1. In the async store example, `WaitForCommitAsync` is called after `Upsert` (FASTER/cs/samples/StoreAsyncApi/Program.cs, lines 85 to 86 in b1d7937). Is this necessary if I'm not concerned about persisting data on disk?
2. Am I right to say callback functions are primarily for when you want to read/write just part of an object (partial updates) or RMW? If I'm not doing either, do I need to worry about `IFunction`'s other members, `ConcurrentReader` and `ConcurrentWriter`?
3. In my simple use case, do I need to worry about log compaction and checkpoints?
4. Is it safe to call `session.Compact(compactUntil, shiftBeginAddress: true)` while concurrently reading/writing using other session instances?
Edit: if `x` has versions `v1` and `v2` in the log, compaction drops `v1`. "Truncation" drops older entries outright. This may result in data loss, e.g. if both `v1` and `v2` of `x` exist in the set of entries dropped, the next read for `x` will result in `Status.NotFound`. I've added periodic `session.Compact(compactUntil, shiftBeginAddress: true)` calls to my wrapper.