From b4f1b5e1dd97c9dbd20e7caf8a6dcd3911ca7fd8 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 22 Oct 2021 00:08:37 -0700 Subject: [PATCH 1/8] Removed WAIT_PENDING --- cs/src/core/Async/CompletePendingAsync.cs | 4 +- cs/src/core/Index/FASTER/FASTERImpl.cs | 49 -------------- cs/src/core/Index/FASTER/FASTERThread.cs | 2 +- .../FullCheckpointStateMachine.cs | 2 +- .../HybridLogCheckpointTask.cs | 2 +- .../Index/Synchronization/StateTransitions.cs | 3 - .../VersionChangeStateMachine.cs | 21 ------ cs/test/StateMachineTests.cs | 66 ++++++++----------- 8 files changed, 32 insertions(+), 117 deletions(-) diff --git a/cs/src/core/Async/CompletePendingAsync.cs b/cs/src/core/Async/CompletePendingAsync.cs index 2170ae906..7c779af2b 100644 --- a/cs/src/core/Async/CompletePendingAsync.cs +++ b/cs/src/core/Async/CompletePendingAsync.cs @@ -24,7 +24,7 @@ internal async ValueTask ReadyToCompletePendingAsync(Fas #region Previous pending requests if (!RelaxedCPR) { - if (currentCtx.phase == Phase.IN_PROGRESS || currentCtx.phase == Phase.WAIT_PENDING) + if (currentCtx.phase == Phase.IN_PROGRESS) { if (currentCtx.prevCtx.SyncIoPendingCount != 0) await currentCtx.prevCtx.readyResponses.WaitForEntryAsync(token).ConfigureAwait(false); @@ -52,7 +52,7 @@ internal async ValueTask CompletePendingAsync(IFasterSes #region Previous pending requests if (!RelaxedCPR) { - if (currentCtx.phase == Phase.IN_PROGRESS || currentCtx.phase == Phase.WAIT_PENDING) + if (currentCtx.phase == Phase.IN_PROGRESS) { fasterSession.UnsafeResumeThread(); try diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 33f59592f..dbc234bda 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -519,22 +519,6 @@ private LatchDestination AcquireLatchUpsert(FasterExecut } break; // Normal Processing } - case Phase.WAIT_PENDING: - { - if (!CheckEntryVersionNew(logicalAddress, sessionCtx)) - { - if (HashBucket.NoSharedLatches(bucket)) - { - return LatchDestination.CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - return LatchDestination.CreatePendingContext; // Go Pending - } - } - break; // Normal Processing - } case Phase.WAIT_FLUSH: { if (!CheckEntryVersionNew(logicalAddress, sessionCtx)) @@ -878,23 +862,6 @@ private LatchDestination AcquireLatchRMW(PendingContext< } break; // Normal Processing } - case Phase.WAIT_PENDING: - { - if (!CheckEntryVersionNew(logicalAddress, sessionCtx)) - { - if (HashBucket.NoSharedLatches(bucket)) - { - if (logicalAddress >= hlog.HeadAddress) - return LatchDestination.CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - return LatchDestination.CreatePendingContext; // Go Pending - } - } - break; // Normal Processing - } case Phase.WAIT_FLUSH: { if (!CheckEntryVersionNew(logicalAddress, sessionCtx)) @@ -1136,22 +1103,6 @@ internal OperationStatus InternalDelete( } break; // Normal Processing } - case Phase.WAIT_PENDING: - { - if (!CheckEntryVersionNew(logicalAddress, sessionCtx)) - { - if (HashBucket.NoSharedLatches(bucket)) - { - goto CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - goto CreatePendingContext; // Go Pending - } - } - break; // Normal Processing - } case Phase.WAIT_FLUSH: { if (!CheckEntryVersionNew(logicalAddress, sessionCtx)) diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 12e3eccba..5f52f8316 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -153,7 +153,7 @@ internal bool InternalCompletePending( #region Previous pending requests if (!RelaxedCPR) { - if (ctx.phase == Phase.IN_PROGRESS || ctx.phase == Phase.WAIT_PENDING) + if (ctx.phase == Phase.IN_PROGRESS) { InternalCompletePendingRequests(ctx.prevCtx, ctx, fasterSession, completedOutputs); InternalCompleteRetryRequests(ctx.prevCtx, ctx, fasterSession); diff --git a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs index 942c7ad2e..e8abfd1ab 100644 --- a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs +++ b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs @@ -87,7 +87,7 @@ public override SystemState NextState(SystemState start) case Phase.PREP_INDEX_CHECKPOINT: result.Phase = Phase.PREPARE; break; - case Phase.WAIT_PENDING: + case Phase.IN_PROGRESS: result.Phase = Phase.WAIT_INDEX_CHECKPOINT; break; case Phase.WAIT_INDEX_CHECKPOINT: diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs index 8e50fee92..2238cf943 100644 --- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -384,7 +384,7 @@ public override SystemState NextState(SystemState start) var result = SystemState.Copy(ref start); switch (start.Phase) { - case Phase.WAIT_PENDING: + case Phase.IN_PROGRESS: result.Phase = Phase.WAIT_FLUSH; break; case Phase.WAIT_FLUSH: diff --git a/cs/src/core/Index/Synchronization/StateTransitions.cs b/cs/src/core/Index/Synchronization/StateTransitions.cs index ace3af510..9e1803979 100644 --- a/cs/src/core/Index/Synchronization/StateTransitions.cs +++ b/cs/src/core/Index/Synchronization/StateTransitions.cs @@ -28,9 +28,6 @@ public enum Phase : int { /// In-progress phase, entering (v+1) version IN_PROGRESS, - /// Wait-pending phase, waiting for pending (v) operations to complete - WAIT_PENDING, - /// Wait for an index checkpoint to finish WAIT_INDEX_CHECKPOINT, diff --git a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs index f986684b7..5c36e360e 100644 --- a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs +++ b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs @@ -80,23 +80,6 @@ public void OnThreadState( if (faster.epoch.CheckIsComplete(EpochPhaseIdx.InProgress, current.Version)) faster.GlobalStateMachineStep(current); break; - case Phase.WAIT_PENDING: - if (ctx != null) - { - if (!faster.RelaxedCPR && !ctx.prevCtx.markers[EpochPhaseIdx.WaitPending]) - { - if (ctx.prevCtx.HasNoPendingRequests) - ctx.prevCtx.markers[EpochPhaseIdx.WaitPending] = true; - else - break; - } - - faster.epoch.Mark(EpochPhaseIdx.WaitPending, current.Version); - } - - if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitPending, current.Version)) - faster.GlobalStateMachineStep(current); - break; case Phase.REST: break; } @@ -184,10 +167,6 @@ public override SystemState NextState(SystemState start) nextState.Version = (int) ToVersion(); break; case Phase.IN_PROGRESS: - // This phase has no effect if using relaxed CPR model - nextState.Phase = Phase.WAIT_PENDING; - break; - case Phase.WAIT_PENDING: nextState.Phase = Phase.REST; break; default: diff --git a/cs/test/StateMachineTests.cs b/cs/test/StateMachineTests.cs index 72f109a16..05ae8130f 100644 --- a/cs/test/StateMachineTests.cs +++ b/cs/test/StateMachineTests.cs @@ -73,20 +73,9 @@ public void StateMachineTest1() s2.Refresh(); - // We should be in WAIT_PENDING, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_PENDING, 2), fht1.SystemState)); - - s1.Refresh(); - // We should be in WAIT_FLUSH, 2 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, 2), fht1.SystemState)); - s2.Refresh(); - - - // We should be in PERSISTENCE_CALLBACK, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, 2), fht1.SystemState)); - // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; @@ -95,6 +84,12 @@ public void StateMachineTest1() // Completion callback should have been called once Assert.AreEqual(0, f.checkpointCallbackExpectation); + + // We should be in PERSISTENCE_CALLBACK, 2 + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, 2), fht1.SystemState)); + + s2.Refresh(); + // We should be in REST, 2 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 2), fht1.SystemState)); @@ -256,13 +251,14 @@ public void StateMachineTest5() s1.Refresh(); - // We should be in WAIT_PENDING, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_PENDING, 2), fht1.SystemState)); + // We should be in WAIT_FLUSH, 2 + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, 2), fht1.SystemState)); + s2.Refresh(); - // We should be in WAIT_FLUSH, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, 2), fht1.SystemState)); + // We should be in PERSISTENCE_CALLBACK, 2 + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, 2), fht1.SystemState)); // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; @@ -272,8 +268,8 @@ public void StateMachineTest5() // Completion callback should have been called once Assert.AreEqual(0, f.checkpointCallbackExpectation); - // We should be in PERSISTENCE_CALLBACK, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, 2), fht1.SystemState)); + // We should be in REST, 2 + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 2), fht1.SystemState)); // No callback here since already done s1.Refresh(); @@ -370,22 +366,10 @@ public void StateMachineCallbackTest1() s2.Refresh(); - // We should be in WAIT_PENDING, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_PENDING, 2), fht1.SystemState)); - callback.CheckInvoked(fht1.SystemState); - - s1.Refresh(); - // We should be in WAIT_FLUSH, 2 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, 2), fht1.SystemState)); callback.CheckInvoked(fht1.SystemState); - s2.Refresh(); - - // We should be in PERSISTENCE_CALLBACK, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, 2), fht1.SystemState)); - callback.CheckInvoked(fht1.SystemState); - // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; @@ -394,6 +378,12 @@ public void StateMachineCallbackTest1() // Completion callback should have been called once Assert.IsTrue(f.checkpointCallbackExpectation == 0); + // We should be in PERSISTENCE_CALLBACK, 2 + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, 2), fht1.SystemState)); + callback.CheckInvoked(fht1.SystemState); + + s2.Refresh(); + // We should be in REST, 2 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, 2), fht1.SystemState)); callback.CheckInvoked(fht1.SystemState); @@ -426,24 +416,22 @@ public void VersionChangeRollOverTest() s2.Refresh(); - // We should be in WAIT_PENDING, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_PENDING, toVersion + 1), fht1.SystemState)); - - s1.Refresh(); - // We should be in WAIT_FLUSH, 2 Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, toVersion + 1), fht1.SystemState)); - s2.Refresh(); - - // We should be in PERSISTENCE_CALLBACK, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, toVersion + 1), fht1.SystemState)); - // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; s1.Refresh(); + // Completion callback should have been called once + Assert.IsTrue(f.checkpointCallbackExpectation == 0); + + // We should be in PERSISTENCE_CALLBACK, 2 + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, toVersion + 1), fht1.SystemState)); + + s2.Refresh(); + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, toVersion + 1), fht1.SystemState)); From 23c9618cc97f6276083be2753f3b29a03c3f4c83 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 29 Oct 2021 15:59:00 -0700 Subject: [PATCH 2/8] * Use only 1 bit for newVersion in RecordInfo, for CPR. Semantics are that sessions in v+1 will create records in fuzzy region with this bit set. Recovery only scans fuzzy region to elide such records. * Add 1 Filler bit for future use with fillers in records * Increase read lock bits to 6 (64 parallel readers) * Use dirty bit for incremental checkpoints --- cs/samples/ReadAddress/VersionedReadApp.cs | 12 +- cs/src/core/Allocator/AllocatorBase.cs | 63 +++++++++-- cs/src/core/Allocator/GenericAllocator.cs | 7 +- cs/src/core/ClientSession/ClientSession.cs | 12 +- cs/src/core/Index/Common/Contexts.cs | 2 + cs/src/core/Index/Common/RecordInfo.cs | 70 +++++++----- cs/src/core/Index/FASTER/FASTERImpl.cs | 107 ++++++++---------- cs/src/core/Index/Recovery/Recovery.cs | 4 +- .../VersionChangeStateMachine.cs | 6 - cs/src/core/Utilities/PageAsyncResultTypes.cs | 18 +++ cs/test/ReadAddressTests.cs | 24 ++-- cs/test/StateMachineTests.cs | 12 +- 12 files changed, 196 insertions(+), 141 deletions(-) diff --git a/cs/samples/ReadAddress/VersionedReadApp.cs b/cs/samples/ReadAddress/VersionedReadApp.cs index 4e7d1f933..a747fa715 100644 --- a/cs/samples/ReadAddress/VersionedReadApp.cs +++ b/cs/samples/ReadAddress/VersionedReadApp.cs @@ -152,7 +152,6 @@ private static void ScanStore(FasterKV store, int keyValue) var input = default(Value); var key = new Key(keyValue); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = 9; /* tested in loop */; --lap) { var status = session.Read(ref key, ref input, ref output, ref recordMetadata, serialNo: maxLap + 1); @@ -169,7 +168,7 @@ private static void ScanStore(FasterKV store, int keyValue) } } - if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output, ref version)) + if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output)) break; } } @@ -184,27 +183,24 @@ private static async Task ScanStoreAsync(FasterKV store, int keyValu var input = default(Value); var key = new Key(keyValue); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = 9; /* tested in loop */; --lap) { var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1, cancellationToken: cancellationToken); cancellationToken.ThrowIfCancellationRequested(); var (status, output) = readAsyncResult.Complete(out recordMetadata); - if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output, ref version)) + if (!ProcessRecord(store, status, recordMetadata.RecordInfo, lap, ref output)) break; } } - private static bool ProcessRecord(FasterKV store, Status status, RecordInfo recordInfo, int lap, ref Value output, ref int previousVersion) + private static bool ProcessRecord(FasterKV store, Status status, RecordInfo recordInfo, int lap, ref Value output) { Debug.Assert((status == Status.NOTFOUND) == recordInfo.Tombstone); Debug.Assert((lap == deleteLap) == recordInfo.Tombstone); var value = recordInfo.Tombstone ? "" : output.value.ToString(); - Debug.Assert(previousVersion >= recordInfo.Version); - Console.WriteLine($" {value}; Version = {recordInfo.Version}; PrevAddress: {recordInfo.PreviousAddress}"); + Console.WriteLine($" {value}; PrevAddress: {recordInfo.PreviousAddress}"); // Check for end of loop - previousVersion = recordInfo.Version; return recordInfo.PreviousAddress >= store.Log.BeginAddress; } } diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 37f745f50..03ff3dc80 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -446,8 +446,9 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end { ref var info = ref GetInfo(physicalAddress); var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress); - if (info.Version == RecordInfo.GetShortVersion(version)) + if (info.Dirty) { + info.DirtyAtomic = false; // there may be read locks being taken, hence atomic int size = sizeof(long) + sizeof(int) + alignedRecordSize; if (destOffset + size > entryLength) { @@ -457,9 +458,9 @@ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long end if (destOffset + size > entryLength) throw new FasterException("Insufficient page size to write delta"); } - *((long*)(destPhysicalAddress + destOffset)) = logicalAddress; + *(long*)(destPhysicalAddress + destOffset) = logicalAddress; destOffset += sizeof(long); - *((int*)(destPhysicalAddress + destOffset)) = alignedRecordSize; + *(int*)(destPhysicalAddress + destOffset) = alignedRecordSize; destOffset += sizeof(int); Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize); destOffset += alignedRecordSize; @@ -1810,7 +1811,7 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica { int totalNumPages = (int)(endPage - startPage); completedSemaphore = new SemaphoreSlim(0); - var asyncResult = new PageAsyncFlushResult + var flushCompletionTracker = new FlushCompletionTracker { completedSemaphore = completedSemaphore, count = totalNumPages @@ -1819,11 +1820,18 @@ public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogica for (long flushPage = startPage; flushPage < endPage; flushPage++) { - + long flushPageAddress = flushPage << LogPageSizeBits; var pageSize = PageSize; - if (flushPage == endPage - 1) - pageSize = (int)(endLogicalAddress - (flushPage << LogPageSizeBits)); + pageSize = (int)(endLogicalAddress - flushPageAddress); + + var asyncResult = new PageAsyncFlushResult + { + flushCompletionTracker = flushCompletionTracker, + page = flushPage, + fromAddress = flushPageAddress, + untilAddress = flushPageAddress + pageSize, + }; // Intended destination is flushPage WriteAsyncToDevice(startPage, flushPage, pageSize, AsyncFlushPageToDeviceCallback, asyncResult, device, objectLogDevice, localSegmentOffsets); @@ -1977,7 +1985,46 @@ protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, obj } PageAsyncFlushResult result = (PageAsyncFlushResult)context; - if (Interlocked.Decrement(ref result.count) == 0) + + // Unset dirty bit for flushed pages + try + { + epoch.Resume(); + + var startAddress = result.page << LogPageSizeBits; + var endAddress = startAddress + PageSize; + + if (result.fromAddress > startAddress) + startAddress = result.fromAddress; + var _readOnlyAddress = SafeReadOnlyAddress; + if (_readOnlyAddress > startAddress) + startAddress = _readOnlyAddress; + + if (result.untilAddress < endAddress) + endAddress = result.untilAddress; + int flushWidth = (int)(endAddress - startAddress); + + if (flushWidth > 0) + { + var physicalAddress = GetPhysicalAddress(startAddress); + var endPhysicalAddress = physicalAddress + flushWidth; + + while (physicalAddress < endPhysicalAddress) + { + ref var info = ref GetInfo(physicalAddress); + var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress); + if (info.Dirty) + info.DirtyAtomic = false; // there may be read locks being taken, hence atomic + physicalAddress += alignedRecordSize; + } + } + } + finally + { + epoch.Suspend(); + } + + if (Interlocked.Decrement(ref result.flushCompletionTracker.count) == 0) { result.Free(); } diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index 9bb584d7a..ca33e799c 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -448,8 +448,6 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres var _objBuffer = bufferPool.Get(memoryStreamLength); - asyncResult.done = new AutoResetEvent(false); - var _alignedLength = (memoryStreamLength + (sectorSize - 1)) & ~(sectorSize - 1); var _objAddr = Interlocked.Add(ref localSegmentOffsets[(long)(alignedDestinationAddress >> LogSegmentSizeBits) % SegmentBufferSize], _alignedLength) - _alignedLength; @@ -479,6 +477,8 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres // Reset address list for next chunk addr = new List(); + asyncResult.done = new AutoResetEvent(false); + objlogDevice.WriteAsync( (IntPtr)_objBuffer.aligned_pointer, (int)(alignedDestinationAddress >> LogSegmentSizeBits), @@ -493,6 +493,9 @@ private void WriteAsync(long flushPage, ulong alignedDestinationAddres // need to write both page and object cache Interlocked.Increment(ref asyncResult.count); + if (asyncResult.flushCompletionTracker != null) + Interlocked.Increment(ref asyncResult.flushCompletionTracker.count); + asyncResult.freeBuffer2 = _objBuffer; objlogDevice.WriteAsync( (IntPtr)_objBuffer.aligned_pointer, diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 24fe1a08e..5e9520a55 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -1198,7 +1198,6 @@ public void PostSingleWriter(ref Key key, ref Input input, ref Value src, ref Va [MethodImpl(MethodImplOptions.AggressiveInlining)] private void PostSingleWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { - recordInfo.Version = _clientSession.ctx.version; _clientSession.functions.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); } @@ -1225,7 +1224,7 @@ public bool ConcurrentWriter(ref Key key, ref Input input, ref Value src, ref Va [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ConcurrentWriterNoLock(ref Key key, ref Input input, ref Value src, ref Value dst, ref Output output, ref RecordInfo recordInfo, long address) { - recordInfo.Version = _clientSession.ctx.version; + recordInfo.SetDirty(); // Note: KeyIndexes do not need notification of in-place updates because the key does not change. return _clientSession.functions.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref recordInfo, address); } @@ -1287,7 +1286,6 @@ public void PostInitialUpdater(ref Key key, ref Input input, ref Value value, re [MethodImpl(MethodImplOptions.AggressiveInlining)] private void PostInitialUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) { - recordInfo.Version = _clientSession.ctx.version; _clientSession.functions.PostInitialUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); } @@ -1343,7 +1341,6 @@ public bool PostCopyUpdater(ref Key key, ref Input input, ref Value oldValue, re [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value oldValue, ref Value newValue, ref RecordInfo recordInfo, long address) { - recordInfo.Version = _clientSession.ctx.version; return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address); } @@ -1373,7 +1370,7 @@ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Ou [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { - recordInfo.Version = _clientSession.ctx.version; + recordInfo.SetDirty(); // Note: KeyIndexes do not need notification of in-place updates because the key does not change. return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); } @@ -1406,7 +1403,6 @@ public void PostSingleDeleter(ref Key key, ref RecordInfo recordInfo, long addre return; // There is no value to lock here, so we take a RecordInfo lock in InternalDelete and release it here. - recordInfo.Version = _clientSession.ctx.version; _clientSession.functions.PostSingleDeleter(ref key, ref recordInfo, address); if (this.SupportsLocking) recordInfo.UnlockExclusive(); @@ -1421,8 +1417,8 @@ public bool ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recor [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) { - recordInfo.Version = _clientSession.ctx.version; - recordInfo.Tombstone = true; + recordInfo.SetDirty(); + recordInfo.SetTombstone(); return _clientSession.functions.ConcurrentDeleter(ref key, ref value, ref recordInfo, address); } diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 37e9cde55..a1fe55dbd 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -221,6 +221,8 @@ public async ValueTask WaitPendingAsync(CancellationToken token = default) await readyResponses.WaitForEntryAsync(token).ConfigureAwait(false); } + public bool NewVersion => phase < Phase.REST; + public FasterExecutionContext prevCtx; } } diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index c8a3173e7..a83b6b348 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -10,8 +10,8 @@ namespace FASTER.core { // RecordInfo layout (64 bits total): - // [VVVVV][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSS] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] - // where V = version, X = exclusive lock, S = shared lock, A = address + // [--][CPR][Filler][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] + // where V = version, X = exclusive lock, S = shared lock, A = address, R = readcache, - = unused [StructLayout(LayoutKind.Explicit, Size = 8)] public struct RecordInfo { @@ -25,8 +25,8 @@ public struct RecordInfo // Shift position of lock in word const int kLockShiftInWord = kPreviousAddressBits; - // We use 6 lock bits: 5 shared lock bits + 1 exclusive lock bit - const int kSharedLockBits = 5; + // We use 7 lock bits: 6 shared lock bits + 1 exclusive lock bit + const int kSharedLockBits = 6; const int kExlcusiveLockBits = 1; // Shared lock constants @@ -43,33 +43,28 @@ public struct RecordInfo const int kStubBitOffset = kValidBitOffset + 1; const int kSealedBitOffset = kStubBitOffset + 1; const int kDirtyBitOffset = kSealedBitOffset + 1; + const int kFillerBitOffset = kDirtyBitOffset + 1; + const int kNewVersionBitOffset = kFillerBitOffset + 1; const long kTombstoneBitMask = 1L << kTombstoneBitOffset; const long kValidBitMask = 1L << kValidBitOffset; const long kStubBitMask = 1L << kStubBitOffset; const long kSealedBitMask = 1L << kSealedBitOffset; const long kDirtyBitMask = 1L << kDirtyBitOffset; - - // Shift position of version in word - const int kVersionShiftInWord = kDirtyBitOffset + 1; - - // We use the remaining bits (64 - 59 = 5) as a short version for record - const int kVersionBits = kTotalBits - kVersionShiftInWord; - - // Version constants - const long kVersionMaskInWord = ((1L << kVersionBits) - 1) << kVersionShiftInWord; - internal const long kVersionMaskInInteger = (1L << kVersionBits) - 1; + const long kFillerBitMask = 1L << kFillerBitOffset; + const long kNewVersionBitMask = 1L << kNewVersionBitOffset; [FieldOffset(0)] private long word; - public static void WriteInfo(ref RecordInfo info, int checkpointVersion, bool tombstone, bool invalidBit, long previousAddress) + public static void WriteInfo(ref RecordInfo info, bool newVersion, bool tombstone, bool dirty, long previousAddress) { info.word = default; info.Tombstone = tombstone; - info.Invalid = invalidBit; + info.SetValid(); + info.Dirty = dirty; info.PreviousAddress = previousAddress; - info.Version = checkpointVersion; + info.NewVersion = newVersion; } /// @@ -218,6 +213,21 @@ public bool Sealed } } + public bool DirtyAtomic + { + set + { + while (true) + { + long expected_word = word; + long new_word = value ? (word | kDirtyBitMask) : (word & ~kDirtyBitMask); + if (expected_word == Interlocked.CompareExchange(ref word, new_word, expected_word)) + break; + Thread.Yield(); + } + } + } + public bool Dirty { get => (word & kDirtyBitMask) > 0; @@ -228,26 +238,33 @@ public bool Dirty } } - public bool Invalid + public bool Filler { - get => !((word & kValidBitMask) > 0); + get => (word & kFillerBitMask) > 0; set { - if (value) word &= ~kValidBitMask; - else word |= kValidBitMask; + if (value) word |= kFillerBitMask; + else word &= ~kFillerBitMask; } } - public int Version + public bool NewVersion { - get => (int)((word & kVersionMaskInWord) >> kVersionShiftInWord); + get => (word & kNewVersionBitMask) > 0; set { - word &= ~kVersionMaskInWord; - word |= (value & kVersionMaskInInteger) << kVersionShiftInWord; + if (value) word |= kNewVersionBitMask; + else word &= ~kNewVersionBitMask; } } + public void SetDirty() => word |= kDirtyBitMask; + public void SetTombstone() => word |= kTombstoneBitMask; + public void SetValid() => word |= kValidBitMask; + public void SetInvalid() => word &= ~kValidBitMask; + + public bool Invalid => (word & kValidBitMask) == 0; + public long PreviousAddress { get => word & kPreviousAddressMaskInWord; @@ -263,9 +280,6 @@ public static int GetLength() { return kTotalSizeInBytes; } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int GetShortVersion(long version) => (int) (version & kVersionMaskInInteger); public override string ToString() => word.ToString(); } diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 4c3d1da06..453023905 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -28,26 +28,38 @@ private bool CheckEntryVersionNew(long logicalAddress, F { HashBucketEntry entry = default; entry.word = logicalAddress; - return CheckBucketVersionNew(ref entry, sessionCtx); + return CheckBucketVersionNew(ref entry); } /// /// Check the version of the passed-in entry. /// The semantics of this function are to check the tail of a bucket (indicated by entry), so we name it this way. /// - /// - /// - /// /// the last entry of a bucket - /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool CheckBucketVersionNew(ref HashBucketEntry entry, FasterExecutionContext sessionCtx) + private bool CheckBucketVersionNew(ref HashBucketEntry entry) { // A version shift can only in an address after the checkpoint starts, as v_new threads RCU entries to the tail. if (entry.Address < _hybridLogCheckpoint.info.startLogicalAddress) return false; + // Otherwise, check if the version suffix of the entry matches v_new. - return GetLatestRecordVersion(ref entry, sessionCtx.version) == RecordInfo.GetShortVersion(currentSyncStateMachine.ToVersion()); + if (UseReadCache && entry.ReadCache) + { + var _addr = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask); + if ((entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress) + return readcache.GetInfo(_addr).NewVersion; + else + return false; + } + else + { + var _addr = hlog.GetPhysicalAddress(entry.Address); + if (entry.Address >= hlog.HeadAddress) + return hlog.GetInfo(_addr).NewVersion; + else + return false; + } } internal enum LatchOperation : byte @@ -146,11 +158,10 @@ internal OperationStatus InternalRead( } else if (ReadFromCache(ref key, ref logicalAddress, ref physicalAddress)) { - if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry, sessionCtx)) - { - status = OperationStatus.CPR_SHIFT_DETECTED; - goto CreatePendingContext; // Pivot thread - } + // When session is in PREPARE phase, a read-cache record cannot be new-version. + // This is because a new-version record insertion would have elided the read-cache entry. + // and before the new-version record can go to disk become eligible to enter the read-cache, + // the PREPARE phase for that session will be over due to an epoch refresh. // This is not called when looking up by address, so we do not set pendingContext.recordInfo. // ReadCache addresses are not valid for indexing etc. so pass kInvalidAddress. @@ -188,7 +199,7 @@ internal OperationStatus InternalRead( } #endregion - if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry, sessionCtx)) + if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry)) { status = OperationStatus.CPR_SHIFT_DETECTED; goto CreatePendingContext; // Pivot thread @@ -498,7 +509,7 @@ private LatchDestination AcquireLatchUpsert(FasterExecut // rather than the traced record (logicalAddress), because I'm worried that the implementation // may not allow in-place updates for version v when the bucket arrives v+1. // This is safer but potentially unnecessary. - if (CheckBucketVersionNew(ref entry, sessionCtx)) + if (CheckBucketVersionNew(ref entry)) { status = OperationStatus.CPR_SHIFT_DETECTED; return LatchDestination.CreatePendingContext; // Pivot Thread @@ -555,8 +566,8 @@ private OperationStatus CreateNewRecordUpsert( ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); if (!recordInfo.Tombstone) { - Debug.Assert(!UseFoldOverCheckpoint || recordInfo.Version == sessionCtx.version); - if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress)) { if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version); @@ -846,7 +855,7 @@ private LatchDestination AcquireLatchRMW(PendingContext< { // Set to release shared latch (default) latchOperation = LatchOperation.Shared; - if (CheckBucketVersionNew(ref entry, sessionCtx)) + if (CheckBucketVersionNew(ref entry)) { status = OperationStatus.CPR_SHIFT_DETECTED; return LatchDestination.CreatePendingContext; // Pivot Thread @@ -920,8 +929,9 @@ private OperationStatus CreateNewRecordRMW( { // Set to release shared latch (default) latchOperation = LatchOperation.Shared; - if (CheckBucketVersionNew(ref entry, sessionCtx)) + if (CheckBucketVersionNew(ref entry)) { status = OperationStatus.CPR_SHIFT_DETECTED; goto CreatePendingContext; // Pivot Thread @@ -1197,7 +1207,8 @@ internal OperationStatus InternalDelete( var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); ref RecordInfo recordInfo = ref hlog.GetInfo(newPhysicalAddress); RecordInfo.WriteInfo(ref recordInfo, - sessionCtx.version, tombstone:true, invalidBit:false, + newVersion: sessionCtx.NewVersion, + tombstone: true, dirty: true, latestLogicalAddress); hlog.Serialize(ref key, newPhysicalAddress); @@ -1229,7 +1240,7 @@ internal OperationStatus InternalDelete( else { recordInfo.UnlockExclusive(); - recordInfo.Invalid = true; + recordInfo.SetInvalid(); status = OperationStatus.RETRY_NOW; goto LatchRelease; } @@ -1375,7 +1386,6 @@ internal OperationStatus InternalContinuePendingRead= hlog.BeginAddress) { ref RecordInfo recordInfo = ref hlog.GetInfoFromBytePointer(request.record.GetValidPointer()); - Debug.Assert(recordInfo.Version <= ctx.version); if (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone) return OperationStatus.NOTFOUND; @@ -1538,8 +1548,9 @@ internal OperationStatus InternalContinuePendingRMW= readcache.HeadAddress) - return readcache.GetInfo(_addr).Version; - else - return defaultVersion; - } - else - { - var _addr = hlog.GetPhysicalAddress(entry.Address); - if (entry.Address >= hlog.HeadAddress) - return hlog.GetInfo(_addr).Version; - else - return defaultVersion; - } - } #endregion } } diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 9cf447e10..4f2a1f6b2 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -788,7 +788,7 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress, entry = default; FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); - if (info.Version != RecordInfo.GetShortVersion(nextVersion) || !undoNextVersion) + if (!info.NewVersion || !undoNextVersion) { entry.Address = pageLogicalAddress + pointer; entry.Tag = tag; @@ -799,7 +799,7 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress, else { touched = true; - info.Invalid = true; + info.SetInvalid(); if (info.PreviousAddress < startRecoveryAddress) { entry.Address = info.PreviousAddress; diff --git a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs index 5c36e360e..86d7094fb 100644 --- a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs +++ b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs @@ -156,12 +156,6 @@ public override SystemState NextState(SystemState start) break; case Phase.PREPARE: nextState.Phase = Phase.IN_PROGRESS; - // FASTER records only store a few bits of version number, and we need to ensure that - // the next version is distinguishable from the last in those bits. - // If they are not distinguishable, simply increment target version to resolve this - if (((targetVersion - start.Version) & RecordInfo.kVersionMaskInInteger) == 0) - targetVersion++; - // TODO: Move to long for system state as well. SetToVersion(targetVersion == -1 ? start.Version + 1 : targetVersion); nextState.Version = (int) ToVersion(); diff --git a/cs/src/core/Utilities/PageAsyncResultTypes.cs b/cs/src/core/Utilities/PageAsyncResultTypes.cs index a25015cd5..658f74261 100644 --- a/cs/src/core/Utilities/PageAsyncResultTypes.cs +++ b/cs/src/core/Utilities/PageAsyncResultTypes.cs @@ -49,6 +49,22 @@ public void Free() } } + /// + /// Shared flush completion tracker, when bulk-flushing many pages + /// + public class FlushCompletionTracker + { + /// + /// Semaphore to set on flush completion + /// + public SemaphoreSlim completedSemaphore; + + /// + /// Number of pages being flushed + /// + public int count; + } + /// /// Page async flush result /// @@ -75,6 +91,7 @@ public sealed class PageAsyncFlushResult internal SectorAlignedMemory freeBuffer2; internal AutoResetEvent done; internal SemaphoreSlim completedSemaphore; + internal FlushCompletionTracker flushCompletionTracker; /// /// Free @@ -93,6 +110,7 @@ public void Free() } completedSemaphore?.Release(); + flushCompletionTracker?.completedSemaphore?.Release(); } } } diff --git a/cs/test/ReadAddressTests.cs b/cs/test/ReadAddressTests.cs index 69370f5bd..f630a0b0b 100644 --- a/cs/test/ReadAddressTests.cs +++ b/cs/test/ReadAddressTests.cs @@ -197,7 +197,7 @@ internal async Task Populate(bool useRMW, bool useAsync) await Flush(); } - internal bool ProcessChainRecord(Status status, RecordMetadata recordMetadata, int lap, ref Output actualOutput, ref int previousVersion) + internal bool ProcessChainRecord(Status status, RecordMetadata recordMetadata, int lap, ref Output actualOutput) { var recordInfo = recordMetadata.RecordInfo; Assert.GreaterOrEqual(lap, 0); @@ -205,13 +205,11 @@ internal bool ProcessChainRecord(Status status, RecordMetadata recordMetadata, i Assert.AreEqual(status == Status.NOTFOUND, recordInfo.Tombstone, $"status({status}) == NOTFOUND != Tombstone ({recordInfo.Tombstone})"); Assert.AreEqual(lap == deleteLap, recordInfo.Tombstone, $"lap({lap}) == deleteLap({deleteLap}) != Tombstone ({recordInfo.Tombstone})"); - Assert.GreaterOrEqual(previousVersion, recordInfo.Version); if (!recordInfo.Tombstone) Assert.AreEqual(expectedValue, actualOutput.value); // Check for end of loop - previousVersion = recordInfo.Version; - return recordInfo.PreviousAddress >= this.fkv.Log.BeginAddress; + return recordInfo.PreviousAddress >= fkv.Log.BeginAddress; } internal static void ProcessNoKeyRecord(Status status, ref Output actualOutput, int keyOrdinal) @@ -253,7 +251,6 @@ public void VersionedReadSyncTests(bool useReadCache, CopyReadsToTail copyReadsT var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -264,7 +261,7 @@ public void VersionedReadSyncTests(bool useReadCache, CopyReadsToTail copyReadsT session.CompletePendingWithOutputs(out var completedOutputs, wait: true); (status, output) = TestUtils.GetSinglePendingResult(completedOutputs, out recordMetadata); } - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output, ref version)) + if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) break; } } @@ -287,13 +284,12 @@ public async Task VersionedReadAsyncTests(bool useReadCache, CopyReadsToTail cop var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = maxLap - 1; /* tested in loop */; --lap) { var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1); var (status, output) = readAsyncResult.Complete(out recordMetadata); - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output, ref version)) + if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) break; } } @@ -317,7 +313,6 @@ public void ReadAtAddressSyncTests(bool useReadCache, CopyReadsToTail copyReadsT var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -330,7 +325,7 @@ public void ReadAtAddressSyncTests(bool useReadCache, CopyReadsToTail copyReadsT session.CompletePendingWithOutputs(out var completedOutputs, wait: true); (status, output) = TestUtils.GetSinglePendingResult(completedOutputs, out recordMetadata); } - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output, ref version)) + if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) break; if (readAtAddress >= testStore.fkv.Log.BeginAddress) @@ -370,7 +365,6 @@ public async Task ReadAtAddressAsyncTests(bool useReadCache, CopyReadsToTail cop var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -378,7 +372,7 @@ public async Task ReadAtAddressAsyncTests(bool useReadCache, CopyReadsToTail cop var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1); var (status, output) = readAsyncResult.Complete(out recordMetadata); - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output, ref version)) + if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) break; if (readAtAddress >= testStore.fkv.Log.BeginAddress) @@ -413,7 +407,6 @@ public async Task ReadAtAddressAsyncReadFlagsNoneTests(bool useReadCache, CopyRe var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -421,7 +414,7 @@ public async Task ReadAtAddressAsyncReadFlagsNoneTests(bool useReadCache, CopyRe var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1); var (status, output) = readAsyncResult.Complete(out recordMetadata); - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output, ref version)) + if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) break; if (readAtAddress >= testStore.fkv.Log.BeginAddress) @@ -456,7 +449,6 @@ public async Task ReadAtAddressAsyncReadFlagsSkipCacheTests(bool useReadCache, C var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - int version = int.MaxValue; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -464,7 +456,7 @@ public async Task ReadAtAddressAsyncReadFlagsSkipCacheTests(bool useReadCache, C var readAsyncResult = await session.ReadAsync(ref key, ref input, recordMetadata.RecordInfo.PreviousAddress, default, serialNo: maxLap + 1); var (status, output) = readAsyncResult.Complete(out recordMetadata); - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output, ref version)) + if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) break; if (readAtAddress >= testStore.fkv.Log.BeginAddress) diff --git a/cs/test/StateMachineTests.cs b/cs/test/StateMachineTests.cs index 05ae8130f..a499fa962 100644 --- a/cs/test/StateMachineTests.cs +++ b/cs/test/StateMachineTests.cs @@ -399,7 +399,7 @@ public void StateMachineCallbackTest1() [TestCase] [Category("FasterKV")] [Category("CheckpointRestore")] - public void VersionChangeRollOverTest() + public void VersionChangeTest() { var toVersion = 1 + (1 << 14); Prepare(out var f, out var s1, out var s2, toVersion); @@ -411,13 +411,13 @@ public void VersionChangeRollOverTest() s2.Refresh(); s1.Refresh(); - // We should now be in IN_PROGRESS, toVersion + 1 (because of rollover of 13 bit short version) - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.IN_PROGRESS, toVersion + 1), fht1.SystemState)); + // We should now be in IN_PROGRESS, toVersion + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.IN_PROGRESS, toVersion), fht1.SystemState)); s2.Refresh(); // We should be in WAIT_FLUSH, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, toVersion + 1), fht1.SystemState)); + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.WAIT_FLUSH, toVersion), fht1.SystemState)); // Expect checkpoint completion callback f.checkpointCallbackExpectation = 1; @@ -428,11 +428,11 @@ public void VersionChangeRollOverTest() Assert.IsTrue(f.checkpointCallbackExpectation == 0); // We should be in PERSISTENCE_CALLBACK, 2 - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, toVersion + 1), fht1.SystemState)); + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.PERSISTENCE_CALLBACK, toVersion), fht1.SystemState)); s2.Refresh(); - Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, toVersion + 1), fht1.SystemState)); + Assert.IsTrue(SystemState.Equal(SystemState.Make(Phase.REST, toVersion), fht1.SystemState)); // Dispose session s2; does not move state machine forward From 6dec39c6915dcc33b4f3d15ea7457beea986eeba Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 29 Oct 2021 16:05:51 -0700 Subject: [PATCH 3/8] fix ycsb to not delete data file if we are using --recover --- cs/benchmark/FasterSpanByteYcsbBenchmark.cs | 2 +- cs/benchmark/FasterYcsbBenchmark.cs | 2 +- cs/benchmark/TestLoader.cs | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs index 0eeda11a3..6418ce38e 100644 --- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs +++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs @@ -71,7 +71,7 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys for (int i = 0; i < 8; i++) input_[i].value = i; - device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: true); + device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.CheckpointRecoverStore, useIoCompletionPort: true); if (testLoader.Options.UseSmallMemoryLog) store = new FasterKV diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 9df09fc4d..0456978d8 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -68,7 +68,7 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade for (int i = 0; i < 8; i++) input_[i].value = i; - device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: true, useIoCompletionPort: true); + device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.CheckpointRecoverStore, useIoCompletionPort: true); if (testLoader.Options.ThreadCount >= 16) device.ThrottleLimit = testLoader.Options.ThreadCount * 12; diff --git a/cs/benchmark/TestLoader.cs b/cs/benchmark/TestLoader.cs index 72501a1bb..6f7284bbc 100644 --- a/cs/benchmark/TestLoader.cs +++ b/cs/benchmark/TestLoader.cs @@ -332,10 +332,14 @@ private static void LoadSyntheticData(string distribution, uin internal string BackupPath => $"{DataPath}/{this.Distribution}_{(this.Options.UseSyntheticData ? "synthetic" : "ycsb")}_{(this.Options.UseSmallData ? "2.5M_10M" : "250M_1000M")}"; + + internal bool CheckpointRecoverStore + => Options.BackupAndRestore && Options.PeriodicCheckpointMilliseconds <= 0; + internal bool MaybeRecoverStore(FasterKV store) { // Recover database for fast benchmark repeat runs. - if (this.Options.BackupAndRestore && this.Options.PeriodicCheckpointMilliseconds <= 0) + if (CheckpointRecoverStore) { if (this.Options.UseSmallData) { @@ -364,7 +368,7 @@ internal bool MaybeRecoverStore(FasterKV store) internal void MaybeCheckpointStore(FasterKV store) { // Checkpoint database for fast benchmark repeat runs. - if (this.Options.BackupAndRestore && this.Options.PeriodicCheckpointMilliseconds <= 0) + if (CheckpointRecoverStore) { Console.WriteLine($"Checkpointing FasterKV to {this.BackupPath} for fast restart"); var sw = Stopwatch.StartNew(); From 005d86f76e4f997333bd65bc466c5c8a82c520c8 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 29 Oct 2021 16:19:02 -0700 Subject: [PATCH 4/8] nits --- cs/benchmark/TestLoader.cs | 4 +--- cs/samples/ReadAddress/VersionedReadApp.cs | 2 +- cs/src/core/Index/Common/RecordInfo.cs | 4 ++-- cs/src/core/Index/Recovery/Checkpoint.cs | 7 +------ cs/src/core/Index/Recovery/Recovery.cs | 5 +---- .../core/Index/Synchronization/HybridLogCheckpointTask.cs | 2 +- .../Index/Synchronization/VersionChangeStateMachine.cs | 2 +- cs/test/GenericByteArrayTests.cs | 2 +- 8 files changed, 9 insertions(+), 19 deletions(-) diff --git a/cs/benchmark/TestLoader.cs b/cs/benchmark/TestLoader.cs index 6f7284bbc..69ec3a09b 100644 --- a/cs/benchmark/TestLoader.cs +++ b/cs/benchmark/TestLoader.cs @@ -10,8 +10,6 @@ using System.Runtime.InteropServices; using System.Threading; -#pragma warning disable CS0162 // Unreachable code detected -- when switching on YcsbConstants - namespace FASTER.benchmark { internal interface IKeySetter @@ -373,7 +371,7 @@ internal void MaybeCheckpointStore(FasterKV store) Console.WriteLine($"Checkpointing FasterKV to {this.BackupPath} for fast restart"); var sw = Stopwatch.StartNew(); store.TakeFullCheckpoint(out _, CheckpointType.Snapshot); - store.CompleteCheckpointAsync().GetAwaiter().GetResult(); + store.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult(); sw.Stop(); Console.WriteLine($" Completed checkpoint in {(double)sw.ElapsedMilliseconds / 1000:N3} seconds"); } diff --git a/cs/samples/ReadAddress/VersionedReadApp.cs b/cs/samples/ReadAddress/VersionedReadApp.cs index a747fa715..211fc2846 100644 --- a/cs/samples/ReadAddress/VersionedReadApp.cs +++ b/cs/samples/ReadAddress/VersionedReadApp.cs @@ -110,7 +110,7 @@ private async static Task PopulateStore(FasterKV store) using var s = store.For(new Functions()).NewSession(); Console.WriteLine($"Writing {numKeys} keys to FASTER", numKeys); - Stopwatch sw = new Stopwatch(); + Stopwatch sw = new(); sw.Start(); var prevLap = 0; for (int ii = 0; ii < numKeys; ii++) diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index a83b6b348..f12519bfa 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -10,8 +10,8 @@ namespace FASTER.core { // RecordInfo layout (64 bits total): - // [--][CPR][Filler][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] - // where V = version, X = exclusive lock, S = shared lock, A = address, R = readcache, - = unused + // [--][NewVersion][Filler][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] + // where X = exclusive lock, S = shared lock, R = readcache, A = address, - = unused [StructLayout(LayoutKind.Explicit, Size = 8)] public struct RecordInfo { diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index 7409521f4..38f5ddaf6 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -1,10 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -#pragma warning disable 0162 - -//#define WAIT_FOR_INDEX_CHECKPOINT - using System; using System.Linq; using System.Text; @@ -36,8 +32,7 @@ internal static class EpochPhaseIdx public partial class FasterKV { - internal TaskCompletionSource checkpointTcs - = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + internal TaskCompletionSource checkpointTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); internal Guid _indexCheckpointToken; internal Guid _hybridLogCheckpointToken; diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 4f2a1f6b2..c52831e93 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -1,11 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -#pragma warning disable 0162 - using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -204,7 +201,7 @@ private void FindRecoveryInfo(long requestedVersion, out HybridLogCheckpointInfo } } - private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryInfo recoveryInfo) + private static bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryInfo recoveryInfo) { var l1 = indexInfo.finalLogicalAddress; var l2 = recoveryInfo.finalLogicalAddress; diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs index 2238cf943..5917bac16 100644 --- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -48,7 +48,7 @@ public virtual void GlobalBeforeEnteringState(SystemState next, } } - protected void CollectMetadata(SystemState next, FasterKV faster) + protected static void CollectMetadata(SystemState next, FasterKV faster) { // Collect object log offsets only after flushes // are completed diff --git a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs index 86d7094fb..3ac7adde8 100644 --- a/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs +++ b/cs/src/core/Index/Synchronization/VersionChangeStateMachine.cs @@ -127,7 +127,7 @@ public void OnThreadState( /// internal class VersionChangeStateMachine : SynchronizationStateMachineBase { - private long targetVersion; + private readonly long targetVersion; /// /// Construct a new VersionChangeStateMachine with the given tasks. Does not load any tasks by default. diff --git a/cs/test/GenericByteArrayTests.cs b/cs/test/GenericByteArrayTests.cs index 8b74d52d5..1b90644c9 100644 --- a/cs/test/GenericByteArrayTests.cs +++ b/cs/test/GenericByteArrayTests.cs @@ -47,7 +47,7 @@ public void TearDown() TestUtils.DeleteDirectory(TestUtils.MethodTestDir); } - private byte[] GetByteArray(int i) + private static byte[] GetByteArray(int i) { return BitConverter.GetBytes(i); } From 7f159720c9cec9103c189591e46a4b16ad7cc2d7 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 29 Oct 2021 17:37:37 -0700 Subject: [PATCH 5/8] Fix epoch re-entry violation --- cs/src/core/Allocator/AllocatorBase.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 03ff3dc80..9697ee87e 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -1987,10 +1987,15 @@ protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, obj PageAsyncFlushResult result = (PageAsyncFlushResult)context; // Unset dirty bit for flushed pages - try + bool epochTaken = false; + if (!epoch.ThisInstanceProtected()) { + epochTaken = true; epoch.Resume(); + } + try + { var startAddress = result.page << LogPageSizeBits; var endAddress = startAddress + PageSize; @@ -2021,7 +2026,8 @@ protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, obj } finally { - epoch.Suspend(); + if (epochTaken) + epoch.Suspend(); } if (Interlocked.Decrement(ref result.flushCompletionTracker.count) == 0) From 89a8c1764c49d0e99d87c0cb05fe44d96cb72b3f Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 29 Oct 2021 18:46:23 -0700 Subject: [PATCH 6/8] Move to long versions, for KV checkpoints. --- cs/src/core/Allocator/AllocatorBase.cs | 8 +-- cs/src/core/Allocator/GenericAllocator.cs | 2 +- cs/src/core/ClientSession/ClientSession.cs | 2 +- cs/src/core/ClientSession/IClientSession.cs | 2 +- cs/src/core/Epochs/LightEpoch.cs | 8 +-- .../DeviceLogCommitCheckpointManager.cs | 2 +- cs/src/core/Index/Common/Contexts.cs | 22 ++++---- cs/src/core/Index/FASTER/FASTER.cs | 4 +- cs/src/core/Index/Recovery/Checkpoint.cs | 2 +- .../core/Index/Recovery/ICheckpointManager.cs | 2 +- .../Index/Recovery/LocalCheckpointManager.cs | 2 +- cs/src/core/Index/Recovery/Recovery.cs | 19 ++++--- .../Index/Synchronization/StateTransitions.cs | 54 +++++++++++++++---- 13 files changed, 80 insertions(+), 49 deletions(-) diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 9697ee87e..fb83286eb 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -23,7 +23,7 @@ internal struct FullPageStatus [FieldOffset(8)] public long LastClosedUntilAddress; [FieldOffset(16)] - public int Dirty; + public long Dirty; } [StructLayout(LayoutKind.Explicit)] @@ -413,7 +413,7 @@ private protected void VerifyCompatibleSectorSize(IDevice device) /// /// /// - internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog) + internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog) { long startPage = GetPage(startAddress); long endPage = GetPage(endAddress); @@ -530,7 +530,7 @@ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage, long } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void MarkPage(long logicalAddress, int version) + internal void MarkPage(long logicalAddress, long version) { var offset = (logicalAddress >> LogPageSizeBits) % BufferSize; if (PageStatusIndicator[offset].Dirty < version) @@ -538,7 +538,7 @@ internal void MarkPage(long logicalAddress, int version) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void MarkPageAtomic(long logicalAddress, int version) + internal void MarkPageAtomic(long logicalAddress, long version) { var offset = (logicalAddress >> LogPageSizeBits) % BufferSize; Utility.MonotonicUpdate(ref PageStatusIndicator[offset].Dirty, version, out _); diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index ca33e799c..8ee811efd 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -1052,7 +1052,7 @@ internal override void MemoryPageScan(long beginAddress, long endAddress) } } - internal override void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog) + internal override void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog) { throw new FasterException("Incremental snapshots not supported with generic allocator"); } diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 5e9520a55..773af9cce 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -1108,7 +1108,7 @@ internal void UnsafeSuspendThread() fht.epoch.Suspend(); } - void IClientSession.AtomicSwitch(int version) + void IClientSession.AtomicSwitch(long version) { fht.AtomicSwitch(ctx, ctx.prevCtx, version, fht._hybridLogCheckpoint.info.checkpointTokens); } diff --git a/cs/src/core/ClientSession/IClientSession.cs b/cs/src/core/ClientSession/IClientSession.cs index 54f8ff976..d8dbc39c2 100644 --- a/cs/src/core/ClientSession/IClientSession.cs +++ b/cs/src/core/ClientSession/IClientSession.cs @@ -5,7 +5,7 @@ namespace FASTER.core { internal interface IClientSession { - void AtomicSwitch(int version); + void AtomicSwitch(long version); } } diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index d856a0530..74d9f72e6 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -474,9 +474,9 @@ private struct EpochActionPair /// Version /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Mark(int markerIdx, int version) + public void Mark(int markerIdx, long version) { - (*(tableAligned + threadEntryIndex)).markers[markerIdx] = version; + (*(tableAligned + threadEntryIndex)).markers[markerIdx] = (int)version; } /// @@ -487,7 +487,7 @@ public void Mark(int markerIdx, int version) /// Version /// Whether complete [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool CheckIsComplete(int markerIdx, int version) + public bool CheckIsComplete(int markerIdx, long version) { // check if all threads have reported complete for (int index = 1; index <= kTableSize; ++index) @@ -496,7 +496,7 @@ public bool CheckIsComplete(int markerIdx, int version) int fc_version = (*(tableAligned + index)).markers[markerIdx]; if (0 != entry_epoch) { - if (fc_version != version && entry_epoch < int.MaxValue) + if ((fc_version != (int)version) && (entry_epoch < int.MaxValue)) { return false; } diff --git a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs index cd06ce56c..281aafb1f 100644 --- a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs +++ b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs @@ -272,7 +272,7 @@ public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) } /// - public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog) + public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog) { deltaLog.Allocate(out int length, out long physicalAddress); if (length < commitMetadata.Length) diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index a1fe55dbd..157bf0ff1 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -36,7 +36,7 @@ internal enum OperationStatus internal class SerializedFasterExecutionContext { - internal int version; + internal long version; internal long serialNum; internal string guid; @@ -56,7 +56,7 @@ public void Write(StreamWriter writer) public void Load(StreamReader reader) { string value = reader.ReadLine(); - version = int.Parse(value); + version = long.Parse(value); guid = reader.ReadLine(); value = reader.ReadLine(); @@ -78,7 +78,7 @@ internal struct PendingContext // Some additional information about the previous attempt internal long id; - internal int version; + internal long version; internal long logicalAddress; internal long serialNum; internal HashBucketEntry entry; @@ -248,7 +248,7 @@ public struct CommitPoint /// public struct HybridLogRecoveryInfo { - const int CheckpointVersion = 2; + const int CheckpointVersion = 3; /// /// Guid @@ -261,11 +261,11 @@ public struct HybridLogRecoveryInfo /// /// Version /// - public int version; + public long version; /// /// Next Version /// - public int nextVersion; + public long nextVersion; /// /// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time. /// @@ -318,7 +318,7 @@ public struct HybridLogRecoveryInfo /// /// /// - public void Initialize(Guid token, int _version) + public void Initialize(Guid token, long _version) { guid = token; useSnapshotFile = 0; @@ -356,10 +356,10 @@ public void Initialize(StreamReader reader) useSnapshotFile = int.Parse(value); value = reader.ReadLine(); - version = int.Parse(value); + version = long.Parse(value); value = reader.ReadLine(); - nextVersion = int.Parse(value); + nextVersion = long.Parse(value); value = reader.ReadLine(); flushedLogicalAddress = long.Parse(value); @@ -560,9 +560,9 @@ internal struct HybridLogCheckpointInfo : IDisposable public IDevice deltaFileDevice; public DeltaLog deltaLog; public SemaphoreSlim flushedSemaphore; - public int prevVersion; + public long prevVersion; - public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager) + public void Initialize(Guid token, long _version, ICheckpointManager checkpointManager) { info.Initialize(token, _version); checkpointManager.InitializeLogCheckpoint(token); diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index afff26c1a..db31f2857 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -228,9 +228,7 @@ public FasterKV(long size, LogSettings logSettings, sectorSize = (int)logSettings.LogDevice.SectorSize; Initialize(size, sectorSize); - systemState = default; - systemState.Phase = Phase.REST; - systemState.Version = 1; + systemState = SystemState.Make(Phase.REST, 1); } /// diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index 38f5ddaf6..3c171a1ab 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -92,7 +92,7 @@ internal void InitializeIndexCheckpoint(Guid indexToken) _indexCheckpoint.Initialize(indexToken, state[resizeInfo.version].size, checkpointManager); } - internal void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version) + internal void InitializeHybridLogCheckpoint(Guid hybridLogToken, long version) { _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager); } diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs index 291e73123..e38b464ca 100644 --- a/cs/src/core/Index/Recovery/ICheckpointManager.cs +++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs @@ -65,7 +65,7 @@ public interface ICheckpointManager : IDisposable /// /// /// - void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog); + void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog); /// /// Retrieve commit metadata for specified index checkpoint diff --git a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs index 079826f17..1ce5f2651 100644 --- a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs +++ b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs @@ -277,7 +277,7 @@ public void Dispose() } /// - public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog) + public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, long version, byte[] commitMetadata, DeltaLog deltaLog) { deltaLog.Allocate(out int length, out long physicalAddress); if (length < commitMetadata.Length) diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index c52831e93..7098bebfa 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -363,8 +363,7 @@ private bool RecoverToInitialPage(IndexCheckpointInfo recoveredICInfo, HybridLog currentSyncStateMachine = null; // Set new system state after recovery - systemState.Phase = Phase.REST; - systemState.Version = recoveredHLCInfo.info.version + 1; + systemState = SystemState.Make(Phase.REST, recoveredHLCInfo.info.version + 1); if (!recoveredICInfo.IsDefault() && recoveryCountdown != null) { @@ -442,7 +441,7 @@ private bool SetRecoveryPageRanges(HybridLogCheckpointInfo recoveredHLCInfo, int return true; } - private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int nextVersion, CheckpointType checkpointType, bool undoNextVersion) + private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, bool undoNextVersion) { if (untilAddress <= scanFromAddress) return; @@ -478,7 +477,7 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon WaitUntilAllPagesHaveBeenFlushed(startPage, endPage, recoveryStatus); } - private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, int nextVersion, CheckpointType checkpointType, bool undoNextVersion, CancellationToken cancellationToken) + private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long nextVersion, CheckpointType checkpointType, bool undoNextVersion, CancellationToken cancellationToken) { if (untilAddress <= scanFromAddress) return; @@ -529,7 +528,7 @@ private RecoveryStatus GetPageRangesToRead(long scanFromAddress, long untilAddre return new RecoveryStatus(capacity, endPage, untilAddress, checkpointType); } - private bool ProcessReadPage(long recoverFromAddress, long untilAddress, int nextVersion, bool undoNextVersion, RecoveryStatus recoveryStatus, long page, int pageIndex) + private bool ProcessReadPage(long recoverFromAddress, long untilAddress, long nextVersion, bool undoNextVersion, RecoveryStatus recoveryStatus, long page, int pageIndex) { var startLogicalAddress = hlog.GetStartLogicalAddress(page); var endLogicalAddress = hlog.GetStartLogicalAddress(page + 1); @@ -569,7 +568,7 @@ private async ValueTask WaitUntilAllPagesHaveBeenFlushedAsync(long startPage, lo await recoveryStatus.WaitFlushAsync(hlog.GetPageIndexForPage(page), cancellationToken).ConfigureAwait(false); } - private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, int nextVersion, Guid guid, bool undoNextVersion, DeltaLog deltaLog, long recoverTo) + private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, long nextVersion, Guid guid, bool undoNextVersion, DeltaLog deltaLog, long recoverTo) { GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst); @@ -626,7 +625,7 @@ private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover recoveryStatus.Dispose(); } - private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, int nextVersion, Guid guid, bool undoNextVersion, DeltaLog deltaLog, long recoverTo, CancellationToken cancellationToken) + private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, long nextVersion, Guid guid, bool undoNextVersion, DeltaLog deltaLog, long recoverTo, CancellationToken cancellationToken) { GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst); @@ -716,7 +715,7 @@ private void GetSnapshotPageRangesToRead(long fromAddress, long untilAddress, lo numPagesToReadFirst = Math.Min(capacity, totalPagesToRead); } - private void ProcessReadSnapshotPage(long fromAddress, long untilAddress, int nextVersion, bool undoNextVersion, RecoveryStatus recoveryStatus, long page, int pageIndex) + private void ProcessReadSnapshotPage(long fromAddress, long untilAddress, long nextVersion, bool undoNextVersion, RecoveryStatus recoveryStatus, long page, int pageIndex) { // Page at hand var startLogicalAddress = hlog.GetStartLogicalAddress(page); @@ -753,7 +752,7 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress, long untilLogicalAddressInPage, long pageLogicalAddress, long pagePhysicalAddress, - int nextVersion, bool undoNextVersion) + long nextVersion, bool undoNextVersion) { bool touched = false; @@ -851,7 +850,7 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, ob } } - internal bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext toCtx, int version, ConcurrentDictionary tokens) + internal bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext toCtx, long version, ConcurrentDictionary tokens) { lock (toCtx) { diff --git a/cs/src/core/Index/Synchronization/StateTransitions.cs b/cs/src/core/Index/Synchronization/StateTransitions.cs index 9e1803979..56458b0c1 100644 --- a/cs/src/core/Index/Synchronization/StateTransitions.cs +++ b/cs/src/core/Index/Synchronization/StateTransitions.cs @@ -65,23 +65,57 @@ public enum Phase : int { [StructLayout(LayoutKind.Explicit, Size = 8)] public struct SystemState { + const int kTotalSizeInBytes = 8; + const int kTotalBits = kTotalSizeInBytes * 8; + + // Phase + const int kPhaseBits = 8; + const int kPhaseShiftInWord = kTotalBits - kPhaseBits; + const long kPhaseMaskInWord = ((1L << kPhaseBits) - 1) << kPhaseShiftInWord; + const long kPhaseMaskInInteger = (1L << kPhaseBits) - 1; + + // Version + const int kVersionBits = kPhaseShiftInWord; + const long kVersionMaskInWord = (1L << kVersionBits) - 1; + /// - /// The current of the operation + /// The word containing information in bitfields /// [FieldOffset(0)] - public Phase Phase; + internal long Word; + /// - /// The version of the database when this operation is complete + /// The current of the operation /// - [FieldOffset(4)] - public int Version; - + public Phase Phase + { + get + { + return (Phase)((Word >> kPhaseShiftInWord) & kPhaseMaskInInteger); + } + set + { + Word &= ~kPhaseMaskInWord; + Word |= (((long)value) & kPhaseMaskInInteger) << kPhaseShiftInWord; + } + } + /// - /// The word containing information in bitfields + /// The version of the database when this operation is complete /// - [FieldOffset(0)] - internal long Word; + public long Version + { + get + { + return Word & kVersionMaskInWord; + } + set + { + Word &= ~kVersionMaskInWord; + Word |= value & kVersionMaskInWord; + } + } /// /// Copy the into this @@ -98,7 +132,7 @@ internal static SystemState Copy(ref SystemState other) /// Create a with the specified values /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static SystemState Make(Phase status, int version) + internal static SystemState Make(Phase status, long version) { var info = default(SystemState); info.Phase = status; From f0f37cf57ea41dbb259e0f79dafcc09e7a59c67d Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Sat, 30 Oct 2021 16:58:26 -0700 Subject: [PATCH 7/8] Shuffle around stuff for better cache alignment in benchmark. --- cs/benchmark/FasterSpanByteYcsbBenchmark.cs | 2 +- cs/benchmark/FasterYcsbBenchmark.cs | 2 +- cs/benchmark/Program.cs | 4 +- cs/benchmark/TestLoader.cs | 49 ++++++++++----------- cs/benchmark/YcsbConstants.cs | 4 +- 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs index 6418ce38e..b6d5f6219 100644 --- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs +++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs @@ -71,7 +71,7 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys for (int i = 0; i < 8; i++) input_[i].value = i; - device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.CheckpointRecoverStore, useIoCompletionPort: true); + device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.RecoverMode, useIoCompletionPort: true); if (testLoader.Options.UseSmallMemoryLog) store = new FasterKV diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 0456978d8..98347b65f 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -68,7 +68,7 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade for (int i = 0; i < 8; i++) input_[i].value = i; - device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.CheckpointRecoverStore, useIoCompletionPort: true); + device = Devices.CreateLogDevice(TestLoader.DevicePath, preallocateFile: true, deleteOnClose: !testLoader.RecoverMode, useIoCompletionPort: true); if (testLoader.Options.ThreadCount >= 16) device.ThrottleLimit = testLoader.Options.ThreadCount * 12; diff --git a/cs/benchmark/Program.cs b/cs/benchmark/Program.cs index 3d45a2353..f9273caba 100644 --- a/cs/benchmark/Program.cs +++ b/cs/benchmark/Program.cs @@ -12,8 +12,8 @@ public class Program public static void Main(string[] args) { - TestLoader testLoader = new(); - if (!testLoader.Parse(args)) + TestLoader testLoader = new(args); + if (testLoader.error) return; TestStats testStats = new(testLoader.Options); diff --git a/cs/benchmark/TestLoader.cs b/cs/benchmark/TestLoader.cs index 69ec3a09b..bbee6c0a1 100644 --- a/cs/benchmark/TestLoader.cs +++ b/cs/benchmark/TestLoader.cs @@ -19,27 +19,29 @@ internal interface IKeySetter class TestLoader { - internal Options Options; - - internal BenchmarkType BenchmarkType; - internal LockImpl LockImpl; - internal string Distribution; - + internal readonly Options Options; + internal readonly string Distribution; internal Key[] init_keys = default; internal Key[] txn_keys = default; internal KeySpanByte[] init_span_keys = default; internal KeySpanByte[] txn_span_keys = default; - internal long InitCount; - internal long TxnCount; - internal int MaxKey; + internal readonly BenchmarkType BenchmarkType; + internal readonly LockImpl LockImpl; + internal readonly long InitCount; + internal readonly long TxnCount; + internal readonly int MaxKey; + internal readonly bool RecoverMode; + internal readonly bool error; - internal bool Parse(string[] args) + + internal TestLoader(string[] args) { + error = true; ParserResult result = Parser.Default.ParseArguments(args); if (result.Tag == ParserResultType.NotParsed) { - return false; + return; } Options = result.MapResult(o => o, xs => new Options()); @@ -52,33 +54,34 @@ static bool verifyOption(bool isValid, string name) this.BenchmarkType = (BenchmarkType)Options.Benchmark; if (!verifyOption(Enum.IsDefined(typeof(BenchmarkType), this.BenchmarkType), "Benchmark")) - return false; + return; if (!verifyOption(Options.NumaStyle >= 0 && Options.NumaStyle <= 1, "NumaStyle")) - return false; + return; this.LockImpl = (LockImpl)Options.LockImpl; if (!verifyOption(Enum.IsDefined(typeof(LockImpl), this.LockImpl), "Lock Implementation")) - return false; + return; if (!verifyOption(Options.IterationCount > 0, "Iteration Count")) - return false; + return; if (!verifyOption(Options.ReadPercent >= -1 && Options.ReadPercent <= 100, "Read Percent")) - return false; + return; this.Distribution = Options.DistributionName.ToLower(); if (!verifyOption(this.Distribution == YcsbConstants.UniformDist || this.Distribution == YcsbConstants.ZipfDist, "Distribution")) - return false; + return; if (!verifyOption(this.Options.RunSeconds >= 0, "RunSeconds")) - return false; + return; this.InitCount = this.Options.UseSmallData ? 2500480 : 250000000; this.TxnCount = this.Options.UseSmallData ? 10000000 : 1000000000; this.MaxKey = this.Options.UseSmallData ? 1 << 22 : 1 << 28; + this.RecoverMode = Options.BackupAndRestore && Options.PeriodicCheckpointMilliseconds <= 0; - return true; + error = false; } internal void LoadData() @@ -330,14 +333,10 @@ private static void LoadSyntheticData(string distribution, uin internal string BackupPath => $"{DataPath}/{this.Distribution}_{(this.Options.UseSyntheticData ? "synthetic" : "ycsb")}_{(this.Options.UseSmallData ? "2.5M_10M" : "250M_1000M")}"; - - internal bool CheckpointRecoverStore - => Options.BackupAndRestore && Options.PeriodicCheckpointMilliseconds <= 0; - internal bool MaybeRecoverStore(FasterKV store) { // Recover database for fast benchmark repeat runs. - if (CheckpointRecoverStore) + if (RecoverMode) { if (this.Options.UseSmallData) { @@ -366,7 +365,7 @@ internal bool MaybeRecoverStore(FasterKV store) internal void MaybeCheckpointStore(FasterKV store) { // Checkpoint database for fast benchmark repeat runs. - if (CheckpointRecoverStore) + if (RecoverMode) { Console.WriteLine($"Checkpointing FasterKV to {this.BackupPath} for fast restart"); var sw = Stopwatch.StartNew(); diff --git a/cs/benchmark/YcsbConstants.cs b/cs/benchmark/YcsbConstants.cs index cbbd5e522..03cda44e0 100644 --- a/cs/benchmark/YcsbConstants.cs +++ b/cs/benchmark/YcsbConstants.cs @@ -5,14 +5,14 @@ namespace FASTER.benchmark { - enum BenchmarkType : int + enum BenchmarkType : byte { Ycsb, SpanByte, ConcurrentDictionaryYcsb }; - enum LockImpl : int + enum LockImpl : byte { None, RecordInfo From 84f5f416c1383fff5d340effd91f5e8b7c2778f7 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Mon, 1 Nov 2021 19:52:11 -0700 Subject: [PATCH 8/8] Updates based on review --- cs/src/core/Index/Common/Contexts.cs | 2 +- cs/src/core/Index/Common/RecordInfo.cs | 18 ++++++------- cs/src/core/Index/FASTER/FASTERImpl.cs | 36 ++++++++++---------------- cs/src/core/Index/Recovery/Recovery.cs | 2 +- 4 files changed, 25 insertions(+), 33 deletions(-) diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 157bf0ff1..3c365cbe0 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -221,7 +221,7 @@ public async ValueTask WaitPendingAsync(CancellationToken token = default) await readyResponses.WaitForEntryAsync(token).ConfigureAwait(false); } - public bool NewVersion => phase < Phase.REST; + public bool InNewVersion => phase < Phase.REST; public FasterExecutionContext prevCtx; } diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index f12519bfa..d5f0adfe3 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -10,7 +10,7 @@ namespace FASTER.core { // RecordInfo layout (64 bits total): - // [--][NewVersion][Filler][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] + // [--][InNewVersion][Filler][Dirty][Stub][Sealed] [Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] // where X = exclusive lock, S = shared lock, R = readcache, A = address, - = unused [StructLayout(LayoutKind.Explicit, Size = 8)] public struct RecordInfo @@ -44,7 +44,7 @@ public struct RecordInfo const int kSealedBitOffset = kStubBitOffset + 1; const int kDirtyBitOffset = kSealedBitOffset + 1; const int kFillerBitOffset = kDirtyBitOffset + 1; - const int kNewVersionBitOffset = kFillerBitOffset + 1; + const int kInNewVersionBitOffset = kFillerBitOffset + 1; const long kTombstoneBitMask = 1L << kTombstoneBitOffset; const long kValidBitMask = 1L << kValidBitOffset; @@ -52,19 +52,19 @@ public struct RecordInfo const long kSealedBitMask = 1L << kSealedBitOffset; const long kDirtyBitMask = 1L << kDirtyBitOffset; const long kFillerBitMask = 1L << kFillerBitOffset; - const long kNewVersionBitMask = 1L << kNewVersionBitOffset; + const long kInNewVersionBitMask = 1L << kInNewVersionBitOffset; [FieldOffset(0)] private long word; - public static void WriteInfo(ref RecordInfo info, bool newVersion, bool tombstone, bool dirty, long previousAddress) + public static void WriteInfo(ref RecordInfo info, bool inNewVersion, bool tombstone, bool dirty, long previousAddress) { info.word = default; info.Tombstone = tombstone; info.SetValid(); info.Dirty = dirty; info.PreviousAddress = previousAddress; - info.NewVersion = newVersion; + info.InNewVersion = inNewVersion; } /// @@ -248,13 +248,13 @@ public bool Filler } } - public bool NewVersion + public bool InNewVersion { - get => (word & kNewVersionBitMask) > 0; + get => (word & kInNewVersionBitMask) > 0; set { - if (value) word |= kNewVersionBitMask; - else word &= ~kNewVersionBitMask; + if (value) word |= kInNewVersionBitMask; + else word &= ~kInNewVersionBitMask; } } diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 453023905..4c46a08f4 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -43,23 +43,15 @@ private bool CheckBucketVersionNew(ref HashBucketEntry entry) // A version shift can only in an address after the checkpoint starts, as v_new threads RCU entries to the tail. if (entry.Address < _hybridLogCheckpoint.info.startLogicalAddress) return false; - // Otherwise, check if the version suffix of the entry matches v_new. - if (UseReadCache && entry.ReadCache) - { - var _addr = readcache.GetPhysicalAddress(entry.Address & ~Constants.kReadCacheBitMask); - if ((entry.Address & ~Constants.kReadCacheBitMask) >= readcache.HeadAddress) - return readcache.GetInfo(_addr).NewVersion; - else - return false; - } + // Read cache entries are not in new version + if (UseReadCache && entry.ReadCache) return false; + + // Check if record has the new version bit set + var _addr = hlog.GetPhysicalAddress(entry.Address); + if (entry.Address >= hlog.HeadAddress) + return hlog.GetInfo(_addr).InNewVersion; else - { - var _addr = hlog.GetPhysicalAddress(entry.Address); - if (entry.Address >= hlog.HeadAddress) - return hlog.GetInfo(_addr).NewVersion; - else - return false; - } + return false; } internal enum LatchOperation : byte @@ -566,7 +558,7 @@ private OperationStatus CreateNewRecordUpsert( var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); ref RecordInfo recordInfo = ref hlog.GetInfo(newPhysicalAddress); RecordInfo.WriteInfo(ref recordInfo, - newVersion: sessionCtx.NewVersion, + inNewVersion: sessionCtx.InNewVersion, tombstone: true, dirty: true, latestLogicalAddress); hlog.Serialize(ref key, newPhysicalAddress); @@ -1549,7 +1541,7 @@ internal OperationStatus InternalContinuePendingRMW