Skip to content

Commit

Permalink
Improved sample, changed GetMemory to use byte[] instead of Span<byte>
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 7, 2019
1 parent db68ae0 commit 5caea66
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 140 deletions.
253 changes: 126 additions & 127 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,133 +14,9 @@ public class Program
// Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4)
const int entryLength = 1 << 10;
static readonly byte[] staticEntry = new byte[entryLength];
static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);
static FasterLog log;
static FasterLogScanIterator iter;

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
while (true)
{
Thread.Sleep(5);
log.FlushAndCommit(true);

// Async version
// await Task.Delay(5);
// await log.FlushAndCommitAsync();
}
}

static void AppendThread()
{
while (true)
{
// TryAppend - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryAppend(staticEntry, out _)) ;

// Synchronous blocking append
// Accepts byte[] and ReadOnlySpan<byte>
// log.Append(entry);

// Batched append - batch must fit on one page
// while (!log.TryAppend(spanBatch, out _)) ;
}
}

static void ScanThread()
{
Random r = new Random();

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
{
entry[i] = (byte)i;
}

var entrySpan = new Span<byte>(entry);

long lastAddress = 0;
Span<byte> result;
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (!result.SequenceEqual(entrySpan))
{
if (result.Length != entrySpan.Length)
throw new Exception("Invalid entry found, expected length " + entrySpan.Length + ", actual length " + result.Length);
else
throw new Exception("Invalid entry found at offset " + FindDiff(result, entrySpan));
}

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Append(result);
}

if (iter.CurrentAddress - lastAddress > 500_000_000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
}
}
}
}

private static int FindDiff(Span<byte> b1, Span<byte> b2)
{
for (int i = 0; i < b1.Length; i++)
{
if (b1[i] != b2[i])
{
return i;
}
}
return 0;
}

/// <summary>
/// Main program entry point
/// </summary>
Expand Down Expand Up @@ -200,6 +76,27 @@ static void Main(string[] args)
}
}


static void AppendThread()
{
while (true)
{
// TryAppend - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryAppend(staticEntry, out _)) ;

// Synchronous blocking append
// Accepts byte[] and ReadOnlySpan<byte>
// log.Append(entry);

// Batched append - batch must fit on one page
// while (!log.TryAppend(spanBatch, out _)) ;
}
}

/// <summary>
/// Async version of append
/// </summary>
static async Task AppendAsync(int id)
{
bool batched = false;
Expand All @@ -208,7 +105,7 @@ static async Task AppendAsync(int id)

if (!batched)
{
// Unbatched version - append each item with commit
// Single commit version - append each item with commit
// Needs high parallelism (NumParallelTasks) for perf
while (true)
{
Expand All @@ -224,7 +121,7 @@ static async Task AppendAsync(int id)
}
else
{
// Batched version - we append many entries to memory,
// Group-commit version - we append many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
Expand All @@ -238,13 +135,115 @@ static async Task AppendAsync(int id)
}
}

static void ScanThread()
{
Random r = new Random();

long lastAddress = 0;
byte[] result;
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (Different(result, staticEntry, out int location))
{
if (result.Length != staticEntry.Length)
throw new Exception("Invalid entry found, expected length " + staticEntry.Length + ", actual length " + result.Length);
else
throw new Exception("Invalid entry found at offset " + location);
}

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Append(result);
}

if (iter.CurrentAddress - lastAddress > 500_000_000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
}
}
}
}

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
while (true)
{
Thread.Sleep(5);
log.FlushAndCommit(true);

// Async version
// await Task.Delay(5);
// await log.FlushAndCommitAsync();
}
}

private static bool Different(byte[] b1, byte[] b2, out int location)
{
location = 0;
if (b1.Length != b2.Length) return true;
for (location = 0; location < b1.Length; location++)
{
if (b1[location] != b2[location])
{
return true;
}
}
return false;
}

// For batch append API
static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
{
private readonly int batchSize;
public ReadOnlySpanBatch(int batchSize) => this.batchSize = batchSize;
public ReadOnlySpan<byte> Get(int index) => staticEntry;
public int TotalEntries() => batchSize;
}

}
}
10 changes: 5 additions & 5 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace FASTER.core
/// </summary>
/// <param name="minLength">Minimum length of returned span</param>
/// <returns></returns>
public delegate Span<byte> GetMemory(int minLength);
public delegate byte[] GetMemory(int minLength);

/// <summary>
/// Scan iterator for hybrid log
Expand Down Expand Up @@ -103,7 +103,7 @@ public async ValueTask WaitAsync()
/// <param name="entry">Copy of entry, if found</param>
/// <param name="entryLength">Actual length of entry</param>
/// <returns></returns>
public unsafe bool GetNext(out Span<byte> entry, out int entryLength)
public unsafe bool GetNext(out byte[] entry, out int entryLength)
{
if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken))
{
Expand All @@ -112,15 +112,15 @@ public unsafe bool GetNext(out Span<byte> entry, out int entryLength)
// Use user delegate to allocate memory
entry = getMemory(entryLength);
if (entry.Length < entryLength)
throw new Exception("Span provided has invalid length");
throw new Exception("Byte array provided has invalid length");
}
else
{
// We allocate a byte array from heap
entry = new Span<byte>(new byte[entryLength]);
entry = new byte[entryLength];
}

fixed (byte* bp = &entry.GetPinnableReference())
fixed (byte* bp = entry)
Buffer.MemoryCopy((void*)(4 + physicalAddress), bp, entryLength, entryLength);

if (epochTaken)
Expand Down
11 changes: 3 additions & 8 deletions cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void FasterLogTest1()
using (var iter = log.Scan(0, long.MaxValue))
{
int count = 0;
while (iter.GetNext(out Span<byte> result, out int length))
while (iter.GetNext(out byte[] result, out int length))
{
count++;
Assert.IsTrue(result.SequenceEqual(entry));
Expand Down Expand Up @@ -90,7 +90,8 @@ public async Task FasterLogTest2()
while (!waitingReader.IsCompleted) ;
Assert.IsTrue(waitingReader.IsCompleted);

var result = GetNext(iter);
var curr = iter.GetNext(out byte[] result, out _);
Assert.IsTrue(curr);
Assert.IsTrue(result.SequenceEqual(data1));

var next = iter.GetNext(out _, out _);
Expand All @@ -99,11 +100,5 @@ public async Task FasterLogTest2()
}
log.Dispose();
}

private byte[] GetNext(FasterLogScanIterator iter)
{
iter.GetNext(out Span<byte> entry, out _);
return entry.ToArray();
}
}
}

0 comments on commit 5caea66

Please sign in to comment.