Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 4, 2019
1 parent 0050694 commit 127e908
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
24 changes: 19 additions & 5 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,37 @@ public class Program
static readonly byte[] staticEntry = new byte[entryLength];
static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);
static FasterLog log;
static FasterLogScanIterator iter;

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Throughput: {0} MB/sec, Tail: {1}",
Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastTime = nowTime;
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

Expand Down Expand Up @@ -71,8 +84,6 @@ static void ScanThread()
{
Random r = new Random();

Thread.Sleep(5000);

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
{
Expand All @@ -83,7 +94,7 @@ static void ScanThread()

long lastAddress = 0;
Span<byte> result;
using (var iter = log.Scan(0, long.MaxValue))
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
Expand All @@ -92,6 +103,9 @@ static void ScanThread()
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (!result.SequenceEqual(entrySpan))
{
if (result.Length != entrySpan.Length)
Expand Down
3 changes: 2 additions & 1 deletion cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry,
if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken))
{
entry = pool.Rent(entryLength);
Buffer.MemoryCopy((void*)(4 + physicalAddress), (void*)((byte*)entry.Memory.Pin().Pointer + 4), entryLength, entryLength);
using (var handle = entry.Memory.Pin())
Buffer.MemoryCopy((void*)(4 + physicalAddress), handle.Pointer, entryLength, entryLength);

if (epochTaken)
epoch.Suspend();
Expand Down
2 changes: 1 addition & 1 deletion cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void FasterLogTest1()
using (var iter = log.Scan(0, long.MaxValue))
{
int count = 0;
while (iter.GetNext(out Span<byte> result))
while (iter.GetNext(out Span<byte> result, out int length))
{
count++;
Assert.IsTrue(result.SequenceEqual(entry));
Expand Down

0 comments on commit 127e908

Please sign in to comment.