Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Log compaction fix in presence of concurrent updates #406

Merged
merged 10 commits into from
Feb 24, 2021
1 change: 0 additions & 1 deletion cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ public override long[] GetSegmentOffsets()
internal override void PopulatePage(byte* src, int required_bytes, long destinationPage)
{
throw new FasterException("BlittableAllocator memory pages are sector aligned - use direct copy");
// Buffer.MemoryCopy(src, (void*)pointers[destinationPage % BufferSize], required_bytes, required_bytes);
}

/// <summary>
Expand Down
101 changes: 16 additions & 85 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,16 @@ namespace FASTER.core
/// <summary>
/// Scan iterator for hybrid log
/// </summary>
public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key, Value>
public sealed class BlittableScanIterator<Key, Value> : ScanIteratorBase, IFasterScanIterator<Key, Value>
{
private readonly int frameSize;
private readonly BlittableAllocator<Key, Value> hlog;
private readonly long endAddress;
private readonly BlittableFrame frame;
private readonly CountdownEvent[] loaded;
private readonly LightEpoch epoch;
private readonly bool forceInMemory;

private bool first = true;
private long currentAddress, nextAddress;
private Key currentKey;
private Value currentValue;
private long currentPhysicalAddress;

/// <summary>
/// Current address
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand All @@ -45,44 +29,14 @@ public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key,
/// <param name="scanBufferingMode"></param>
/// <param name="epoch"></param>
/// <param name="forceInMemory">Provided address range is known by caller to be in memory, even if less than HeadAddress</param>
public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch, bool forceInMemory = false)
public BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch, bool forceInMemory = false)
: base(beginAddress == 0 ? hlog.GetFirstValidLogicalAddress(0) : beginAddress, endAddress, scanBufferingMode, epoch, hlog.LogPageSizeBits)
{
this.hlog = hlog;
this.forceInMemory = forceInMemory;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

this.endAddress = endAddress;
currentAddress = -1;
nextAddress = beginAddress;

if (scanBufferingMode == ScanBufferingMode.SinglePageBuffering)
frameSize = 1;
else if (scanBufferingMode == ScanBufferingMode.DoublePageBuffering)
frameSize = 2;
else if (scanBufferingMode == ScanBufferingMode.NoBuffering)
{
frameSize = 0;
return;
}

frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize());
loaded = new CountdownEvent[frameSize];

// Only load addresses flushed to disk
if (nextAddress < hlog.HeadAddress && !forceInMemory)
{
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
if (frameSize > 0)
frame = new BlittableFrame(frameSize, hlog.PageSize, hlog.GetDeviceSectorSize());
}

/// <summary>
Expand Down Expand Up @@ -145,7 +99,7 @@ public bool GetNext(out RecordInfo recordInfo)
var offset = currentAddress & hlog.PageSizeMask;

if (currentAddress < headAddress && !forceInMemory)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize, headAddress, endAddress);

long physicalAddress;
if (currentAddress >= headAddress || forceInMemory)
Expand Down Expand Up @@ -210,60 +164,37 @@ public bool GetNext(out RecordInfo recordInfo, out Key key, out Value value)
}

/// <summary>
/// Dispose the iterator
/// Dispose iterator
/// </summary>
public void Dispose()
public override void Dispose()
{
base.Dispose();
frame?.Dispose();
}

private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame)
{
if (first || (currentAddress & hlog.PageSizeMask) == 0)
{
// Prefetch pages based on buffering mode
if (frameSize == 1)
{
if (!first)
{
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
{
var endPage = endAddress >> hlog.LogPageSizeBits;
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
{
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}
internal override void AsyncReadPagesFromDeviceToFrame<TContext>(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null)
=> hlog.AsyncReadPagesFromDeviceToFrame(readPageStart, numPages, untilAddress, AsyncReadPagesCallback, context, frame, out completed, devicePageOffset, device, objectLogDevice);

private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
{
var result = (PageAsyncReadResult<Empty>)context;

if (errorCode != 0)
{
Trace.TraceError("AsyncReadPagesCallback error: {0}", errorCode);
result.cts?.Cancel();
}

var result = (PageAsyncReadResult<Empty>)context;

if (result.freeBuffer1 != null)
{
hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, result.page);
result.freeBuffer1.Return();
result.freeBuffer1 = null;
}

if (result.handle != null)
if (errorCode == 0)
{
result.handle.Signal();
result.handle?.Signal();
}

Interlocked.MemoryBarrier();
Expand Down
111 changes: 17 additions & 94 deletions cs/src/core/Allocator/GenericScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,21 @@

using System.Threading;
using System.Diagnostics;
using System;

namespace FASTER.core
{
/// <summary>
/// Scan iterator for hybrid log
/// </summary>
public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, Value>
public sealed class GenericScanIterator<Key, Value> : ScanIteratorBase, IFasterScanIterator<Key, Value>
{
private readonly int frameSize;
private readonly GenericAllocator<Key, Value> hlog;
private readonly long endAddress;
private readonly GenericFrame<Key, Value> frame;
private readonly CountdownEvent[] loaded;
private readonly int recordSize;
private readonly LightEpoch epoch;

private bool first = true;
private long currentAddress, nextAddress;
private Key currentKey;
private Value currentValue;

/// <summary>
/// Current address
/// </summary>
public long CurrentAddress => currentAddress;

/// <summary>
/// Next address
/// </summary>
public long NextAddress => nextAddress;

/// <summary>
/// Constructor
/// </summary>
Expand All @@ -43,45 +26,13 @@ public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, V
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
/// <param name="epoch"></param>
public unsafe GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
public GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
: base(beginAddress == 0 ? hlog.GetFirstValidLogicalAddress(0) : beginAddress, endAddress, scanBufferingMode, epoch, hlog.LogPageSizeBits)
{
this.hlog = hlog;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

this.endAddress = endAddress;

recordSize = hlog.GetRecordSize(0).Item2;
currentAddress = -1;
nextAddress = beginAddress;

if (scanBufferingMode == ScanBufferingMode.SinglePageBuffering)
frameSize = 1;
else if (scanBufferingMode == ScanBufferingMode.DoublePageBuffering)
frameSize = 2;
else if (scanBufferingMode == ScanBufferingMode.NoBuffering)
{
frameSize = 0;
return;
}

frame = new GenericFrame<Key, Value>(frameSize, hlog.PageSize);
loaded = new CountdownEvent[frameSize];

// Only load addresses flushed to disk
if (nextAddress < hlog.HeadAddress)
{
var frameNumber = (nextAddress >> hlog.LogPageSizeBits) % frameSize;
hlog.AsyncReadPagesFromDeviceToFrame
(nextAddress >> hlog.LogPageSizeBits,
1, endAddress, AsyncReadPagesCallback, Empty.Default,
frame, out loaded[frameNumber]);
}
if (frameSize > 0)
frame = new GenericFrame<Key, Value>(frameSize, hlog.PageSize);
}

/// <summary>
Expand Down Expand Up @@ -143,7 +94,7 @@ public bool GetNext(out RecordInfo recordInfo)
var offset = (currentAddress & hlog.PageSizeMask) / recordSize;

if (currentAddress < headAddress)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize, headAddress, endAddress);

// Check if record fits on page, if not skip to next page
if ((currentAddress & hlog.PageSizeMask) + recordSize > hlog.PageSize)
Expand Down Expand Up @@ -211,65 +162,37 @@ public bool GetNext(out RecordInfo recordInfo, out Key key, out Value value)
return false;
}

private unsafe void BufferAndLoad(long currentAddress, long currentPage, long currentFrame)
{
if (first || (currentAddress & hlog.PageSizeMask) == 0)
{
// Prefetch pages based on buffering mode
if (frameSize == 1)
{
if (!first)
{
hlog.AsyncReadPagesFromDeviceToFrame(currentAddress >> hlog.LogPageSizeBits, 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[currentFrame]);
}
}
else
{
var endPage = endAddress >> hlog.LogPageSizeBits;
if ((endPage > currentPage) &&
((endPage > currentPage + 1) || ((endAddress & hlog.PageSizeMask) != 0)))
{
hlog.AsyncReadPagesFromDeviceToFrame(1 + (currentAddress >> hlog.LogPageSizeBits), 1, endAddress, AsyncReadPagesCallback, Empty.Default, frame, out loaded[(currentPage + 1) % frameSize]);
}
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}

/// <summary>
/// Dispose iterator
/// </summary>
public void Dispose()
public override void Dispose()
{
if (loaded != null)
for (int i = 0; i < frameSize; i++)
loaded[i]?.Wait();

base.Dispose();
frame?.Dispose();
}

internal override void AsyncReadPagesFromDeviceToFrame<TContext>(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null)
=> hlog.AsyncReadPagesFromDeviceToFrame(readPageStart, numPages, untilAddress, AsyncReadPagesCallback, context, frame, out completed, devicePageOffset, device, objectLogDevice);

private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
{
var result = (PageAsyncReadResult<Empty>)context;

if (errorCode != 0)
{
Trace.TraceError("AsyncReadPagesCallback error: {0}", errorCode);
result.cts?.Cancel();
}

var result = (PageAsyncReadResult<Empty>)context;

if (result.freeBuffer1 != null)
{
hlog.PopulatePage(result.freeBuffer1.GetValidPointer(), result.freeBuffer1.required_bytes, ref frame.GetPage(result.page % frame.frameSize));
result.freeBuffer1.Return();
result.freeBuffer1 = null;
}

if (result.handle != null)
{
result.handle.Signal();
}
if (errorCode == 0)
result.handle?.Signal();

Interlocked.MemoryBarrier();
}
Expand Down
Loading