Skip to content

Commit

Permalink
[C#] CompleteCheckpointAsync wait fix, scan epoch protection (#369)
Browse files Browse the repository at this point in the history
* Make sure CompleteCheckpointAsync returns only when the state machine is back to REST.
* Use epoch protection with GenericScanIterator, for correctness when concurrently scanning in-memory part of log.
* Epoch protection for scan using BlittableScanIterator.
* Completing the fix, added debug assert for epoch invariant
  • Loading branch information
badrishc authored Nov 18, 2020
1 parent e260e85 commit 0774c41
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ internal override void PopulatePage(byte* src, int required_bytes, long destinat
/// <returns></returns>
public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
{
return new BlittableScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode);
return new BlittableScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode, epoch);
}


Expand Down
47 changes: 39 additions & 8 deletions cs/src/core/Allocator/BlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key,
private readonly long endAddress;
private readonly BlittableFrame frame;
private readonly CountdownEvent[] loaded;
private readonly LightEpoch epoch;

private bool first = true;
private long currentAddress, nextAddress;
private Key currentKey;
private Value currentValue;
private long currentPhysicalAddress;

/// <summary>
Expand All @@ -39,10 +42,15 @@ public sealed class BlittableScanIterator<Key, Value> : IFasterScanIterator<Key,
/// <param name="beginAddress"></param>
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
/// <param name="epoch"></param>
public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
{
this.hlog = hlog;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

Expand Down Expand Up @@ -80,7 +88,9 @@ public unsafe BlittableScanIterator(BlittableAllocator<Key, Value> hlog, long be
/// <returns></returns>
public ref Key GetKey()
{
return ref hlog.GetKey(currentPhysicalAddress);
if (currentPhysicalAddress != 0)
return ref hlog.GetKey(currentPhysicalAddress);
return ref currentKey;
}

/// <summary>
Expand All @@ -89,7 +99,9 @@ public ref Key GetKey()
/// <returns></returns>
public ref Value GetValue()
{
return ref hlog.GetValue(currentPhysicalAddress);
if (currentPhysicalAddress != 0)
return ref hlog.GetValue(currentPhysicalAddress);
return ref currentValue;
}

/// <summary>
Expand All @@ -111,33 +123,44 @@ public bool GetNext(out RecordInfo recordInfo)
return false;
}

epoch?.Resume();
var headAddress = hlog.HeadAddress;

if (currentAddress < hlog.BeginAddress)
{
epoch?.Suspend();
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
}

if (frameSize == 0 && currentAddress < hlog.HeadAddress)
if (frameSize == 0 && currentAddress < headAddress)
{
epoch?.Suspend();
throw new FasterException("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> hlog.LogPageSizeBits;
var offset = currentAddress & hlog.PageSizeMask;

if (currentAddress < hlog.HeadAddress)
if (currentAddress < headAddress)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);

long physicalAddress;
if (currentAddress >= hlog.HeadAddress)
if (currentAddress >= headAddress)
{
physicalAddress = hlog.GetPhysicalAddress(currentAddress);
currentPhysicalAddress = 0;
}
else
physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
{
currentPhysicalAddress = physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
}

// Check if record fits on page, if not skip to next page
var recordSize = hlog.GetRecordSize(physicalAddress).Item2;
if ((currentAddress & hlog.PageSizeMask) + recordSize > hlog.PageSize)
{
nextAddress = (1 + (currentAddress >> hlog.LogPageSizeBits)) << hlog.LogPageSizeBits;
epoch?.Suspend();
continue;
}

Expand All @@ -146,11 +169,17 @@ public bool GetNext(out RecordInfo recordInfo)
ref var info = ref hlog.GetInfo(physicalAddress);
if (info.Invalid || info.IsNull())
{
epoch?.Suspend();
continue;
}

currentPhysicalAddress = physicalAddress;
recordInfo = info;
if (currentPhysicalAddress == 0)
{
currentKey = hlog.GetKey(physicalAddress);
currentValue = hlog.GetValue(physicalAddress);
}
epoch?.Suspend();
return true;
}
}
Expand Down Expand Up @@ -208,7 +237,9 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}

private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ internal void PopulatePage(byte* src, int required_bytes, ref Record<Key, Value>
/// <returns></returns>
public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
{
return new GenericScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode);
return new GenericScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode, epoch);
}
}
}
30 changes: 26 additions & 4 deletions cs/src/core/Allocator/GenericScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, V
private readonly GenericFrame<Key, Value> frame;
private readonly CountdownEvent[] loaded;
private readonly int recordSize;
private readonly LightEpoch epoch;

private bool first = true;
private long currentAddress, nextAddress;
Expand All @@ -41,10 +42,15 @@ public sealed class GenericScanIterator<Key, Value> : IFasterScanIterator<Key, V
/// <param name="beginAddress"></param>
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
public unsafe GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
/// <param name="epoch"></param>
public unsafe GenericScanIterator(GenericAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
{
this.hlog = hlog;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

Expand Down Expand Up @@ -117,54 +123,68 @@ public bool GetNext(out RecordInfo recordInfo)
return false;
}

epoch?.Resume();
var headAddress = hlog.HeadAddress;

if (currentAddress < hlog.BeginAddress)
{
epoch?.Suspend();
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
}

if (frameSize == 0 && currentAddress < hlog.HeadAddress)
if (frameSize == 0 && currentAddress < headAddress)
{
epoch?.Suspend();
throw new FasterException("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> hlog.LogPageSizeBits;

var offset = (currentAddress & hlog.PageSizeMask) / recordSize;

if (currentAddress < hlog.HeadAddress)
if (currentAddress < headAddress)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);

// Check if record fits on page, if not skip to next page
if ((currentAddress & hlog.PageSizeMask) + recordSize > hlog.PageSize)
{
nextAddress = (1 + (currentAddress >> hlog.LogPageSizeBits)) << hlog.LogPageSizeBits;
epoch?.Suspend();
continue;
}

nextAddress = currentAddress + recordSize;

if (currentAddress >= hlog.HeadAddress)
if (currentAddress >= headAddress)
{
// Read record from cached page memory
var page = currentPage % hlog.BufferSize;

if (hlog.values[page][offset].info.Invalid)
{
epoch?.Suspend();
continue;
}

recordInfo = hlog.values[page][offset].info;
currentKey = hlog.values[page][offset].key;
currentValue = hlog.values[page][offset].value;
epoch?.Suspend();
return true;
}

var currentFrame = currentPage % frameSize;

if (frame.GetInfo(currentFrame, offset).Invalid)
{
epoch?.Suspend();
continue;
}

recordInfo = frame.GetInfo(currentFrame, offset);
currentKey = frame.GetKey(currentFrame, offset);
currentValue = frame.GetValue(currentFrame, offset);
epoch?.Suspend();
return true;
}
}
Expand Down Expand Up @@ -214,7 +234,9 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/VarLenBlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ internal override void PopulatePage(byte* src, int required_bytes, long destinat
/// <returns></returns>
public override IFasterScanIterator<Key, Value> Scan(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
{
return new VariableLengthBlittableScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode);
return new VariableLengthBlittableScanIterator<Key, Value>(this, beginAddress, endAddress, scanBufferingMode, epoch);
}


Expand Down
38 changes: 32 additions & 6 deletions cs/src/core/Allocator/VarLenBlittableScanIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public sealed class VariableLengthBlittableScanIterator<Key, Value> : IFasterSca
private readonly long endAddress;
private readonly BlittableFrame frame;
private readonly CountdownEvent[] loaded;
private readonly LightEpoch epoch;
private SectorAlignedMemory memory;

private bool first = true;
private long currentAddress, nextAddress;
Expand All @@ -39,10 +41,15 @@ public sealed class VariableLengthBlittableScanIterator<Key, Value> : IFasterSca
/// <param name="beginAddress"></param>
/// <param name="endAddress"></param>
/// <param name="scanBufferingMode"></param>
public unsafe VariableLengthBlittableScanIterator(VariableLengthBlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode)
/// <param name="epoch"></param>
public unsafe VariableLengthBlittableScanIterator(VariableLengthBlittableAllocator<Key, Value> hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch)
{
this.hlog = hlog;

// If we are protected when creating the iterator, we do not need per-GetNext protection
if (!epoch.ThisInstanceProtected())
this.epoch = epoch;

if (beginAddress == 0)
beginAddress = hlog.GetFirstValidLogicalAddress(0);

Expand Down Expand Up @@ -97,7 +104,7 @@ public ref Value GetValue()
/// </summary>
/// <param name="recordInfo"></param>
/// <returns></returns>
public bool GetNext(out RecordInfo recordInfo)
public unsafe bool GetNext(out RecordInfo recordInfo)
{
recordInfo = default;

Expand All @@ -111,24 +118,29 @@ public bool GetNext(out RecordInfo recordInfo)
return false;
}

epoch?.Resume();
var headAddress = hlog.HeadAddress;

if (currentAddress < hlog.BeginAddress)
{
epoch?.Suspend();
throw new FasterException("Iterator address is less than log BeginAddress " + hlog.BeginAddress);
}

if (frameSize == 0 && currentAddress < hlog.HeadAddress)
if (frameSize == 0 && currentAddress < headAddress)
{
epoch?.Suspend();
throw new FasterException("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> hlog.LogPageSizeBits;
var offset = currentAddress & hlog.PageSizeMask;

if (currentAddress < hlog.HeadAddress)
if (currentAddress < headAddress)
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);

long physicalAddress;
if (currentAddress >= hlog.HeadAddress)
if (currentAddress >= headAddress)
physicalAddress = hlog.GetPhysicalAddress(currentAddress);
else
physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
Expand All @@ -138,6 +150,7 @@ public bool GetNext(out RecordInfo recordInfo)
if ((currentAddress & hlog.PageSizeMask) + recordSize > hlog.PageSize)
{
nextAddress = (1 + (currentAddress >> hlog.LogPageSizeBits)) << hlog.LogPageSizeBits;
epoch?.Suspend();
continue;
}

Expand All @@ -146,11 +159,20 @@ public bool GetNext(out RecordInfo recordInfo)
ref var info = ref hlog.GetInfo(physicalAddress);
if (info.Invalid || info.IsNull())
{
epoch?.Suspend();
continue;
}

currentPhysicalAddress = physicalAddress;
recordInfo = info;
if (currentAddress >= headAddress)
{
memory?.Return();
memory = hlog.bufferPool.Get(recordSize);
Buffer.MemoryCopy((byte*)currentPhysicalAddress, memory.aligned_pointer, recordSize, recordSize);
currentPhysicalAddress = (long)memory.aligned_pointer;
}
epoch?.Suspend();
return true;
}
}
Expand All @@ -172,6 +194,8 @@ public bool GetNext(out RecordInfo recordInfo, out Key key, out Value value)
/// </summary>
public void Dispose()
{
memory?.Return();
memory = null;
frame?.Dispose();
}

Expand All @@ -198,7 +222,9 @@ private unsafe void BufferAndLoad(long currentAddress, long currentPage, long cu
}
first = false;
}
epoch?.Suspend();
loaded[currentFrame].Wait();
epoch?.Resume();
}

private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
Expand Down
3 changes: 3 additions & 0 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ private void Acquire()
private void Release()
{
int entry = threadEntryIndex;

Debug.Assert((*(tableAligned + entry)).localCurrentEpoch != 0, "Trying to release unprotected epoch");

(*(tableAligned + entry)).localCurrentEpoch = 0;
(*(tableAligned + entry)).threadId = 0;

Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default
ThreadStateMachineStep<Empty, Empty, Empty, NullFasterSession>(null, NullFasterSession.Instance, valueTasks, token);

if (valueTasks.Count == 0)
break;
continue; // we need to re-check loop, so we return only when we are at REST

foreach (var task in valueTasks)
{
Expand Down

0 comments on commit 0774c41

Please sign in to comment.