Skip to content

Commit

Permalink
Added MemoryPool/IMemoryOwner variant of iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 4, 2019
1 parent 80a2aeb commit 0050694
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 79 deletions.
4 changes: 2 additions & 2 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ static void ScanThread()
{
while (true)
{
while (!iter.GetNext(out result))
while (!iter.GetNext(out result, out int length))
{
Thread.Sleep(1000);
iter.WaitAsync().GetAwaiter().GetResult();
}

if (!result.SequenceEqual(entrySpan))
Expand Down
200 changes: 123 additions & 77 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using System.Buffers;

namespace FASTER.core
{
/// <summary>
/// Delegate for getting memory from user
/// </summary>
/// <param name="length"></param>
/// <param name="minLength">Minimum length of returned span</param>
/// <returns></returns>
public delegate Span<byte> GetMemory(int length);
public delegate Span<byte> GetMemory(int minLength);

/// <summary>
/// Scan iterator for hybrid log
Expand Down Expand Up @@ -84,7 +85,7 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
/// Wait for iteration to be ready to continue
/// </summary>
/// <returns></returns>
public async void WaitAsync()
public async ValueTask WaitAsync()
{
while (true)
{
Expand All @@ -99,100 +100,60 @@ public async void WaitAsync()
/// <summary>
/// Get next record in iterator
/// </summary>
/// <param name="entry"></param>
/// <param name="entry">Copy of entry, if found</param>
/// <param name="entryLength">Actual length of entry</param>
/// <returns></returns>
public unsafe bool GetNext(out Span<byte> entry)
public unsafe bool GetNext(out Span<byte> entry, out int entryLength)
{
currentAddress = nextAddress;
while (true)
if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken))
{
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
{
Debug.WriteLine("Iterator address is less than log BeginAddress " + allocator.BeginAddress + ", adjusting iterator address");
currentAddress = allocator.BeginAddress;
}

if ((currentAddress >= endAddress) || (currentAddress >= fasterLog.CommittedUntilAddress))
{
entry = default(Span<byte>);
return false;
}


if (frameSize == 0 && currentAddress < allocator.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> allocator.LogPageSizeBits;
var offset = currentAddress & allocator.PageSizeMask;

var headAddress = allocator.HeadAddress;
var physicalAddress = default(long);

if (currentAddress < headAddress)
{
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
}
else
{
epoch.Resume();
headAddress = allocator.HeadAddress;
if (currentAddress < headAddress) // rare case
{
epoch.Suspend();
continue;
}

physicalAddress = allocator.GetPhysicalAddress(currentAddress);
}

// Check if record fits on page, if not skip to next page
int length = *(int*)physicalAddress;
int recordSize = 4 + Align(length);

if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize)
throw new Exception();

if (length == 0) // we are at end of page, skip to next
{
// If record
if (currentAddress >= headAddress)
epoch.Suspend();
currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
continue;
}

if (getMemory != null)
{
// Use user delegate to allocate memory
entry = getMemory(length);
if (entry.Length != length)
entry = getMemory(entryLength);
if (entry.Length < entryLength)
throw new Exception("Span provided has invalid length");
}
else
{
// We allocate a byte array from heap
entry = new Span<byte>(new byte[length]);
entry = new Span<byte>(new byte[entryLength]);
}

fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy((void*)(4 + physicalAddress), bp, length, length);
Buffer.MemoryCopy((void*)(4 + physicalAddress), bp, entryLength, entryLength);

if (currentAddress >= headAddress)
if (epochTaken)
epoch.Suspend();

Debug.Assert((currentAddress & allocator.PageSizeMask) + recordSize <= allocator.PageSize);
return true;
}
entry = default;
return false;
}

/// <summary>
/// GetNext supporting memory pools
/// </summary>
/// <param name="pool"></param>
/// <param name="entry"></param>
/// <param name="entryLength"></param>
/// <returns></returns>
public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry, out int entryLength)
{
if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken))
{
entry = pool.Rent(entryLength);
Buffer.MemoryCopy((void*)(4 + physicalAddress), (void*)((byte*)entry.Memory.Pin().Pointer + 4), entryLength, entryLength);

if (epochTaken)
epoch.Suspend();

if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize)
nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
else
nextAddress = currentAddress + recordSize;

return true;
}
entry = default;
entryLength = default;
return false;
}

/// <summary>
Expand Down Expand Up @@ -260,6 +221,91 @@ private int Align(int length)
return (length + 3) & ~3;
}

/// <summary>
/// Retrieve physical address of next iterator value
/// (under epoch protection if it is from main page buffer)
/// </summary>
/// <param name="physicalAddress"></param>
/// <param name="entryLength"></param>
/// <param name="epochTaken"></param>
/// <returns></returns>
private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out bool epochTaken)
{
physicalAddress = 0;
entryLength = 0;
epochTaken = false;

currentAddress = nextAddress;
while (true)
{
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
{
Debug.WriteLine("Iterator address is less than log BeginAddress " + allocator.BeginAddress + ", adjusting iterator address");
currentAddress = allocator.BeginAddress;
}

if ((currentAddress >= endAddress) || (currentAddress >= fasterLog.CommittedUntilAddress))
{
return false;
}

if (frameSize == 0 && currentAddress < allocator.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> allocator.LogPageSizeBits;
var offset = currentAddress & allocator.PageSizeMask;

var headAddress = allocator.HeadAddress;

if (currentAddress < headAddress)
{
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
}
else
{
epoch.Resume();
headAddress = allocator.HeadAddress;
if (currentAddress < headAddress) // rare case
{
epoch.Suspend();
continue;
}

physicalAddress = allocator.GetPhysicalAddress(currentAddress);
}

// Check if record fits on page, if not skip to next page
entryLength = *(int*)physicalAddress;
int recordSize = 4 + Align(entryLength);

if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize)
throw new Exception();

if (entryLength == 0) // we are at end of page, skip to next
{
// If record
if (currentAddress >= headAddress)
epoch.Suspend();
currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
continue;
}

Debug.Assert((currentAddress & allocator.PageSizeMask) + recordSize <= allocator.PageSize);

if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize)
nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
else
nextAddress = currentAddress + recordSize;

epochTaken = currentAddress >= headAddress;
return true;
}
}

}
}

Expand Down

0 comments on commit 0050694

Please sign in to comment.