Fasterlog async (#180)
* Added support for TryAppend. Removed List-based batch support.
* Added non-blocking TryAppend
* Added span variant
* Fixed the definition of SecondChanceFraction for the read cache to be 1 - MutableFraction of the log.
* Added async FlushAndCommit
* Added batched version by separating the in-memory append from the wait for commit - gives better performance, since the first operation usually completes synchronously
* Tweaked the async sample to get back to 2 GB/sec

* Other updates:
1) Allocations can handle thousands of parallel tasks
2) Removed the concept of negative addresses - allocations are always made over available pages
3) Improved scan interface to allow user memory pooling
4) Exposed commit task
5) Cleaned up sample

* Added check that an entry fits on a single page
* Added batch interface (sync and async) to log append (see the usage sketch below)
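
A rough usage sketch of the new append/commit paths, based only on calls that appear in the updated FasterLogSample in this commit (TryAppend, Append, AppendToMemoryAsync, WaitForCommitAsync, FlushAndCommitAsync); the device path, entry size, and batch count are placeholders, not part of the commit.

// Sketch only: API names taken from the sample code below; not the authoritative API surface.
using System;
using System.Threading.Tasks;
using FASTER.core;

class FasterLogUsageSketch
{
    static async Task RunAsync()
    {
        var device = Devices.CreateLogDevice("hlog.log");   // placeholder path
        var log = new FasterLog(new FasterLogSettings { LogDevice = device });
        var entry = new byte[1 << 10];                       // must fit on a single page

        // Non-blocking append: spin or back off while the current page is full
        while (!log.TryAppend(entry, out _)) ;

        // Blocking append (byte[] and Span<byte> variants)
        log.Append(entry);

        // Batched path: append entries to memory, then wait for a commit once per batch
        for (int i = 0; i < 100; i++)
            await log.AppendToMemoryAsync(entry);
        await log.WaitForCommitAsync();

        // Or force a flush and commit explicitly
        await log.FlushAndCommitAsync();
    }
}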
badrishc authored Oct 3, 2019
1 parent 4504937 commit b06d112
Showing 8 changed files with 474 additions and 246 deletions.
2 changes: 1 addition & 1 deletion cs/playground/FasterLogSample/FasterLogSample.csproj
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net46</TargetFramework>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
</PropertyGroup>
170 changes: 136 additions & 34 deletions cs/playground/FasterLogSample/Program.cs
@@ -4,13 +4,17 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

namespace FasterLogSample
{
public class Program
{
const int entryLength = 996;
// Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4)
const int entryLength = 1 << 10;
static readonly byte[] staticEntry = new byte[entryLength];
static readonly SpanBatch spanBatch = new SpanBatch(10);
static FasterLog log;

static void ReportThread()
@@ -26,8 +30,8 @@ static void ReportThread()
var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Throughput: {0} MB/sec",
(nowValue - lastValue) / (1000*(nowTime - lastTime)));
Console.WriteLine("Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastTime = nowTime;
lastValue = nowValue;
}
@@ -37,31 +41,29 @@ static void CommitThread()
{
while (true)
{
Thread.Sleep(100);
Thread.Sleep(5);
log.FlushAndCommit(true);

// Async version
// await Task.Delay(5);
// await log.FlushAndCommitAsync();
}
}

static void AppendThread()
{
byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
entry[i] = (byte)i;

while (true)
{
// Sync append
log.Append(entry);

// We also support a Span-based variant of Append

// We also support TryAppend to allow throttling/back-off
// (expect this to be slightly slower than the sync version)
// Make sure you supply a "starting" logical address of 0
// Retries must send back the current logical address.
//
// long logicalAddress = 0;
// while (!log.TryAppend(entry, ref logicalAddress)) ;
// TryAppend - can be used with throttling/back-off
// Accepts byte[] and Span<byte>
while (!log.TryAppend(staticEntry, out _)) ;

// Synchronous blocking append
// Accepts byte[] and Span<byte>
// log.Append(entry);

// Batched append - batch must fit on one page
// while (!log.TryAppend(spanBatch, out _)) ;
}
}

@@ -73,9 +75,11 @@ static void ScanThread()

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
{
entry[i] = (byte)i;
var entrySpan = new Span<byte>(entry);
}

var entrySpan = new Span<byte>(entry);

long lastAddress = 0;
Span<byte> result;
@@ -84,16 +88,25 @@ static void ScanThread()
while (true)
{
while (!iter.GetNext(out result))
{
Thread.Sleep(1000);
}

if (!result.SequenceEqual(entrySpan))
{
throw new Exception("Invalid entry found at offset " + FindDiff(result, entrySpan));
if (result.Length != entrySpan.Length)
throw new Exception("Invalid entry found, expected length " + entrySpan.Length + ", actual length " + result.Length);
else
throw new Exception("Invalid entry found at offset " + FindDiff(result, entrySpan));
}

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Append(result);
}

if (iter.CurrentAddress - lastAddress > 500000000)
if (iter.CurrentAddress - lastAddress > 500_000_000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
@@ -102,9 +115,9 @@
}
}

static int FindDiff(Span<byte> b1, Span<byte> b2)
private static int FindDiff(Span<byte> b1, Span<byte> b2)
{
for (int i=0; i<b1.Length; i++)
for (int i = 0; i < b1.Length; i++)
{
if (b1[i] != b2[i])
{
@@ -114,21 +127,110 @@ static int FindDiff(Span<byte> b1, Span<byte> b2)
return 0;
}

/// <summary>
/// Main program entry point
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
bool sync = true;
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device });

new Thread(new ThreadStart(AppendThread)).Start();

// Can have multiple append threads if needed
// new Thread(new ThreadStart(AppendThread)).Start();

new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();
// Populate entry being inserted
for (int i = 0; i < entryLength; i++)
{
staticEntry[i] = (byte)i;
}

if (sync)
{
// Append thread: create as many as needed
new Thread(new ThreadStart(AppendThread)).Start();

// Threads for scan, reporting, commit
var t1 = new Thread(new ThreadStart(ScanThread));
var t2 = new Thread(new ThreadStart(ReportThread));
var t3 = new Thread(new ThreadStart(CommitThread));
t1.Start(); t2.Start(); t3.Start();
t1.Join(); t2.Join(); t3.Join();
}
else
{
// Async version of demo: expect lower performance
// particularly for small payload sizes

const int NumParallelTasks = 10_000;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
int local = i;
tasks[i] = Task.Run(() => AppendAsync(local));
}

Thread.Sleep(500*1000);
// Threads for scan, reporting, commit
var t1 = new Thread(new ThreadStart(ScanThread));
var t2 = new Thread(new ThreadStart(ReportThread));
var t3 = new Thread(new ThreadStart(CommitThread));
t1.Start(); t2.Start(); t3.Start();
t1.Join(); t2.Join(); t3.Join();

Task.WaitAll(tasks);
}
}

static async Task AppendAsync(int id)
{
bool batched = false;

await Task.Yield();

if (!batched)
{
// Unbatched version - append each item with commit
// Needs high parallelism (NumParallelTasks) for perf
while (true)
{
try
{
await log.AppendAsync(staticEntry);
}
catch (Exception ex)
{
Console.WriteLine($"{nameof(AppendAsync)}({id}): {ex}");
}
}
}
else
{
// Batched version - we append many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
{
await log.AppendToMemoryAsync(staticEntry);
if (count++ % 100 == 0)
{
await log.WaitForCommitAsync();
}
}
}
}

private struct SpanBatch : ISpanBatch
{
private readonly int batchSize;
public SpanBatch(int batchSize) => this.batchSize = batchSize;
public Span<byte> Get(int index) => staticEntry;
public int TotalEntries() => batchSize;
}

}
}
68 changes: 30 additions & 38 deletions cs/src/core/Allocator/AllocatorBase.cs
@@ -785,7 +785,7 @@ public long Allocate(int numSlots = 1)
}

// Negate the address if page not ready to be used
if (CannotAllocate(page))
if (CannotAllocateNext(page))
{
address = -address;
}
@@ -820,14 +820,16 @@ public long Allocate(int numSlots = 1)

/// <summary>
/// Try allocate, no thread spinning allowed
/// May return 0 in case of inability
/// May also return negative address
/// May return 0 in case of inability to allocate
/// </summary>
/// <param name="numSlots"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long TryAllocate(int numSlots = 1)
{
if (numSlots > PageSize)
throw new Exception("Entry does not fit on page");

PageOffset localTailPageOffset = default(PageOffset);

// Necessary to check because threads keep retrying and we do not
@@ -855,51 +857,36 @@ public long TryAllocate(int numSlots = 1)
// The thread that "makes" the offset incorrect
// is the one that is elected to fix it and
// shift read-only/head.
localTailPageOffset.Page++;
localTailPageOffset.Offset = 0;
TailPageOffset = localTailPageOffset;

long shiftAddress = ((long)(page + 1)) << LogPageSizeBits;
long shiftAddress = ((long)(localTailPageOffset.Page + 1)) << LogPageSizeBits;
PageAlignedShiftReadOnlyAddress(shiftAddress);
PageAlignedShiftHeadAddress(shiftAddress);

return 0;
}
#endregion

long address = (((long)page) << LogPageSizeBits) | ((long)offset);

// Check for TailPageCache hit
if (TailPageCache == page)
{
return address;
}

// Address has been allocated. Negate the address
// if page is not ready to be used.
if (CannotAllocate(page))
{
address = -address;
}

// Update the read-only so that we can get more space for the tail
if (offset == 0)
{
if (address >= 0)
if (CannotAllocate(localTailPageOffset.Page + 1))
{
TailPageCache = page;
Interlocked.MemoryBarrier();
// We should not allocate the next page; reset to end of page
// so that next attempt can retry
localTailPageOffset.Offset = PageSize;
Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset);
return 0;
}

// Allocate next page in advance, if needed
int newPageIndex = (page + 1) % BufferSize;
if ((!IsAllocated(newPageIndex)))
int nextPageIndex = (localTailPageOffset.Page + 2) % BufferSize;
if ((!IsAllocated(nextPageIndex)))
{
AllocatePage(newPageIndex);
AllocatePage(nextPageIndex);
}

localTailPageOffset.Page++;
localTailPageOffset.Offset = 0;
TailPageOffset = localTailPageOffset;

return 0;
}
#endregion

return address;
return (((long)page) << LogPageSizeBits) | ((long)offset);
}

/// <summary>
@@ -925,7 +912,7 @@ public void CheckForAllocateComplete(ref long address)
PageAlignedShiftHeadAddress(GetTailAddress());

// Check if we can allocate pageIndex
if (CannotAllocate(p.Page))
if (CannotAllocateNext(p.Page))
{
return;
}
@@ -940,13 +927,18 @@ public void CheckForAllocateComplete(ref long address)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool CannotAllocate(int page)
private bool CannotAllocateNext(int page)
{
return
(page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits) - 1) ||
!IsAllocated(page % BufferSize);
}

private bool CannotAllocate(int page)
{
return
(page >= BufferSize + (ClosedUntilAddress >> LogPageSizeBits));
}
/// <summary>
/// Used by applications to make the current state of the database immutable quickly
/// </summary>
1 change: 1 addition & 0 deletions cs/src/core/Epochs/LightEpoch.cs
@@ -283,6 +283,7 @@ public int BumpCurrentEpoch(Action onDrain)

if (++i == kDrainListSize)
{
ProtectAndDrain();
i = 0;
if (++j == 500)
{