diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index b5768ffe2..2a53f7b36 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -443,59 +443,80 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end
             deltaLog.Allocate(out int entryLength, out long destPhysicalAddress);
             int destOffset = 0;
 
-            for (long p = startPage; p < endPage; p++)
+            // We perform delta capture under epoch protection with page-wise refresh for latency reasons
+            bool epochTaken = false;
+            if (!epoch.ThisInstanceProtected())
             {
-                // All RCU pages need to be added to delta
-                // For IPU-only pages, prune based on dirty bit
-                if ((p < prevEndPage || endAddress == prevEndAddress) && PageStatusIndicator[p % BufferSize].Dirty < version)
-                    continue;
+                epochTaken = true;
+                epoch.Resume();
+            }
 
-                var logicalAddress = p << LogPageSizeBits;
-                var physicalAddress = GetPhysicalAddress(logicalAddress);
+            try
+            {
+                for (long p = startPage; p < endPage; p++)
+                {
+                    // Check if we have the page safely available to process in memory
+                    if (HeadAddress >= (p << LogPageSizeBits) + PageSize)
+                        continue;
 
-                var endLogicalAddress = logicalAddress + PageSize;
-                if (endAddress < endLogicalAddress) endLogicalAddress = endAddress;
-                Debug.Assert(endLogicalAddress > logicalAddress);
-                var endPhysicalAddress = physicalAddress + (endLogicalAddress - logicalAddress);
+                    // All RCU pages need to be added to delta
+                    // For IPU-only pages, prune based on dirty bit
+                    if ((p < prevEndPage || endAddress == prevEndAddress) && PageStatusIndicator[p % BufferSize].Dirty < version)
+                        continue;
 
-                if (p == startPage)
-                {
-                    physicalAddress += (int)(startAddress & PageSizeMask);
-                    logicalAddress += (int)(startAddress & PageSizeMask);
-                }
+                    var logicalAddress = p << LogPageSizeBits;
+                    var physicalAddress = GetPhysicalAddress(logicalAddress);
 
-                while (physicalAddress < endPhysicalAddress)
-                {
-                    ref var info = ref GetInfo(physicalAddress);
-                    var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
-                    if (info.Dirty)
+                    var endLogicalAddress = logicalAddress + PageSize;
+                    if (endAddress < endLogicalAddress) endLogicalAddress = endAddress;
+                    Debug.Assert(endLogicalAddress > logicalAddress);
+                    var endPhysicalAddress = physicalAddress + (endLogicalAddress - logicalAddress);
+
+                    if (p == startPage)
                     {
-                        info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
-                        int size = sizeof(long) + sizeof(int) + alignedRecordSize;
-                        if (destOffset + size > entryLength)
+                        physicalAddress += (int)(startAddress & PageSizeMask);
+                        logicalAddress += (int)(startAddress & PageSizeMask);
+                    }
+
+                    while (physicalAddress < endPhysicalAddress)
+                    {
+                        ref var info = ref GetInfo(physicalAddress);
+                        var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
+                        if (info.Dirty)
                         {
-                            deltaLog.Seal(destOffset);
-                            deltaLog.Allocate(out entryLength, out destPhysicalAddress);
-                            destOffset = 0;
+                            info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
+                            int size = sizeof(long) + sizeof(int) + alignedRecordSize;
                             if (destOffset + size > entryLength)
                             {
-                                deltaLog.Seal(0);
+                                deltaLog.Seal(destOffset);
                                 deltaLog.Allocate(out entryLength, out destPhysicalAddress);
+                                destOffset = 0;
+                                if (destOffset + size > entryLength)
+                                {
+                                    deltaLog.Seal(0);
+                                    deltaLog.Allocate(out entryLength, out destPhysicalAddress);
+                                }
+                                if (destOffset + size > entryLength)
+                                    throw new FasterException("Insufficient page size to write delta");
                             }
-                            if (destOffset + size > entryLength)
-                                throw new FasterException("Insufficient page size to write delta");
+                            *(long*)(destPhysicalAddress + destOffset) = logicalAddress;
+                            destOffset += sizeof(long);
+                            *(int*)(destPhysicalAddress + destOffset) = alignedRecordSize;
+                            destOffset += sizeof(int);
+                            Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
+                            destOffset += alignedRecordSize;
                         }
-                        *(long*)(destPhysicalAddress + destOffset) = logicalAddress;
-                        destOffset += sizeof(long);
-                        *(int*)(destPhysicalAddress + destOffset) = alignedRecordSize;
-                        destOffset += sizeof(int);
-                        Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
-                        destOffset += alignedRecordSize;
+                        physicalAddress += alignedRecordSize;
+                        logicalAddress += alignedRecordSize;
                     }
-                    physicalAddress += alignedRecordSize;
-                    logicalAddress += alignedRecordSize;
+                    epoch.ProtectAndDrain();
                 }
             }
+            finally
+            {
+                if (epochTaken)
+                    epoch.Suspend();
+            }
 
             if (destOffset > 0)
                 deltaLog.Seal(destOffset);
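
Reviewer note: the hunk above serializes each dirty record into the delta log as a [logicalAddress][length][payload] triple, sealing the current delta page and allocating a fresh one whenever an entry would overflow it. A minimal standalone sketch of that framing (hypothetical helper, not FASTER API):

```csharp
using System;

static unsafe class DeltaFraming
{
    // Writes one [long address][int length][payload] entry into a delta page buffer.
    // Returns the new offset, or -1 if the caller must seal this page and allocate
    // a new one first (mirroring the Seal/Allocate retry in the hunk above).
    public static int Write(byte* dest, int destOffset, int capacity,
                            long logicalAddress, byte* record, int recordLength)
    {
        int size = sizeof(long) + sizeof(int) + recordLength;
        if (destOffset + size > capacity)
            return -1;

        *(long*)(dest + destOffset) = logicalAddress;
        destOffset += sizeof(long);
        *(int*)(dest + destOffset) = recordLength;
        destOffset += sizeof(int);
        Buffer.MemoryCopy(record, dest + destOffset, recordLength, recordLength);
        return destOffset + recordLength;
    }
}
```
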
@@ -1830,29 +1851,45 @@ public void AsyncFlushPages(long flushPageStart, int numPages, DeviceI
         /// </summary>
         public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, long fuzzyStartLogicalAddress, IDevice device, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore)
         {
-            int totalNumPages = (int)(endPage - startPage);
-            completedSemaphore = new SemaphoreSlim(0);
-            var flushCompletionTracker = new FlushCompletionTracker(completedSemaphore, totalNumPages);
-            var localSegmentOffsets = new long[SegmentBufferSize];
+            // We drop epoch protection to perform large scale writes to disk
+            bool epochDropped = false;
+            if (epoch.ThisInstanceProtected())
+            {
+                epochDropped = true;
+                epoch.Suspend();
+            }
 
-            for (long flushPage = startPage; flushPage < endPage; flushPage++)
+            try
             {
-                long flushPageAddress = flushPage << LogPageSizeBits;
-                var pageSize = PageSize;
-                if (flushPage == endPage - 1)
-                    pageSize = (int)(endLogicalAddress - flushPageAddress);
+                int totalNumPages = (int)(endPage - startPage);
+                completedSemaphore = new SemaphoreSlim(0);
+                var flushCompletionTracker = new FlushCompletionTracker(completedSemaphore, totalNumPages);
+                var localSegmentOffsets = new long[SegmentBufferSize];
 
-                var asyncResult = new PageAsyncFlushResult<Empty>
+                for (long flushPage = startPage; flushPage < endPage; flushPage++)
                 {
-                    flushCompletionTracker = flushCompletionTracker,
-                    page = flushPage,
-                    fromAddress = flushPageAddress,
-                    untilAddress = flushPageAddress + pageSize,
-                    count = 1
-                };
+                    long flushPageAddress = flushPage << LogPageSizeBits;
+                    var pageSize = PageSize;
+                    if (flushPage == endPage - 1)
+                        pageSize = (int)(endLogicalAddress - flushPageAddress);
 
-                // Intended destination is flushPage
-                WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets, fuzzyStartLogicalAddress);
+                    var asyncResult = new PageAsyncFlushResult<Empty>
+                    {
+                        flushCompletionTracker = flushCompletionTracker,
+                        page = flushPage,
+                        fromAddress = flushPageAddress,
+                        untilAddress = flushPageAddress + pageSize,
+                        count = 1
+                    };
+
+                    // Intended destination is flushPage
+                    WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets, fuzzyStartLogicalAddress);
+                }
+            }
+            finally
+            {
+                if (epochDropped)
+                    epoch.Resume();
+            }
         }
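
Reviewer note: both hunks in this file use the same bracketing discipline around epoch state: record whether protection was changed, do the work in `try`, and undo the change in `finally`. A sketch of the suspend side of the pattern, assuming a `FASTER.core.LightEpoch` instance (`RunUnprotected` is a hypothetical helper, not FASTER API):

```csharp
using System;
using FASTER.core;

static class EpochHelpers
{
    // Drop epoch protection (if held) around a long blocking operation, such as
    // issuing large-scale page writes to disk, then restore it afterwards.
    public static void RunUnprotected(LightEpoch epoch, Action longRunningIO)
    {
        bool dropped = false;
        if (epoch.ThisInstanceProtected())
        {
            dropped = true;
            epoch.Suspend(); // don't hold up epoch advancement while blocked on I/O
        }
        try
        {
            longRunningIO();
        }
        finally
        {
            if (dropped)
                epoch.Resume(); // restore protection before touching shared log state
        }
    }
}
```
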
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index a1ccf8ce2..24e903960 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -291,7 +291,12 @@ protected override void WriteAsyncToDevice
             }
             try
             {
-                if (FlushedUntilAddress < (flushPage << LogPageSizeBits) + pageSize)
+                if (HeadAddress >= (flushPage << LogPageSizeBits) + pageSize)
+                {
+                    // Requested page is unavailable in memory, ignore
+                    callback(0, 0, asyncResult);
+                }
+                else
                 {
                     // We are writing to separate device, so use fresh segment offsets
                     WriteAsync(flushPage,
@@ -299,11 +304,6 @@ protected override void WriteAsyncToDevice
                         (uint)pageSize, callback, asyncResult,
                         device, objectLogDevice, flushPage, localSegmentOffsets, fuzzyStartLogicalAddress);
                 }
-                else
-                {
-                    // Requested page is already flushed to main log, ignore
-                    callback(0, 0, asyncResult);
-                }
             }
             finally
             {
@@ -315,6 +315,11 @@ protected override void WriteAsyncToDevice
         internal override void ClearPage(long page, int offset)
         {
             Array.Clear(values[page % BufferSize], offset / recordSize, values[page % BufferSize].Length - offset / recordSize);
+        }
+
+        internal override void FreePage(long page)
+        {
+            ClearPage(page, 0);
 
             // Close segments
             var thisCloseSegment = page >> (LogSegmentSizeBits - LogPageSizeBits);
@@ -325,11 +330,7 @@ internal override void ClearPage(long page, int offset)
                 // We are clearing the last page in current segment
                 segmentOffsets[thisCloseSegment % SegmentBufferSize] = 0;
             }
-        }
 
-        internal override void FreePage(long page)
-        {
-            ClearPage(page, 0);
             if (EmptyPageCount > 0)
             {
                 overflowPagePool.TryAdd(values[page % BufferSize]);
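
Reviewer note: the flipped condition replaces a flushed-address test with an in-memory availability test; a page can only be read from the circular buffer if it has not been evicted below `HeadAddress`. A sketch of that predicate (names simplified, illustrative only):

```csharp
static class PageAvailability
{
    // True iff the page's end address is still above headAddress, i.e. the page
    // is resident in the in-memory circular buffer and safe to read.
    public static bool PageInMemory(long page, int logPageSizeBits, int pageSize, long headAddress)
    {
        long pageEndAddress = (page << logPageSizeBits) + pageSize;
        return headAddress < pageEndAddress;
    }
}
```
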
diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs
index a8a0f5c7e..81ec26d1b 100644
--- a/cs/src/core/Index/Common/Contexts.cs
+++ b/cs/src/core/Index/Common/Contexts.cs
@@ -362,7 +362,7 @@ public struct CommitPoint
    /// </summary>
    public struct HybridLogRecoveryInfo
    {
-        const int CheckpointVersion = 4;
+        const int CheckpointVersion = 5;
 
        /// <summary>
        /// Guid
        /// </summary>
@@ -381,10 +381,14 @@ public struct HybridLogRecoveryInfo
        /// </summary>
        public long nextVersion;
        /// <summary>
-        /// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time.
+        /// Flushed logical address; indicates the latest immutable address on the main FASTER log at checkpoint commit time.
        /// </summary>
        public long flushedLogicalAddress;
        /// <summary>
+        /// Flushed logical address at snapshot start; indicates device offset for snapshot file
+        /// </summary>
+        public long snapshotStartFlushedLogicalAddress;
+        /// <summary>
        /// Start logical address
        /// </summary>
        public long startLogicalAddress;
@@ -454,6 +458,7 @@ public void Initialize(Guid token, long _version)
            useSnapshotFile = 0;
            version = _version;
            flushedLogicalAddress = 0;
+            snapshotStartFlushedLogicalAddress = 0;
            startLogicalAddress = 0;
            finalLogicalAddress = 0;
            snapshotFinalLogicalAddress = 0;
@@ -476,6 +481,9 @@ public void Initialize(StreamReader reader)
            string value = reader.ReadLine();
            var cversion = int.Parse(value);
 
+            if (cversion != CheckpointVersion)
+                throw new FasterException($"Invalid checkpoint version {cversion} encountered, current version is {CheckpointVersion}, cannot recover with this checkpoint");
+
            value = reader.ReadLine();
            var checksum = long.Parse(value);
 
@@ -494,6 +502,9 @@ public void Initialize(StreamReader reader)
            value = reader.ReadLine();
            flushedLogicalAddress = long.Parse(value);
 
+            value = reader.ReadLine();
+            snapshotStartFlushedLogicalAddress = long.Parse(value);
+
            value = reader.ReadLine();
            startLogicalAddress = long.Parse(value);
 
@@ -556,9 +567,6 @@ public void Initialize(StreamReader reader)
                }
            }
 
-            if (cversion != CheckpointVersion)
-                throw new FasterException("Invalid version");
-
            if (checksum != Checksum(continueTokens.Count))
                throw new FasterException("Invalid checksum for checkpoint");
        }
@@ -624,6 +632,7 @@ public byte[] ToByteArray()
                    writer.WriteLine(version);
                    writer.WriteLine(nextVersion);
                    writer.WriteLine(flushedLogicalAddress);
+                    writer.WriteLine(snapshotStartFlushedLogicalAddress);
                    writer.WriteLine(startLogicalAddress);
                    writer.WriteLine(finalLogicalAddress);
                    writer.WriteLine(snapshotFinalLogicalAddress);
@@ -662,7 +671,7 @@ private readonly long Checksum(int checkpointTokensCount)
            var bytes = guid.ToByteArray();
            var long1 = BitConverter.ToInt64(bytes, 0);
            var long2 = BitConverter.ToInt64(bytes, 8);
-            return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ snapshotFinalLogicalAddress ^ headAddress ^ beginAddress
+            return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ snapshotStartFlushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ snapshotFinalLogicalAddress ^ headAddress ^ beginAddress
                ^ checkpointTokensCount ^ (objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length);
        }
@@ -676,6 +685,7 @@ public readonly void DebugPrint(ILogger logger)
            logger?.LogInformation("Next Version: {nextVersion}", nextVersion);
            logger?.LogInformation("Is Snapshot?: {useSnapshotFile}", useSnapshotFile == 1);
            logger?.LogInformation("Flushed LogicalAddress: {flushedLogicalAddress}", flushedLogicalAddress);
+            logger?.LogInformation("SnapshotStart Flushed LogicalAddress: {snapshotStartFlushedLogicalAddress}", snapshotStartFlushedLogicalAddress);
            logger?.LogInformation("Start Logical Address: {startLogicalAddress}", startLogicalAddress);
            logger?.LogInformation("Final Logical Address: {finalLogicalAddress}", finalLogicalAddress);
            logger?.LogInformation("Snapshot Final Logical Address: {snapshotFinalLogicalAddress}", snapshotFinalLogicalAddress);
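
Reviewer note: the metadata reader now fails fast, validating the version before interpreting the rest of the stream; previously a layout change (like the new `snapshotStartFlushedLogicalAddress` field) would have been mis-parsed before the version check at the end fired. A condensed sketch of the pattern (hypothetical reader, not the full field list):

```csharp
using System.IO;

sealed class CheckpointMetadataSketch
{
    const int CurrentVersion = 5;

    public long FlushedLogicalAddress;
    public long SnapshotStartFlushedLogicalAddress; // new in version 5

    public static CheckpointMetadataSketch Read(StreamReader reader)
    {
        int version = int.Parse(reader.ReadLine());
        if (version != CurrentVersion)
            throw new InvalidDataException(
                $"Invalid checkpoint version {version}, expected {CurrentVersion}");

        // Only after the version is known to match do we trust the field layout.
        return new CheckpointMetadataSketch
        {
            FlushedLogicalAddress = long.Parse(reader.ReadLine()),
            SnapshotStartFlushedLogicalAddress = long.Parse(reader.ReadLine()),
        };
    }
}
```
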
diff --git a/cs/src/core/Index/FASTER/Implementation/FindRecord.cs b/cs/src/core/Index/FASTER/Implementation/FindRecord.cs
index 05fb0b90a..93cdb4c09 100644
--- a/cs/src/core/Index/FASTER/Implementation/FindRecord.cs
+++ b/cs/src/core/Index/FASTER/Implementation/FindRecord.cs
@@ -34,7 +34,7 @@ private bool TryFindRecordInMainLog(ref Key key, ref OperationStackContext<Key, Value> recSrc, long minOffset, bool waitForTentative = true)
        {
            ref var recordInfo = ref hlog.GetInfo(recSrc.PhysicalAddress);
-            if (comparer.Equals(ref key, ref hlog.GetKey(recSrc.PhysicalAddress)) && !recordInfo.Invalid)
+            if (!recordInfo.Invalid && comparer.Equals(ref key, ref hlog.GetKey(recSrc.PhysicalAddress)))
            {
                if (!waitForTentative || SpinWaitWhileTentativeAndReturnValidity(ref recordInfo))
                    return recSrc.HasMainLogSrc = true;
diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs
index f8779e73f..933c4e913 100644
--- a/cs/src/core/Index/Recovery/Recovery.cs
+++ b/cs/src/core/Index/Recovery/Recovery.cs
@@ -387,7 +387,7 @@ private long InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
                // First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress)
                RecoverHybridLog(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.Snapshot, options);
                // Then recover snapshot into mutable region
-                RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.flushedLogicalAddress,
+                RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.snapshotStartFlushedLogicalAddress,
                    recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, options, recoveredHLCInfo.deltaLog, recoverTo);
 
                readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
@@ -423,7 +423,7 @@ await RecoverHybridLogAsync(scanFromAddress, recoverFromAddress, recoveredHLCInf
                await RecoverHybridLogAsync(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.Snapshot, new RecoveryOptions(headAddress, recoveredHLCInfo.info.startLogicalAddress, undoNextVersion), cancellationToken).ConfigureAwait(false);
                // Then recover snapshot into mutable region
-                await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.flushedLogicalAddress,
+                await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.snapshotStartFlushedLogicalAddress,
                    recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, options, recoveredHLCInfo.deltaLog, recoverTo, cancellationToken).ConfigureAwait(false);
 
                readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
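
Reviewer note: recovery replays the main log up to `flushedLogicalAddress` and then replays the snapshot file, whose on-device offsets are anchored at the address that was flushed when the snapshot began; passing `flushedLogicalAddress` (now captured later, at commit) as that anchor is the bug these hunks fix. An illustrative sketch of the addresses involved (not FASTER API):

```csharp
readonly struct SnapshotRecoveryPlan
{
    public readonly long FlushedLogicalAddress;              // main-log replay ends here
    public readonly long SnapshotStartFlushedLogicalAddress; // snapshot file's device-offset origin
    public readonly long SnapshotFinalLogicalAddress;        // snapshot replay ends here

    public SnapshotRecoveryPlan(long flushed, long snapshotStart, long snapshotFinal)
    {
        FlushedLogicalAddress = flushed;
        SnapshotStartFlushedLogicalAddress = snapshotStart;
        SnapshotFinalLogicalAddress = snapshotFinal;
    }

    // Device offset of a logical address within the snapshot file: offsets are
    // relative to where the snapshot flush started, not to the commit-time frontier.
    public long SnapshotFileOffset(long logicalAddress)
        => logicalAddress - SnapshotStartFlushedLogicalAddress;
}
```
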
diff --git a/cs/src/core/Index/Synchronization/FasterStateMachine.cs b/cs/src/core/Index/Synchronization/FasterStateMachine.cs
index 9ffe8b52d..0f876d60a 100644
--- a/cs/src/core/Index/Synchronization/FasterStateMachine.cs
+++ b/cs/src/core/Index/Synchronization/FasterStateMachine.cs
@@ -6,6 +6,7 @@
 using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
 
 namespace FASTER.core
 {
@@ -83,7 +84,7 @@ private bool StartStateMachine(ISynchronizationStateMachine stateMachine)
             GlobalStateMachineStep(systemState);
             return true;
         }
-        
+
         // Atomic transition from expectedState -> nextState
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         private bool MakeTransition(SystemState expectedState, SystemState nextState)
@@ -91,6 +92,9 @@ private bool MakeTransition(SystemState expectedState, SystemState nextState)
             if (Interlocked.CompareExchange(ref systemState.Word, nextState.Word, expectedState.Word) != expectedState.Word)
                 return false;
             Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version);
+            logger?.LogTrace("Moved to {0}, {1}",
+                (nextState.Phase & Phase.INTERMEDIATE) == 0 ? nextState.Phase.ToString() : "Intermediate (" + (nextState.Phase & ~Phase.INTERMEDIATE) + ")",
+                nextState.Version);
             return true;
         }
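
Reviewer note: `MakeTransition` advances the packed (phase, version) word with a single compare-and-swap, so concurrent steppers cannot both win; the new trace line logs the post-transition state. A minimal sketch of the CAS transition (simplified packing, illustrative only):

```csharp
using System.Threading;

struct SystemStateSketch
{
    public long Word; // imagine phase and version packed into one 64-bit word

    // Atomically move expected -> next; returns false if another thread
    // transitioned first (its CAS changed Word away from expected).
    public bool TryTransition(long expected, long next)
        => Interlocked.CompareExchange(ref Word, next, expected) == expected;
}
```
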
diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
index 565841275..664cb28b2 100644
--- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
+++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
@@ -219,8 +219,8 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
                     faster._hybridLogCheckpoint.snapshotFileDevice.Initialize(faster.hlog.GetSegmentSize());
                     faster._hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1);
 
-                    faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
-                    long startPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.flushedLogicalAddress);
+                    faster._hybridLogCheckpoint.info.snapshotStartFlushedLogicalAddress = faster.hlog.FlushedUntilAddress;
+                    long startPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.snapshotStartFlushedLogicalAddress);
                     long endPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.finalLogicalAddress);
                     if (faster._hybridLogCheckpoint.info.finalLogicalAddress > faster.hlog.GetStartLogicalAddress(endPage))
@@ -242,6 +242,8 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
                         out faster._hybridLogCheckpoint.flushedSemaphore);
                     break;
                 case Phase.PERSISTENCE_CALLBACK:
+                    // Set actual FlushedUntil to the latest possible data in main log that is on disk
+                    faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
                     base.GlobalBeforeEnteringState(next, faster);
                     faster._lastSnapshotCheckpoint = faster._hybridLogCheckpoint.Transfer();
                     break;
@@ -309,7 +311,7 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
                 case Phase.WAIT_FLUSH:
                     base.GlobalBeforeEnteringState(next, faster);
                     faster._hybridLogCheckpoint.info.finalLogicalAddress = faster.hlog.GetTailAddress();
-                    faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
+                    faster._hybridLogCheckpoint.info.snapshotStartFlushedLogicalAddress = faster.hlog.FlushedUntilAddress;
 
                     if (faster._hybridLogCheckpoint.deltaLog == null)
                     {
@@ -319,14 +321,20 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
                         faster._hybridLogCheckpoint.deltaLog.InitializeForWrites(faster.hlog.bufferPool);
                     }
 
+                    // We are writing delta records outside epoch protection, so the callee should be able to
+                    // handle corrupted or unexpected concurrent page changes during the flush, e.g., by
+                    // resuming epoch protection if necessary. Correctness is not affected, as we will
+                    // only read safe pages during recovery.
                     faster.hlog.AsyncFlushDeltaToDevice(
-                        faster._hybridLogCheckpoint.info.flushedLogicalAddress,
+                        faster._hybridLogCheckpoint.info.snapshotStartFlushedLogicalAddress,
                         faster._hybridLogCheckpoint.info.finalLogicalAddress,
                         faster._lastSnapshotCheckpoint.info.finalLogicalAddress,
                         faster._hybridLogCheckpoint.prevVersion,
                         faster._hybridLogCheckpoint.deltaLog);
                     break;
                 case Phase.PERSISTENCE_CALLBACK:
+                    // Set actual FlushedUntil to the latest possible data in main log that is on disk
+                    faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
                     CollectMetadata(next, faster);
                     faster.WriteHybridLogIncrementalMetaInfo(faster._hybridLogCheckpoint.deltaLog);
                     faster._hybridLogCheckpoint.info.deltaTailAddress = faster._hybridLogCheckpoint.deltaLog.TailAddress;
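
Reviewer note: after this change the checkpoint records two distinct flush frontiers: `snapshotStartFlushedLogicalAddress`, captured when the snapshot flush starts (WAIT_FLUSH) and used as the snapshot file's device-offset anchor, and `flushedLogicalAddress`, captured at commit (PERSISTENCE_CALLBACK) as the durable frontier of the main log. A toy illustration of the two capture points (hypothetical types, not FASTER API):

```csharp
enum CheckpointStep { WaitFlush, PersistenceCallback }

sealed class CheckpointInfoSketch
{
    public long SnapshotStartFlushedLogicalAddress; // anchor for snapshot file offsets
    public long FlushedLogicalAddress;              // durable frontier of the main log

    public void Capture(CheckpointStep step, long flushedUntilAddress)
    {
        if (step == CheckpointStep.WaitFlush)
            SnapshotStartFlushedLogicalAddress = flushedUntilAddress;
        else // PersistenceCallback: the main log may have flushed further by commit time
            FlushedLogicalAddress = flushedUntilAddress;
    }
}
```
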