Skip to content

Commit

Permalink
Fasterlog TryAppend (#179)
Browse files Browse the repository at this point in the history
Adding truly non-blocking TryAppend functionality. See sample for how this is used.
  • Loading branch information
badrishc authored Sep 30, 2019
1 parent 2cd85e3 commit ec2a3b5
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 62 deletions.
17 changes: 10 additions & 7 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace FasterLogSample
{
public class Program
{
const int entryLength = 96;
const int entryLength = 996;
static FasterLog log;

static void ReportThread()
Expand Down Expand Up @@ -50,15 +50,18 @@ static void AppendThread()

while (true)
{
// Sync append
log.Append(entry);

// We also support a Span-based version of Append
// We also support a Span-based variant of Append

// We also support TryAppend to allow throttling/back-off:
// while (!log.TryAppend(entry, out long logicalAddress))
// {
// Thread.Sleep(10);
// }
// We also support TryAppend to allow throttling/back-off
// (expect this to be slightly slower than the sync version)
// Make sure you supply a "starting" logical address of 0
// Retries must send back the current logical address.
//
// long logicalAddress = 0;
// while (!log.TryAppend(entry, ref logicalAddress)) ;
}
}

Expand Down
89 changes: 84 additions & 5 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,90 @@ public long Allocate(int numSlots = 1)
return address;
}

/// <summary>
/// Try allocate, no thread spinning allowed
/// May return 0 in case of inability
/// May also return negative address
/// </summary>
/// <param name="numSlots"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long TryAllocate(int numSlots = 1)
{
PageOffset localTailPageOffset = default(PageOffset);

// Necessary to check because threads keep retrying and we do not
// want to overflow offset more than once per thread
if (TailPageOffset.Offset > PageSize)
return 0;

// Determine insertion index.
// ReSharper disable once CSharpWarnings::CS0420
#pragma warning disable 420
localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots);
#pragma warning restore 420

int page = localTailPageOffset.Page;
int offset = localTailPageOffset.Offset - numSlots;

#region HANDLE PAGE OVERFLOW
if (localTailPageOffset.Offset > PageSize)
{
if (offset > PageSize)
{
return 0;
}

// The thread that "makes" the offset incorrect
// is the one that is elected to fix it and
// shift read-only/head.
localTailPageOffset.Page++;
localTailPageOffset.Offset = 0;
TailPageOffset = localTailPageOffset;

long shiftAddress = ((long)(page + 1)) << LogPageSizeBits;
PageAlignedShiftReadOnlyAddress(shiftAddress);
PageAlignedShiftHeadAddress(shiftAddress);

return 0;
}
#endregion

long address = (((long)page) << LogPageSizeBits) | ((long)offset);

// Check for TailPageCache hit
if (TailPageCache == page)
{
return address;
}

// Address has been allocated. Negate the address
// if page is not ready to be used.
if (CannotAllocate(page))
{
address = -address;
}

// Update the read-only so that we can get more space for the tail
if (offset == 0)
{
if (address >= 0)
{
TailPageCache = page;
Interlocked.MemoryBarrier();
}

// Allocate next page in advance, if needed
int newPageIndex = (page + 1) % BufferSize;
if ((!IsAllocated(newPageIndex)))
{
AllocatePage(newPageIndex);
}
}

return address;
}

/// <summary>
/// If allocator cannot allocate new memory as the head has not shifted or the previous page
/// is not yet closed, it allocates but returns the negative address.
Expand All @@ -827,11 +911,6 @@ public long Allocate(int numSlots = 1)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void CheckForAllocateComplete(ref long address)
{
if (address >= 0)
{
throw new Exception("Address already allocated!");
}

PageOffset p = default(PageOffset);
p.Page = (int)((-address) >> LogPageSizeBits);
p.Offset = (int)((-address) & PageSizeMask);
Expand Down
6 changes: 4 additions & 2 deletions cs/src/core/Index/Common/LogSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ public class ReadCacheSettings
public int MemorySizeBits = 34;

/// <summary>
/// Fraction of log used for second chance copy to tail
/// Fraction of log head (in memory) used for second chance
/// copy to tail. This is (1 - MutableFraction) for the
/// underlying log
/// </summary>
public double SecondChanceFraction = 0.9;
public double SecondChanceFraction = 0.1;
}
}
6 changes: 3 additions & 3 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits,
MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction
}, variableLengthStructSettings, this.comparer, ReadCacheEvict, epoch);
readcache.Initialize();
ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
Expand All @@ -153,7 +153,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits,
MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction
}, this.comparer, ReadCacheEvict, epoch);
readcache.Initialize();
ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
Expand All @@ -174,7 +174,7 @@ public FasterKV(long size, Functions functions, LogSettings logSettings, Checkpo
PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits,
MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
SegmentSizeBits = logSettings.ReadCacheSettings.MemorySizeBits,
MutableFraction = logSettings.ReadCacheSettings.SecondChanceFraction
MutableFraction = 1 - logSettings.ReadCacheSettings.SecondChanceFraction
}, serializerSettings, this.comparer, ReadCacheEvict, epoch);
readcache.Initialize();
ReadCache = new LogAccessor<Key, Value, Input, Output, Context>(this, readcache);
Expand Down
160 changes: 115 additions & 45 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,82 +109,73 @@ public unsafe long Append(byte[] entry)
}

/// <summary>
/// Try to append entry to log
/// Try to append entry to log. If is returns true, we are
/// done. If it returns false with negative address, user
/// needs to call TryCompleteAppend to finalize the append.
/// See TryCompleteAppend for more info.
/// </summary>
/// <param name="entry">Entry to be appended to log</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryAppend(byte[] entry, out long logicalAddress)
public unsafe bool TryAppend(byte[] entry, ref long logicalAddress)
{
if (logicalAddress < 0)
return TryCompleteAppend(entry, ref logicalAddress);

epoch.Resume();
logicalAddress = 0;
long tail = -allocator.GetTailAddress();
allocator.CheckForAllocateComplete(ref tail);
if (tail < 0)
{

var length = entry.Length;
var alignedLength = (length + 3) & ~3; // round up to multiple of 4

logicalAddress = allocator.TryAllocate(4 + alignedLength);
if (logicalAddress <= 0)
{
epoch.Suspend();
return false;
}
var length = entry.Length;
var alignedLength = (length + 3) & ~3; // round up to multiple of 4
BlockAllocate(4 + alignedLength, out logicalAddress);

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = entry)
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);

epoch.Suspend();
return true;
}

/// <summary>
/// Try to append entry to log
/// Try to append entry to log. If is returns true, we are
/// done. If it returns false with negative address, user
/// needs to call TryCompleteAppend to finalize the append.
/// See TryCompleteAppend for more info.
/// </summary>
/// <param name="entry">Entry to be appended to log</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryAppend(Span<byte> entry, out long logicalAddress)
public unsafe bool TryAppend(Span<byte> entry, ref long logicalAddress)
{
if (logicalAddress < 0)
return TryCompleteAppend(entry, ref logicalAddress);

epoch.Resume();
logicalAddress = 0;
long tail = -allocator.GetTailAddress();
allocator.CheckForAllocateComplete(ref tail);
if (tail < 0)

var length = entry.Length;
var alignedLength = (length + 3) & ~3; // round up to multiple of 4

logicalAddress = allocator.TryAllocate(4 + alignedLength);
if (logicalAddress <= 0)
{
epoch.Suspend();
return false;
}
var length = entry.Length;
var alignedLength = (length + 3) & ~3; // round up to multiple of 4
BlockAllocate(4 + alignedLength, out logicalAddress);

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);
epoch.Suspend();
return true;
}

/// <summary>
/// Append batch of entries to log
/// </summary>
/// <param name="entries"></param>
/// <returns>Logical address of last added entry</returns>
public unsafe long Append(List<byte[]> entries)
{
long logicalAddress = 0;
epoch.Resume();
foreach (var entry in entries)
{
var length = entry.Length;
var alignedLength = (length + 3) & ~3; // round up to multiple of 4
BlockAllocate(4 + alignedLength, out logicalAddress);
var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = entry)
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);
}
epoch.Suspend();
return logicalAddress;
return true;
}

/// <summary>
Expand Down Expand Up @@ -280,6 +271,84 @@ private void BlockAllocate(int recordSize, out long logicalAddress)
}
}

/// <summary>
/// Try to complete partial allocation. Succeeds when address
/// turns positive. If failed with negative address, try the
/// operation. If failed with zero address, user needs to start
/// afresh with a new TryAppend operation.
/// </summary>
/// <param name="entry"></param>
/// <param name="logicalAddress"></param>
/// <returns>Whether operation succeeded</returns>
private unsafe bool TryCompleteAppend(byte[] entry, ref long logicalAddress)
{
epoch.Resume();

allocator.CheckForAllocateComplete(ref logicalAddress);

if (logicalAddress < 0)
{
epoch.Suspend();
return false;
}

if (logicalAddress < allocator.ReadOnlyAddress)
{
logicalAddress = 0;
epoch.Suspend();
return false;
}

var length = entry.Length;

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = entry)
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);

epoch.Suspend();
return true;
}

/// <summary>
/// Try to complete partial allocation. Succeeds when address
/// turns positive. If failed with negative address, try the
/// operation. If failed with zero address, user needs to start
/// afresh with a new TryAppend operation.
/// </summary>
/// <param name="entry"></param>
/// <param name="logicalAddress"></param>
/// <returns>Whether operation succeeded</returns>
private unsafe bool TryCompleteAppend(Span<byte> entry, ref long logicalAddress)
{
epoch.Resume();

allocator.CheckForAllocateComplete(ref logicalAddress);

if (logicalAddress < 0)
{
epoch.Suspend();
return false;
}

if (logicalAddress < allocator.ReadOnlyAddress)
{
logicalAddress = 0;
epoch.Suspend();
return false;
}

var length = entry.Length;

var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);

epoch.Suspend();
return true;
}

/// <summary>
/// Commit log
/// </summary>
Expand Down Expand Up @@ -316,9 +385,10 @@ private void Restore()
info.Initialize(r);
}

allocator.RestoreHybridLog(info.FlushedUntilAddress,
info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress),
info.BeginAddress);
var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress);
if (headAddress == 0) headAddress = Constants.kFirstValidAddress;

allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress);
}
}
}

0 comments on commit ec2a3b5

Please sign in to comment.