Skip to content

Commit

Permalink
[C#] Do not hold epochs during a large flush operation (#797)
Browse files Browse the repository at this point in the history
* Do not hold epochs during a large flush operation, as it can block other threads.

* Cannot refresh during normal page-level flushes.

* Clear segment offsets only on FreePage (at the time of closing pages), not when clearing pages e.g., during recovery replay.

* Add separate tracking on snapshot flushed until address

* Updates

* update error on incompat ckpt

* minor fixes
  • Loading branch information
badrishc authored Feb 28, 2023
1 parent 1eb93fc commit bf55be6
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 80 deletions.
149 changes: 93 additions & 56 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,59 +443,80 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
deltaLog.Allocate(out int entryLength, out long destPhysicalAddress);
int destOffset = 0;

for (long p = startPage; p < endPage; p++)
// We perform delta capture under epoch protection with page-wise refresh for latency reasons
bool epochTaken = false;
if (!epoch.ThisInstanceProtected())
{
// All RCU pages need to be added to delta
// For IPU-only pages, prune based on dirty bit
if ((p < prevEndPage || endAddress == prevEndAddress) && PageStatusIndicator[p % BufferSize].Dirty < version)
continue;
epochTaken = true;
epoch.Resume();
}

var logicalAddress = p << LogPageSizeBits;
var physicalAddress = GetPhysicalAddress(logicalAddress);
try
{
for (long p = startPage; p < endPage; p++)
{
// Check if we have the page safely available to process in memory
if (HeadAddress >= (p << LogPageSizeBits) + PageSize)
continue;

var endLogicalAddress = logicalAddress + PageSize;
if (endAddress < endLogicalAddress) endLogicalAddress = endAddress;
Debug.Assert(endLogicalAddress > logicalAddress);
var endPhysicalAddress = physicalAddress + (endLogicalAddress - logicalAddress);
// All RCU pages need to be added to delta
// For IPU-only pages, prune based on dirty bit
if ((p < prevEndPage || endAddress == prevEndAddress) && PageStatusIndicator[p % BufferSize].Dirty < version)
continue;

if (p == startPage)
{
physicalAddress += (int)(startAddress & PageSizeMask);
logicalAddress += (int)(startAddress & PageSizeMask);
}
var logicalAddress = p << LogPageSizeBits;
var physicalAddress = GetPhysicalAddress(logicalAddress);

while (physicalAddress < endPhysicalAddress)
{
ref var info = ref GetInfo(physicalAddress);
var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
if (info.Dirty)
var endLogicalAddress = logicalAddress + PageSize;
if (endAddress < endLogicalAddress) endLogicalAddress = endAddress;
Debug.Assert(endLogicalAddress > logicalAddress);
var endPhysicalAddress = physicalAddress + (endLogicalAddress - logicalAddress);

if (p == startPage)
{
info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
int size = sizeof(long) + sizeof(int) + alignedRecordSize;
if (destOffset + size > entryLength)
physicalAddress += (int)(startAddress & PageSizeMask);
logicalAddress += (int)(startAddress & PageSizeMask);
}

while (physicalAddress < endPhysicalAddress)
{
ref var info = ref GetInfo(physicalAddress);
var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
if (info.Dirty)
{
deltaLog.Seal(destOffset);
deltaLog.Allocate(out entryLength, out destPhysicalAddress);
destOffset = 0;
info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
int size = sizeof(long) + sizeof(int) + alignedRecordSize;
if (destOffset + size > entryLength)
{
deltaLog.Seal(0);
deltaLog.Seal(destOffset);
deltaLog.Allocate(out entryLength, out destPhysicalAddress);
destOffset = 0;
if (destOffset + size > entryLength)
{
deltaLog.Seal(0);
deltaLog.Allocate(out entryLength, out destPhysicalAddress);
}
if (destOffset + size > entryLength)
throw new FasterException("Insufficient page size to write delta");
}
if (destOffset + size > entryLength)
throw new FasterException("Insufficient page size to write delta");
*(long*)(destPhysicalAddress + destOffset) = logicalAddress;
destOffset += sizeof(long);
*(int*)(destPhysicalAddress + destOffset) = alignedRecordSize;
destOffset += sizeof(int);
Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
destOffset += alignedRecordSize;
}
*(long*)(destPhysicalAddress + destOffset) = logicalAddress;
destOffset += sizeof(long);
*(int*)(destPhysicalAddress + destOffset) = alignedRecordSize;
destOffset += sizeof(int);
Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
destOffset += alignedRecordSize;
physicalAddress += alignedRecordSize;
logicalAddress += alignedRecordSize;
}
physicalAddress += alignedRecordSize;
logicalAddress += alignedRecordSize;
epoch.ProtectAndDrain();
}
}
finally
{
if (epochTaken)
epoch.Suspend();
}

if (destOffset > 0)
deltaLog.Seal(destOffset);
Expand Down Expand Up @@ -1830,29 +1851,45 @@ public void AsyncFlushPages<TContext>(long flushPageStart, int numPages, DeviceI
/// <param name="completedSemaphore"></param>
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, long fuzzyStartLogicalAddress, IDevice device, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore)
{
int totalNumPages = (int)(endPage - startPage);
completedSemaphore = new SemaphoreSlim(0);
var flushCompletionTracker = new FlushCompletionTracker(completedSemaphore, totalNumPages);
var localSegmentOffsets = new long[SegmentBufferSize];
// We drop epoch protection to perform large scale writes to disk
bool epochDropped = false;
if (epoch.ThisInstanceProtected())
{
epochDropped = true;
epoch.Suspend();
}

for (long flushPage = startPage; flushPage < endPage; flushPage++)
try
{
long flushPageAddress = flushPage << LogPageSizeBits;
var pageSize = PageSize;
if (flushPage == endPage - 1)
pageSize = (int)(endLogicalAddress - flushPageAddress);
int totalNumPages = (int)(endPage - startPage);
completedSemaphore = new SemaphoreSlim(0);
var flushCompletionTracker = new FlushCompletionTracker(completedSemaphore, totalNumPages);
var localSegmentOffsets = new long[SegmentBufferSize];

var asyncResult = new PageAsyncFlushResult<Empty>
for (long flushPage = startPage; flushPage < endPage; flushPage++)
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushPageAddress,
untilAddress = flushPageAddress + pageSize,
count = 1
};
long flushPageAddress = flushPage << LogPageSizeBits;
var pageSize = PageSize;
if (flushPage == endPage - 1)
pageSize = (int)(endLogicalAddress - flushPageAddress);

// Intended destination is flushPage
WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets, fuzzyStartLogicalAddress);
var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushPageAddress,
untilAddress = flushPageAddress + pageSize,
count = 1
};

// Intended destination is flushPage
WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets, fuzzyStartLogicalAddress);
}
}
finally
{
if (epochDropped)
epoch.Resume();
}
}

Expand Down
21 changes: 11 additions & 10 deletions cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,19 @@ protected override void WriteAsyncToDevice<TContext>
}
try
{
if (FlushedUntilAddress < (flushPage << LogPageSizeBits) + pageSize)
if (HeadAddress >= (flushPage << LogPageSizeBits) + pageSize)
{
// Requested page is unavailable in memory, ignore
callback(0, 0, asyncResult);
}
else
{
// We are writing to separate device, so use fresh segment offsets
WriteAsync(flushPage,
(ulong)(AlignedPageSizeBytes * (flushPage - startPage)),
(uint)pageSize, callback, asyncResult,
device, objectLogDevice, flushPage, localSegmentOffsets, fuzzyStartLogicalAddress);
}
else
{
// Requested page is already flushed to main log, ignore
callback(0, 0, asyncResult);
}
}
finally
{
Expand All @@ -315,6 +315,11 @@ protected override void WriteAsyncToDevice<TContext>
internal override void ClearPage(long page, int offset)
{
Array.Clear(values[page % BufferSize], offset / recordSize, values[page % BufferSize].Length - offset / recordSize);
}

internal override void FreePage(long page)
{
ClearPage(page, 0);

// Close segments
var thisCloseSegment = page >> (LogSegmentSizeBits - LogPageSizeBits);
Expand All @@ -325,11 +330,7 @@ internal override void ClearPage(long page, int offset)
// We are clearing the last page in current segment
segmentOffsets[thisCloseSegment % SegmentBufferSize] = 0;
}
}

internal override void FreePage(long page)
{
ClearPage(page, 0);
if (EmptyPageCount > 0)
{
overflowPagePool.TryAdd(values[page % BufferSize]);
Expand Down
22 changes: 16 additions & 6 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public struct CommitPoint
/// </summary>
public struct HybridLogRecoveryInfo
{
const int CheckpointVersion = 4;
const int CheckpointVersion = 5;

/// <summary>
/// Guid
Expand All @@ -381,10 +381,14 @@ public struct HybridLogRecoveryInfo
/// </summary>
public long nextVersion;
/// <summary>
/// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time.
/// Flushed logical address; indicates the latest immutable address on the main FASTER log at checkpoint commit time.
/// </summary>
public long flushedLogicalAddress;
/// <summary>
/// Flushed logical address at snapshot start; indicates device offset for snapshot file
/// </summary>
public long snapshotStartFlushedLogicalAddress;
/// <summary>
/// Start logical address
/// </summary>
public long startLogicalAddress;
Expand Down Expand Up @@ -454,6 +458,7 @@ public void Initialize(Guid token, long _version)
useSnapshotFile = 0;
version = _version;
flushedLogicalAddress = 0;
snapshotStartFlushedLogicalAddress = 0;
startLogicalAddress = 0;
finalLogicalAddress = 0;
snapshotFinalLogicalAddress = 0;
Expand All @@ -476,6 +481,9 @@ public void Initialize(StreamReader reader)
string value = reader.ReadLine();
var cversion = int.Parse(value);

if (cversion != CheckpointVersion)
throw new FasterException($"Invalid checkpoint version {cversion} encountered, current version is {CheckpointVersion}, cannot recover with this checkpoint");

value = reader.ReadLine();
var checksum = long.Parse(value);

Expand All @@ -494,6 +502,9 @@ public void Initialize(StreamReader reader)
value = reader.ReadLine();
flushedLogicalAddress = long.Parse(value);

value = reader.ReadLine();
snapshotStartFlushedLogicalAddress = long.Parse(value);

value = reader.ReadLine();
startLogicalAddress = long.Parse(value);

Expand Down Expand Up @@ -556,9 +567,6 @@ public void Initialize(StreamReader reader)
}
}

if (cversion != CheckpointVersion)
throw new FasterException("Invalid version");

if (checksum != Checksum(continueTokens.Count))
throw new FasterException("Invalid checksum for checkpoint");
}
Expand Down Expand Up @@ -624,6 +632,7 @@ public byte[] ToByteArray()
writer.WriteLine(version);
writer.WriteLine(nextVersion);
writer.WriteLine(flushedLogicalAddress);
writer.WriteLine(snapshotStartFlushedLogicalAddress);
writer.WriteLine(startLogicalAddress);
writer.WriteLine(finalLogicalAddress);
writer.WriteLine(snapshotFinalLogicalAddress);
Expand Down Expand Up @@ -662,7 +671,7 @@ private readonly long Checksum(int checkpointTokensCount)
var bytes = guid.ToByteArray();
var long1 = BitConverter.ToInt64(bytes, 0);
var long2 = BitConverter.ToInt64(bytes, 8);
return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ snapshotFinalLogicalAddress ^ headAddress ^ beginAddress
return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ snapshotStartFlushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ snapshotFinalLogicalAddress ^ headAddress ^ beginAddress
^ checkpointTokensCount ^ (objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length);
}

Expand All @@ -676,6 +685,7 @@ public readonly void DebugPrint(ILogger logger)
logger?.LogInformation("Next Version: {nextVersion}", nextVersion);
logger?.LogInformation("Is Snapshot?: {useSnapshotFile}", useSnapshotFile == 1);
logger?.LogInformation("Flushed LogicalAddress: {flushedLogicalAddress}", flushedLogicalAddress);
logger?.LogInformation("SnapshotStart Flushed LogicalAddress: {snapshotStartFlushedLogicalAddress}", snapshotStartFlushedLogicalAddress);
logger?.LogInformation("Start Logical Address: {startLogicalAddress}", startLogicalAddress);
logger?.LogInformation("Final Logical Address: {finalLogicalAddress}", finalLogicalAddress);
logger?.LogInformation("Snapshot Final Logical Address: {snapshotFinalLogicalAddress}", snapshotFinalLogicalAddress);
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FASTER/Implementation/FindRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private bool TryFindRecordInMainLog(ref Key key, ref OperationStackContext<Key,
private bool TraceBackForKeyMatch(ref Key key, ref RecordSource<Key, Value> recSrc, long minOffset, bool waitForTentative = true)
{
ref var recordInfo = ref hlog.GetInfo(recSrc.PhysicalAddress);
if (comparer.Equals(ref key, ref hlog.GetKey(recSrc.PhysicalAddress)) && !recordInfo.Invalid)
if (!recordInfo.Invalid && comparer.Equals(ref key, ref hlog.GetKey(recSrc.PhysicalAddress)))
{
if (!waitForTentative || SpinWaitWhileTentativeAndReturnValidity(ref recordInfo))
return recSrc.HasMainLogSrc = true;
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private long InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
// First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress)
RecoverHybridLog(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.Snapshot, options);
// Then recover snapshot into mutable region
RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.flushedLogicalAddress,
RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.snapshotStartFlushedLogicalAddress,
recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, options, recoveredHLCInfo.deltaLog, recoverTo);

readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
Expand Down Expand Up @@ -423,7 +423,7 @@ await RecoverHybridLogAsync(scanFromAddress, recoverFromAddress, recoveredHLCInf
await RecoverHybridLogAsync(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.Snapshot,
new RecoveryOptions(headAddress, recoveredHLCInfo.info.startLogicalAddress, undoNextVersion), cancellationToken).ConfigureAwait(false);
// Then recover snapshot into mutable region
await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.flushedLogicalAddress,
await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.snapshotStartFlushedLogicalAddress,
recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, options, recoveredHLCInfo.deltaLog, recoverTo, cancellationToken).ConfigureAwait(false);

readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
Expand Down
Loading

0 comments on commit bf55be6

Please sign in to comment.