Exposing FASTER log as a first-class abstraction #177
Conversation
Note that this log can live on top of any device that implements the IDevice interface.
Checked in log commit and recovery support. By default, we flush the log at page boundaries. However, the user can manually flush more frequently, e.g., every 10 ms. The playground sample has been updated to show this facility as well. You can kill and restart the sample as many times as you want; it will auto-resume.
As for end-to-end performance, for 100-byte payloads we get around 1 GB/sec to a modern NVMe SSD with a single thread, with log tail increment becoming the bottleneck as we increase the number of append threads. For 1000-byte payloads, we get close to the SSD bandwidth limit (1.8 - 2 GB/sec) using one thread.
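For illustration, here is a minimal sketch of the manual-flush facility described above, committing on a fixed interval rather than only at page boundaries. Method names follow the interface summary later in this thread, and the 10 ms period is just an example:

using System;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

static async Task PeriodicCommitter(FasterLog log, CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        // Flush and commit everything appended so far, then wait before the next commit.
        await log.CommitAsync();
        await Task.Delay(TimeSpan.FromMilliseconds(10), token);
    }
}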
BlockAllocate(4 + length, out long logicalAddress);
var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = &entry.GetPinnableReference())
Instead of pinning the entry, you can use the CopyTo API (which is also apparently faster in many cases than other memory copy APIs):
entry.CopyTo(new Span<byte>((void*)(4 + physicalAddress), length));
I actually benchmarked the two alternatives and went with pinning as it was non-trivially faster.
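For context, the two alternatives being compared in the unsafe append path above look roughly like this; this is a sketch only, Buffer.MemoryCopy as the pinned-copy primitive is an assumption, and entry, physicalAddress, and length come from the snippet above:

// Alternative 1: pin the source span and do a raw memory copy past the 4-byte length header.
fixed (byte* bp = &entry.GetPinnableReference())
    Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);

// Alternative 2: wrap the destination in a Span<byte> and let CopyTo do the copy.
entry.CopyTo(new Span<byte>((void*)(4 + physicalAddress), length));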
In Iterator.GetNext, there is a code path that allocates a byte[] and copies the read data to it.
/// <returns>Device instance</returns>
public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED)
public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false)
Anticipating the meta-device package, this static factory will not be sufficient to encapsulate the variety of devices created. Certainly not every device will care about all of these flags.
Maybe instead of adding yet another boolean flag, we should start moving away from this pattern?
That makes sense, but the immediate goal was to unblock current users, because the recovery path was causing slowdowns and incorrectness in certain cases. Making "no recovery" the default means current users get that behavior without any change.
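For illustration, with "no recovery" as the default, existing callers are unaffected, and a caller who does want device recovery opts in explicitly. This sketch assumes the factory is exposed on a Devices static class, and the path is illustrative:

using FASTER.core;

// Default: no recovery scan when the device is created (new behavior).
IDevice freshDevice = Devices.CreateLogDevice("d:\\FasterLog\\hlog.log");

// Opt in to recovering existing segment metadata for the device.
IDevice recoveredDevice = Devices.CreateLogDevice("d:\\FasterLog\\hlog.log", recoverDevice: true);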
if (segmentId != prevSegmentId + 1)
{
    startSegment = segmentId;
}
else
{
Do we need to truncate the segments that are not tracked by this device?
You mean delete existing files when the device is created? That would depend upon whether the device is being used as new or for recovery.
@@ -48,14 +51,19 @@ private void RecoverFiles()
string bareName = fi.Name;
int prevSegmentId = -1;
List<int> segids = new List<int>();
Maybe I am missing something, but it's unclear to me why the list is required. Is it just for readability?
The list of files comes back in alphabetical order by default, not numerical order. That caused a bug until the segment ids were sorted.
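In other words, an alphabetical directory listing puts segment 10 before segment 2, so the parsed segment ids have to be collected and sorted numerically. A sketch of the idea follows; the file naming and variable names are illustrative and not the actual RecoverFiles code:

using System.Collections.Generic;
using System.IO;

string logDir = "d:\\FasterLog";   // illustrative directory
// "hlog.log.10" sorts before "hlog.log.2" alphabetically; a numeric sort restores segment order.
var segids = new List<int>();
foreach (FileInfo fi in new DirectoryInfo(logDir).GetFiles())
    segids.Add(int.Parse(fi.Name.Substring(fi.Name.LastIndexOf('.') + 1)));
segids.Sort();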
/// </summary>
/// <param name="address">Address committed until (for information only, not necessary to persist)</param>
/// <param name="commitMetadata">Commit metadata</param>
void Commit(long address, byte[] commitMetadata);
I am wondering if byte[] is too low level of an abstraction. Perhaps an interface with a custom serialization/deserialization method is a better choice?
I thought of that and even tried to prototype it, but it was not as convenient in the end because the user had to implement too many callbacks. This can be revisited, though.
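For concreteness, the interface-based alternative being discussed might look something like the sketch below. This is hypothetical and not part of this PR; the members the user would have to implement are the convenience cost mentioned above:

// Hypothetical alternative to the byte[] commitMetadata parameter.
public interface ICommitMetadata
{
    // Serialize this metadata for durable storage at commit time.
    byte[] Serialize();

    // Reconstruct the metadata from previously persisted bytes during recovery.
    void Deserialize(byte[] bytes);
}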
/// </summary>
/// <param name="entries"></param>
/// <returns>Logical address of last added entry</returns>
public unsafe long Append(List<byte[]> entries)
+1 to having a batch append API. However, this API forces too many allocations: one for the list, and one array allocation for each entry in this case:
If the serialized byte array is generated by a serializer that pools its buffers (like Microsoft Bond), most likely it'll return the result as an ArraySegment / Span / IMemoryOwner.
How does something like this look:
(note that Span cannot be used here in place of ArraySegment, details here: https://adamsitnik.com/Span/#span-must-not-be-a-generic-type-argument)
(this can be further optimized to ask the caller for the totalLength as a function parameter and do only one BlockAllocate outside the for loop, not sure if that's worth it)
public unsafe bool TryAppend<TState>(int numEntries, Func<int, TState, ArraySegment<byte>> getBytes, TState state, out long logicalAddress)
{
    epoch.Resume();
    logicalAddress = 0;
    long tail = -allocator.GetTailAddress();
    allocator.CheckForAllocateComplete(ref tail);
    if (tail < 0)
    {
        epoch.Suspend();
        return false;
    }
    for (int i = 0; i < numEntries; i++)
    {
        Span<byte> entry = getBytes(i, state);
        var length = entry.Length;
        BlockAllocate(4 + length, out logicalAddress);
        var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
        *(int*)physicalAddress = length;
        entry.CopyTo(new Span<byte>((void*)(4 + physicalAddress), length));
    }
    epoch.Suspend();
    return true;
}
As an aside: does the batch partially show up to the iterator while it's still being written, or does the reader only see the first event from the batch after epoch.Suspend()?
@hiteshmadan
I don't understand everything that's going on in this PR, but if you want to use a span instead of an array segment you just need to declare a custom delegate like
public delegate ReadOnlySpan<byte> ByteGetter<TState>(int index, TState state);
or something like it. That sidesteps the generic parameter constraint.
After all the optimizations and inlining, there isn't much benefit to a batched interface in terms of performance, because epoch acquire and release - which were the only calls getting amortized - have been made very inexpensive. Also, we can't allocate for the batch in bulk because the page sizes are pretty small. Maybe we should stick to the unbatched interface?
return false;
}
var length = entry.Length;
BlockAllocate(4 + length, out logicalAddress);
If there is already pending data waiting to be flushed, the incoming entry is too big to fit in the space remaining on the current page, and no more pages can be allocated, then BlockAllocate will block the thread waiting for the allocation to go through.
Is there any way for the early-exit check to account for entry.Length too?
We only made the common path non-blocking for now. Do we need it to always be non-blocking? The system atomically allocates space even when it's not yet ready to use, then waits for it to become usable. If we want it to be fully non-blocking, we will need to surface this in the API, i.e., TryAppend would return an incomplete address that the user would need to complete by calling TryCompleteAppend. Is this what you are looking for?
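For reference, with the current design the caller handles the not-ready case by retrying. A sketch of that pattern follows, shown with the TryEnqueue name from the interface summary later in this thread (at this point in the discussion it was still called TryAppend):

using System.Threading;
using FASTER.core;

static long AppendNonBlocking(FasterLog log, byte[] entry)
{
    // TryEnqueue returns false instead of blocking when no space is currently available.
    long logicalAddress;
    while (!log.TryEnqueue(entry, out logicalAddress))
        Thread.Yield();   // yield instead of blocking the thread
    return logicalAddress;
}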
Correct, when reads and writes share the buffer, we have to copy out because we can only access memory safely under epoch protection. Agreed that a memory pool is a possibility. If we had that, a cleaner design might be to always copy out to the user-provided buffer regardless of where we are reading from. This would address your concern in point 2 as well (currently, you must give up the span as soon as you invoke another GetNext).
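On the memory-pooling point: the scan interface summarized later in this thread does add a pooled variant of GetNext. A usage sketch follows, assuming MemoryPool<byte>.Shared as the pool and a user-supplied process callback:

using System;
using System.Buffers;
using FASTER.core;

static void ScanWithPool(FasterLog log, Action<ReadOnlyMemory<byte>> process)
{
    using (FasterLogScanIterator iter = log.Scan(log.BeginAddress, long.MaxValue))
    {
        // Entries are rented from the pool; disposing the owner returns the buffer.
        while (iter.GetNext(MemoryPool<byte>.Shared, out IMemoryOwner<byte> entry, out int entryLength))
        {
            using (entry)
                process(entry.Memory.Slice(0, entryLength));
        }
    }
}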
FYI, there is a check-in coming to reduce the number of pages needed in memory. This required a couple of internal changes for correctness.
Adding support for low memory footprint (4 pages)
Added support for odd-sized payloads in presence of holes in log
Fixed concurrency issue that occurs with low num of pages
Improved max throughput by eliminating a 10ms sleep in BlockAllocate
Misc cleanup of logic to track flush and close addresses in log
Adding truly non-blocking TryAppend functionality. See sample for how this is used.
We now have both low-memory support (as low as 4 pages) and truly non-blocking TryAppend. We also have an async API variant for append (AppendAsync) that returns only when the append's flush is done, but this is a rough API that needs further feedback/review. There are also significant performance implications of using this interface. Find the prototype here: #180
* Added support for TryAppend. Removed List-based batch support.
* Added non-blocking TryAppend
* Added span variant
* Fix definition of SecondChanceFraction for read cache, to be 1 - MutableFraction of the log.
* Added async FlushAndCommit
* Added batched version by separating out in-memory append and wait for commit - gives better perf as the first operation is usually sync
* Tweak async sample to get back to 2GB/sec
* Other updates: 1) Allocations can handle thousands of parallel tasks 2) Removed concept of negative address - allocations are always over available pages 3) Improved scan interface to allow user memory pooling 4) Exposed commit task 5) Cleaned up sample
* Added check for entry fitting on single page
* Added batch interface (sync and async) to log append.
This PR is almost ready to merge, and definitely ready to try out. Features include: (1) support for blocking Append, TryAppend, and AppendAsync. See https://github.com/microsoft/FASTER/blob/master/docs/cs/FasterLog.md for a first draft of the usage guide, and the comprehensive sample with comments at https://github.com/microsoft/FASTER/blob/fasterlog/cs/playground/FasterLogSample/Program.cs
Pending: we need to refine/enhance the ways that a pooled Span can be provided to the iterator; currently we have defined a custom delegate for this.
…emory to FasterLogSettings instead of Scan. Speed up TruncateUntil. Updated nuspec.
…not change. Added CommittedBeginAddress metric.
Summary of Interface to FasterLog
// Enqueue log entry (to memory) with spin-wait
long Enqueue(byte[] entry)
long Enqueue(ReadOnlySpan<byte> entry)
long Enqueue(IReadOnlySpanBatch readOnlySpanBatch)
// Try to enqueue log entry (to memory)
bool TryEnqueue(byte[] entry, out long logicalAddress)
bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
bool TryEnqueue(IReadOnlySpanBatch readOnlySpanBatch, out long logicalAddress)
// Async enqueue log entry (to memory)
async ValueTask<long> EnqueueAsync(byte[] entry)
async ValueTask<long> EnqueueAsync(ReadOnlyMemory<byte> entry)
async ValueTask<long> EnqueueAsync(IReadOnlySpanBatch readOnlySpanBatch)
// Wait for commit
void WaitForCommit(long untilAddress = 0) // spin-wait
async ValueTask WaitForCommitAsync(long untilAddress = 0)
// Commit
void Commit(bool spinWait = false)
async ValueTask CommitAsync()
// Helper: enqueue log entry and spin-wait for commit
long EnqueueAndWaitForCommit(byte[] entry)
long EnqueueAndWaitForCommit(ReadOnlySpan<byte> entry)
long EnqueueAndWaitForCommit(IReadOnlySpanBatch readOnlySpanBatch)
// Helper: enqueue log entry and async wait for commit
async ValueTask<long> EnqueueAndWaitForCommitAsync(byte[] entry)
async ValueTask<long> EnqueueAndWaitForCommitAsync(ReadOnlyMemory<byte> entry)
async ValueTask<long> EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch readOnlySpanBatch)
// Truncate log (from head)
void TruncateUntil(long untilAddress)
// Scan interface
FasterLogScanIterator Scan(long beginAddress, long endAddress)
// FasterLogScanIterator interface
bool GetNext(out byte[] entry, out int entryLength)
bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry, out int entryLength)
async ValueTask WaitAsync()
// Random read
async ValueTask<(byte[], int)> ReadAsync(long address, int estimatedLength = 0)
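A short end-to-end sketch using this interface follows; the device path and the use of TailAddress as the scan end point are illustrative, and the linked sample shows the full version:

using System;
using System.Text;
using FASTER.core;

var device = Devices.CreateLogDevice("hlog.log");   // illustrative path
var log = new FasterLog(new FasterLogSettings { LogDevice = device });

// Enqueue to memory, then commit to make the entries durable.
log.Enqueue(Encoding.UTF8.GetBytes("entry1"));
log.Enqueue(Encoding.UTF8.GetBytes("entry2"));
log.Commit(spinWait: true);

// Pull-based scan over the committed portion of the log.
using (var iter = log.Scan(log.BeginAddress, log.TailAddress))
{
    while (iter.GetNext(out byte[] entry, out int entryLength))
        Console.WriteLine(Encoding.UTF8.GetString(entry, 0, entryLength));
}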
…sk to completed state.
Added checksum support for log verification during scan/read. Enable by setting FasterLogSettings.LogChecksum.
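For illustration, enabling this might look like the following; the enum value name is an assumption based on the setting described above, so check the settings class for the exact member:

// Per-entry checksum verification during scan/read (enum value name assumed).
var settings = new FasterLogSettings
{
    LogDevice = device,
    LogChecksum = LogChecksumType.PerEntry   // assumed value name; default presumably disables checksums
};
var log = new FasterLog(settings);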
Update: we are in the final stages of this PR. Working on correctly handling and surfacing exceptional cases, such as transient and permanent storage failures. This is WIP in a branch (fasterlog-exceptions). ETA within a couple of days.
Edit: exception support is taking a bit longer due to subtle corner cases involving parallel flush and error conditions. Will update here when merged.
…ved spin-wait for adjacent flush completion.
* Added storage exception handling, connecting to tasks.
* Cleanup of error handling, control when exception is bubbled up to user.
* Added yield in NeedToWait
* Improved iterator support in case of exception
IAsyncEnumerable support has now been added for the iterator.
In addition to the previously supported TruncateUntil on the log, we now support persistent iterators. You can create any number of iterators over the log, and "name" them if you need them to be part of commits. During recovery, if you create an iterator with the same name, we will resume iteration from the last committed iterator location. Example:

using (iter = log.Scan(log.BeginAddress, long.MaxValue, name: "foo"))
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
{
    ...
}
Detailed documentation is now available at:
Merging to master and closing the PR, as the functionality is complete at this point. We can continue discussions here, and create new PRs for further enhancements as well.
We are exposing our high-speed latch-free log (or write-ahead log) facility as a generally usable first-class citizen, called FasterLog. You can append variable-sized chunks (as byte[] or Span<byte>) to the log. There is no additional header in the log. Users can perform (potentially tailing) pull-based iteration over any range of the log (including until a future log address). Users can truncate from the head of the log. They can also flush the log at very fine granularity, lower than the usual per-page flushing. All aspects of the log (page size, number of pages in memory, segment size on disk, etc.) are tunable. The log works with async code as well, and we will add async versions of the API going forward.

Note that there is no random lookup index on top of this log - if you want such an indexed log, use FasterKv as usual.

Improvements to Epochs
As part of this PR, we have also significantly optimized the epoch framework, removing important performance bottlenecks that now make it feasible to acquire and release epochs at a very fine granularity. We can perform 35 million acquire/release operation pairs per second per thread, with linear scalability. If users are able to pre-acquire a thread for the operations, we are able to achieve 100 million acquire/release operation pairs per second per thread. This improvement now makes it possible to make FASTER work at high performance in task-based async environments.
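For context, the fine-grained protection pattern being measured here is the Resume/Suspend pairing visible in the append code earlier in this thread. Schematically (epoch is FASTER's internal epoch instance, so this is illustrative rather than public API):

// Schematic of per-operation epoch protection (internal mechanism, shown for illustration only).
epoch.Resume();      // acquire: this thread enters a protected epoch
try
{
    // ... perform the log append or read while memory is protected from reclamation ...
}
finally
{
    epoch.Suspend(); // release: leave the protected epoch so reclamation can advance
}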