Skip to content

Commit

Permalink
Added commit and recovery support.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 19, 2019
1 parent 19d5d82 commit 88d7269
Show file tree
Hide file tree
Showing 11 changed files with 402 additions and 103 deletions.
10 changes: 10 additions & 0 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ static void ReportThread()
}
}

static void CommitThread()
{
while (true)
{
Thread.Sleep(100);
log.FlushAndCommit(true);
}
}

static void AppendThread()
{
byte[] entry = new byte[entryLength];
Expand Down Expand Up @@ -90,6 +99,7 @@ static void Main(string[] args)
new Thread(new ThreadStart(AppendThread)).Start();
new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();

Thread.Sleep(500*1000);
}
Expand Down
22 changes: 18 additions & 4 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal struct PageOffset
/// </summary>
/// <typeparam name="Key"></typeparam>
/// <typeparam name="Value"></typeparam>
public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
public unsafe abstract partial class AllocatorBase<Key, Value> : IDisposable
where Key : new()
where Value : new()
{
Expand Down Expand Up @@ -221,6 +221,11 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// </summary>
protected readonly Action<long, long> EvictCallback = null;

/// <summary>
/// Flush callback
/// </summary>
protected readonly Action<long> FlushCallback = null;

/// <summary>
/// Observer for records entering read-only region
/// </summary>
Expand Down Expand Up @@ -380,7 +385,8 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// Clear page
/// </summary>
/// <param name="page">Page number to be cleared</param>
protected abstract void ClearPage(long page);
/// <param name="offset">Offset to clear from (if partial clear)</param>
protected abstract void ClearPage(long page, int offset = 0);
/// <summary>
/// Write page (async)
/// </summary>
Expand Down Expand Up @@ -469,13 +475,15 @@ public unsafe abstract class AllocatorBase<Key, Value> : IDisposable
/// <param name="comparer"></param>
/// <param name="evictCallback"></param>
/// <param name="epoch"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch)
/// <param name="flushCallback"></param>
public AllocatorBase(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback, LightEpoch epoch, Action<long> flushCallback)
{
if (evictCallback != null)
{
ReadCache = true;
EvictCallback = evictCallback;
}
FlushCallback = flushCallback;

this.comparer = comparer;
if (epoch == null)
Expand Down Expand Up @@ -1100,7 +1108,10 @@ protected void ShiftFlushedUntilAddress()

if (update)
{
Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress);
if (Utility.MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress))
{
FlushCallback?.Invoke(FlushedUntilAddress);
}
}
}

Expand Down Expand Up @@ -1146,6 +1157,9 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress)
PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageCloseStatus = PMMCloseStatus.Open;
PageStatusIndicator[pageIndex].PageFlushCloseStatus.PageFlushStatus = PMMFlushStatus.Flushed;

// clear the last page starting from tail address
ClearPage(pageIndex, (int)GetOffsetInPage(tailAddress));

// Printing debug info
Debug.WriteLine("******* Recovered HybridLog Stats *******");
Debug.WriteLine("Head Address: {0}", HeadAddress);
Expand Down
15 changes: 11 additions & 4 deletions cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public unsafe sealed class BlittableAllocator<Key, Value> : AllocatorBase<Key, V
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));

public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
public BlittableAllocator(LogSettings settings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null, Action<long> flushCallback = null)
: base(settings, comparer, evictCallback, epoch, flushCallback)
{
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
Expand Down Expand Up @@ -198,9 +198,16 @@ public override long GetFirstValidLogicalAddress(long page)
return page << LogPageSizeBits;
}

protected override void ClearPage(long page)
protected override void ClearPage(long page, int offset)
{
Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length);
if (offset == 0)
Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset);
else
{
// Adjust array offset for cache alignment
offset += (int)(pointers[page % BufferSize] - (long)handles[page % BufferSize].AddrOfPinnedObject());
Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset);
}
}

/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public unsafe sealed class GenericAllocator<Key, Value> : AllocatorBase<Key, Val
private readonly bool valueBlittable = Utility.IsBlittable<Value>();

public GenericAllocator(LogSettings settings, SerializerSettings<Key, Value> serializerSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
: base(settings, comparer, evictCallback, epoch, null)
{
SerializerSettings = serializerSettings;

Expand Down Expand Up @@ -254,9 +254,9 @@ protected override void WriteAsyncToDevice<TContext>



protected override void ClearPage(long page)
protected override void ClearPage(long page, int offset)
{
Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length);
Array.Clear(values[page % BufferSize], offset / recordSize, values[page % BufferSize].Length - offset / recordSize);

// Close segments
var thisCloseSegment = page >> (LogSegmentSizeBits - LogPageSizeBits);
Expand Down
13 changes: 10 additions & 3 deletions cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public unsafe sealed class VariableLengthBlittableAllocator<Key, Value> : Alloca
internal readonly IVariableLengthStruct<Value> ValueLength;

public VariableLengthBlittableAllocator(LogSettings settings, VariableLengthStructSettings<Key, Value> vlSettings, IFasterEqualityComparer<Key> comparer, Action<long, long> evictCallback = null, LightEpoch epoch = null)
: base(settings, comparer, evictCallback, epoch)
: base(settings, comparer, evictCallback, epoch, null)
{
values = new byte[BufferSize][];
handles = new GCHandle[BufferSize];
Expand Down Expand Up @@ -282,9 +282,16 @@ public override long GetFirstValidLogicalAddress(long page)
return page << LogPageSizeBits;
}

protected override void ClearPage(long page)
protected override void ClearPage(long page, int offset)
{
Array.Clear(values[page % BufferSize], 0, values[page % BufferSize].Length);
if (offset == 0)
Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset);
else
{
// Adjust array offset for cache alignment
offset += (int)(pointers[page % BufferSize] - (long)handles[page % BufferSize].AddrOfPinnedObject());
Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset);
}
}

/// <summary>
Expand Down
72 changes: 67 additions & 5 deletions cs/src/core/Index/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;

Expand All @@ -32,19 +33,76 @@ public class FasterLog : IDisposable
public long TailAddress => allocator.GetTailAddress();

/// <summary>
/// Flushed until address
/// Log flushed until address
/// </summary>
public long FlushedUntilAddress => allocator.FlushedUntilAddress;

/// <summary>
/// Log commit until address
/// </summary>
public long CommitUntilAddress;

private ILogCommitManager logCommitManager;

/// <summary>
/// Create new log instance
/// </summary>
/// <param name="logSettings"></param>
public FasterLog(FasterLogSettings logSettings)
{
logCommitManager = logSettings.LogCommitManager ??
new LocalLogCommitManager(logSettings.LogCommitFile ??
logSettings.LogDevice.FileName + ".commit");

epoch = new LightEpoch();
allocator = new BlittableAllocator<Empty, byte>(logSettings.GetLogSettings(), null, null, epoch);
allocator = new BlittableAllocator<Empty, byte>(
logSettings.GetLogSettings(), null,
null, epoch, e => Commit(e));
allocator.Initialize();
Restore();
}

/// <summary>
/// Commit log
/// </summary>
private void Commit(long flushAddress)
{
epoch.Resume();
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
info.FlushedUntilAddress = allocator.FlushedUntilAddress;
info.BeginAddress = allocator.BeginAddress;
epoch.Suspend();

// We can only allow serial monotonic synchronous commit
lock (this)
{
if (flushAddress > CommitUntilAddress)
{
logCommitManager.Commit(info.ToByteArray());
CommitUntilAddress = flushAddress;
info.DebugPrint();
}
}
}

/// <summary>
/// Restore log
/// </summary>
private void Restore()
{
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
var commitInfo = logCommitManager.GetCommitMetadata();

if (commitInfo == null) return;

using (var r = new BinaryReader(new MemoryStream(commitInfo)))
{
info.Initialize(r);
}

allocator.RestoreHybridLog(info.FlushedUntilAddress,
info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress),
info.BeginAddress);
}

/// <summary>
Expand Down Expand Up @@ -117,16 +175,20 @@ public unsafe long Append(List<byte[]> entries)
/// <summary>
/// Flush the log until tail
/// </summary>
public long Flush(bool spinWait = false)
public long FlushAndCommit(bool spinWait = false)
{
epoch.Resume();
allocator.ShiftReadOnlyToTail(out long tailAddress);
epoch.Suspend();

if (spinWait)
{
while (allocator.FlushedUntilAddress < tailAddress)
while (CommitUntilAddress < tailAddress)
{
epoch.ProtectAndDrain();
Thread.Yield();
}
}
epoch.Suspend();
return tailAddress;
}

Expand Down
Loading

0 comments on commit 88d7269

Please sign in to comment.