Skip to content

Commit

Permalink
[C#] Log compaction fix in presence of concurrent updates (#406)
Browse files Browse the repository at this point in the history
* [C#] Log compaction fix in presence of concurrent updates

* Refactor scan iterator to shared base class.
* Log compaction updates.
* Wrap functions used for compaction so that ConcurrentWriter is not triggered.
* Cleaned up iteration and compaction APIs.
* Remove faulty test.
  • Loading branch information
badrishc authored Feb 24, 2021
1 parent a35a4cd commit e163614
Show file tree
Hide file tree
Showing 20 changed files with 449 additions and 858 deletions.
1 change: 0 additions & 1 deletion cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ public override long[] GetSegmentOffsets()
internal override void PopulatePage(byte* src, int required_bytes, long destinationPage)
{
throw new FasterException("BlittableAllocator memory pages are sector aligned - use direct copy");
// Buffer.MemoryCopy(src, (void*)pointers[destinationPage % BufferSize], required_bytes, required_bytes);
}

/// <summary>
Expand Down
101 changes: 16 additions & 85 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,16 @@ namespace FASTER.core
/// <summary>
/// Scan iterator for hybrid log
/// </summary>
public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key, Value>
public sealed class BlittableScanIterator<Key, Value> : ScanIteratorBase, IFasterScanIterator<Key, Value>
{
private readonly int frameSize;
private readonly BlittableAllocator<Key, Value> hlog;
private readonly long endAddress;
private readonly BlittableFrame frame;
private readonly CountdownEvent[] loaded;
private readonly LightEpoch epoch;
private readonly bool forceInMemory;

private bool first = true;
private long currentAddress, nextAddress;
private Key currentKey;
private Value currentValue;
private long currentPhysicalAddress;

/// <summary>
/// Current address
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand All @@ -45,44 +29,14 @@ public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key,
/// <param name="scanBufferingMode"></param>
/// <param name="epoch"></param>
/// <param name="forceInMemory">Provided address range is known by caller to be in memory, even if less than HeadAddress</param>
public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch, bool forceInMemory = false)
public BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch, bool forceInMemory = false)
: base(beginAddress == 0 ? hlog.GetFirstValidLogicalAddress(0) : beginAddress, endAddress, scanBufferingMode, epoch, hlog.LogPageSizeBits)
{
this.hlog = hlog;
this.forceInMemory = forceInMemory;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

this.endAddress = endAddress;
currentAddress = -1;
nextAddress = beginAddress;

if (scanBufferingMode == ScanBufferingMode.SinglePageBuffering)
frameSize = 1;
else if (scanBufferingMode == ScanBufferingMode.DoublePageBuffering)
frameSize = 2;
else if (scanBufferingMode == ScanBufferingMode.NoBuffering)
{
frameSize = 0;
return;
}

frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize());
loaded = new CountdownEvent[frameSize];

// Only load addresses flushed to disk
if (nextAddress < hlog.HeadAddress && !forceInMemory)
{
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
if (frameSize > 0)
frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize());
}

/// <summary>
Expand Down Expand Up @@ -145,7 +99,7 @@ public bool GetNext(out RecordInfo recordInfo)
var offset = currentAddress & hlog.PageSizeMask;

if (currentAddress < headAddress && !forceInMemory)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize, headAddress, endAddress);

long physicalAddress;
if (currentAddress >= headAddress || forceInMemory)
Expand Down Expand Up @@ -210,60 +164,37 @@ public bool GetNext(out RecordInfo recordInfo, out Key key, out Value value)
}

/// <summary>
/// Dispose the iterator
/// Dispose iterator
/// </summary>
public void Dispose()
public override void Dispose()
{
base.Dispose();
frame?.Dispose();
}

private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame)
{
if (first || (currentAddress & hlog.PageSizeMask) == 0)
{
// Prefetch pages based on buffering mode
if (frameSize == 1)
{
if (!first)
{
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
{
var endPage = endAddress >> hlog.LogPageSizeBits;
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
{
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}
internal override void AsyncReadPagesFromDeviceToFrame<TContext>(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null)
=> hlog.AsyncReadPagesFromDeviceToFrame(readPageStart, numPages, untilAddress, AsyncReadPagesCallback, context, frame, out completed, devicePageOffset, device, objectLogDevice);

private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
{
var result = (PageAsyncReadResult<Empty>)context;

if (errorCode != 0)
{
Trace.TraceError("AsyncReadPagesCallback error: {0}", errorCode);
result.cts?.Cancel();
}

var result = (PageAsyncReadResult<Empty>)context;

if (result.freeBuffer1 != null)
{
hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page);
result.freeBuffer1.Return();
result.freeBuffer1 = null;
}

if (result.handle != null)
if (errorCode == 0)
{
result.handle.Signal();
result.handle?.Signal();
}

Interlocked.MemoryBarrier();
Expand Down
111 changes: 17 additions & 94 deletions cs/src/core/Allocator/GenericScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,21 @@

using System.Threading;
using System.Diagnostics;
using System;

namespace FASTER.core
{
/// <summary>
/// Scan iterator for hybrid log
/// </summary>
public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, Value>
public sealed class GenericScanIterator<Key, Value> : ScanIteratorBase, IFasterScanIterator<Key, Value>
{
private readonly int frameSize;
private readonly GenericAllocator<Key, Value> hlog;
private readonly long endAddress;
private readonly GenericFrame<Key, Value> frame;
private readonly CountdownEvent[] loaded;
private readonly int recordSize;
private readonly LightEpoch epoch;

private bool first = true;
private long currentAddress, nextAddress;
private Key currentKey;
private Value currentValue;

/// <summary>
/// Current address
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand All @@ -43,45 +26,13 @@ public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, V
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
/// <param name="epoch"></param>
public unsafe GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
public GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
: base(beginAddress == 0 ? hlog.GetFirstValidLogicalAddress(0) : beginAddress, endAddress, scanBufferingMode, epoch, hlog.LogPageSizeBits)
{
this.hlog = hlog;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

this.endAddress = endAddress;

recordSize = hlog.GetRecordSize(0).Item2;
currentAddress = -1;
nextAddress = beginAddress;

if (scanBufferingMode == ScanBufferingMode.SinglePageBuffering)
frameSize = 1;
else if (scanBufferingMode == ScanBufferingMode.DoublePageBuffering)
frameSize = 2;
else if (scanBufferingMode == ScanBufferingMode.NoBuffering)
{
frameSize = 0;
return;
}

frame = new GenericFrame<Key, Value>(frameSize, hlog.PageSize);
loaded = new CountdownEvent[frameSize];

// Only load addresses flushed to disk
if (nextAddress < hlog.HeadAddress)
{
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
if (frameSize > 0)
frame = new GenericFrame<Key, Value>(frameSize, hlog.PageSize);
}

/// <summary>
Expand Down Expand Up @@ -143,7 +94,7 @@ public bool GetNext(out RecordInfo recordInfo)
var offset = (currentAddress & hlog.PageSizeMask) / recordSize;

if (currentAddress < headAddress)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize, headAddress, endAddress);

// Check if record fits on page, if not skip to next page
if ((currentAddress & hlog.PageSizeMask) + recordSize > hlog.PageSize)
Expand Down Expand Up @@ -211,65 +162,37 @@ public bool GetNext(out RecordInfo recordInfo, out Key key, out Value value)
return false;
}

private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame)
{
if (first || (currentAddress & hlog.PageSizeMask) == 0)
{
// Prefetch pages based on buffering mode
if (frameSize == 1)
{
if (!first)
{
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
{
var endPage = endAddress >> hlog.LogPageSizeBits;
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
{
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}

/// <summary>
/// Dispose iterator
/// </summary>
public void Dispose()
public override void Dispose()
{
if (loaded != null)
for (int i = 0; i < frameSize; i++)
loaded[i]?.Wait();

base.Dispose();
frame?.Dispose();
}

internal override void AsyncReadPagesFromDeviceToFrame<TContext>(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null)
=> hlog.AsyncReadPagesFromDeviceToFrame(readPageStart, numPages, untilAddress, AsyncReadPagesCallback, context, frame, out completed, devicePageOffset, device, objectLogDevice);

private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
{
var result = (PageAsyncReadResult<Empty>)context;

if (errorCode != 0)
{
Trace.TraceError("AsyncReadPagesCallback error: {0}", errorCode);
result.cts?.Cancel();
}

var result = (PageAsyncReadResult<Empty>)context;

if (result.freeBuffer1 != null)
{
hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, ref frame.GetPage(result.page % frame.frameSize));
result.freeBuffer1.Return();
result.freeBuffer1 = null;
}

if (result.handle != null)
{
result.handle.Signal();
}
if (errorCode == 0)
result.handle?.Signal();

Interlocked.MemoryBarrier();
}
Expand Down
Loading

0 comments on commit e163614

Please sign in to comment.