Skip to content

Commit

Permalink
Added TryAppend so users can implement log throttling.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 19, 2019
1 parent 88d7269 commit ddcc338
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 53 deletions.
12 changes: 12 additions & 0 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ static void AppendThread()
while (true)
{
log.Append(entry);

// We also support a Span-based version of Append

// We also support TryAppend to allow throttling/back-off:
// while (!log.TryAppend(entry, out long logicalAddress))
// {
// Thread.Sleep(10);
// }
}
}

Expand Down Expand Up @@ -97,6 +105,10 @@ static void Main(string[] args)
log = new FasterLog(new FasterLogSettings { LogDevice = device });

new Thread(new ThreadStart(AppendThread)).Start();

// Can have multiple append threads if needed
// new Thread(new ThreadStart(AppendThread)).Start();

new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();
Expand Down
154 changes: 106 additions & 48 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class FasterLog : IDisposable
{
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly LightEpoch epoch;
private ILogCommitManager logCommitManager;

/// <summary>
/// Beginning address of log
Expand All @@ -40,9 +41,7 @@ public class FasterLog : IDisposable
/// <summary>
/// Log commit until address
/// </summary>
public long CommitUntilAddress;

private ILogCommitManager logCommitManager;
public long CommittedUntilAddress;

/// <summary>
/// Create new log instance
Expand All @@ -62,49 +61,6 @@ public FasterLog(FasterLogSettings logSettings)
Restore();
}

/// <summary>
/// Commit log
/// </summary>
private void Commit(long flushAddress)
{
epoch.Resume();
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
info.FlushedUntilAddress = allocator.FlushedUntilAddress;
info.BeginAddress = allocator.BeginAddress;
epoch.Suspend();

// We can only allow serial monotonic synchronous commit
lock (this)
{
if (flushAddress > CommitUntilAddress)
{
logCommitManager.Commit(info.ToByteArray());
CommitUntilAddress = flushAddress;
info.DebugPrint();
}
}
}

/// <summary>
/// Restore log
/// </summary>
private void Restore()
{
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
var commitInfo = logCommitManager.GetCommitMetadata();

if (commitInfo == null) return;

using (var r = new BinaryReader(new MemoryStream(commitInfo)))
{
info.Initialize(r);
}

allocator.RestoreHybridLog(info.FlushedUntilAddress,
info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress),
info.BeginAddress);
}

/// <summary>
/// Dispose
/// </summary>
Expand Down Expand Up @@ -150,6 +106,60 @@ public unsafe long Append(byte[] entry)
return logicalAddress;
}

/// <summary>
/// Try to append entry to log
/// </summary>
/// <param name="entry">Entry to be appended to log</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryAppend(byte[] entry, out long logicalAddress)
{
epoch.Resume();
logicalAddress = 0;
long tail = -allocator.GetTailAddress();
allocator.CheckForAllocateComplete(ref tail);
if (tail < 0)
{
epoch.Suspend();
return false;
}
var length = entry.Length;
BlockAllocate(4 + length, out logicalAddress);
var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = entry)
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);
epoch.Suspend();
return true;
}

/// <summary>
/// Try to append entry to log
/// </summary>
/// <param name="entry">Entry to be appended to log</param>
/// <param name="logicalAddress">Logical address of added entry</param>
/// <returns>Whether the append succeeded</returns>
public unsafe bool TryAppend(Span<byte> entry, out long logicalAddress)
{
epoch.Resume();
logicalAddress = 0;
long tail = -allocator.GetTailAddress();
allocator.CheckForAllocateComplete(ref tail);
if (tail < 0)
{
epoch.Suspend();
return false;
}
var length = entry.Length;
BlockAllocate(4 + length, out logicalAddress);
var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
*(int*)physicalAddress = length;
fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy(bp, (void*)(4 + physicalAddress), length, length);
epoch.Suspend();
return true;
}

/// <summary>
/// Append batch of entries to log
/// </summary>
Expand Down Expand Up @@ -182,7 +192,7 @@ public long FlushAndCommit(bool spinWait = false)

if (spinWait)
{
while (CommitUntilAddress < tailAddress)
while (CommittedUntilAddress < tailAddress)
{
epoch.ProtectAndDrain();
Thread.Yield();
Expand All @@ -204,7 +214,7 @@ public void TruncateUntil(long untilAddress)
}

/// <summary>
/// Iterator interface for scanning FASTER log
/// Pull-based iterator interface for scanning FASTER log
/// </summary>
/// <param name="beginAddress"></param>
/// <param name="endAddress"></param>
Expand Down Expand Up @@ -235,6 +245,11 @@ public void ReleaseThread()
epoch.Release();
}

/// <summary>
/// Block allocate
/// </summary>
/// <param name="recordSize"></param>
/// <param name="logicalAddress"></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void BlockAllocate(int recordSize, out long logicalAddress)
{
Expand All @@ -259,5 +274,48 @@ private void BlockAllocate(int recordSize, out long logicalAddress)
BlockAllocate(recordSize, out logicalAddress);
}
}

/// <summary>
/// Commit log
/// </summary>
private void Commit(long flushAddress)
{
epoch.Resume();
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
info.FlushedUntilAddress = allocator.FlushedUntilAddress;
info.BeginAddress = allocator.BeginAddress;
epoch.Suspend();

// We can only allow serial monotonic synchronous commit
lock (this)
{
if (flushAddress > CommittedUntilAddress)
{
logCommitManager.Commit(flushAddress, info.ToByteArray());
CommittedUntilAddress = flushAddress;
info.DebugPrint();
}
}
}

/// <summary>
/// Restore log
/// </summary>
private void Restore()
{
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
var commitInfo = logCommitManager.GetCommitMetadata();

if (commitInfo == null) return;

using (var r = new BinaryReader(new MemoryStream(commitInfo)))
{
info.Initialize(r);
}

allocator.RestoreHybridLog(info.FlushedUntilAddress,
info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress),
info.BeginAddress);
}
}
}
5 changes: 3 additions & 2 deletions cs/src/core/Index/FasterLog/ILogCommitManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ public interface ILogCommitManager
/// <summary>
/// Perform (synchronous) commit with specified metadata
/// </summary>
/// <param name="commitMetadata"></param>
void Commit(byte[] commitMetadata);
/// <param name="address">Address committed until (for information only, not necessary to persist)</param>
/// <param name="commitMetadata">Commit metadata</param>
void Commit(long address, byte[] commitMetadata);

/// <summary>
/// Return prior commit metadata during recovery
Expand Down
7 changes: 4 additions & 3 deletions cs/src/core/Index/FasterLog/LocalLogCommitManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public LocalLogCommitManager(string CommitFile)
}

/// <summary>
/// Commit log
/// Perform (synchronous) commit with specified metadata
/// </summary>
/// <param name="commitMetadata"></param>
public void Commit(byte[] commitMetadata)
/// <param name="address">Address committed until (for information only, not necessary to persist)</param>
/// <param name="commitMetadata">Commit metadata</param>
public void Commit(long address, byte[] commitMetadata)
{
// Two phase to ensure we write metadata in single Write operation
using (var ms = new MemoryStream())
Expand Down

0 comments on commit ddcc338

Please sign in to comment.