diff --git a/cs/src/core/Allocator/IFasterScanIterator.cs b/cs/src/core/Allocator/IFasterScanIterator.cs index 3101c3fc9..55698679f 100644 --- a/cs/src/core/Allocator/IFasterScanIterator.cs +++ b/cs/src/core/Allocator/IFasterScanIterator.cs @@ -70,5 +70,15 @@ public interface IFasterScanIterator : IDisposable /// Next address /// long NextAddress { get; } + + /// + /// The starting address of the scan + /// + long BeginAddress { get; } + + /// + /// The ending address of the scan + /// + long EndAddress { get; } } } diff --git a/cs/src/core/Allocator/MemoryPageScanIterator.cs b/cs/src/core/Allocator/MemoryPageScanIterator.cs index 2d82f5622..6b54a2b49 100644 --- a/cs/src/core/Allocator/MemoryPageScanIterator.cs +++ b/cs/src/core/Allocator/MemoryPageScanIterator.cs @@ -16,7 +16,7 @@ namespace FASTER.core class MemoryPageScanIterator : IFasterScanIterator { readonly Record[] page; - readonly int end; + readonly int start, end; int offset; @@ -25,6 +25,7 @@ public MemoryPageScanIterator(Record[] page, int start, int end) this.page = new Record[page.Length]; Array.Copy(page, start, this.page, start, end - start); offset = start - 1; + this.start = start; this.end = end; } @@ -32,6 +33,10 @@ public MemoryPageScanIterator(Record[] page, int start, int end) public long NextAddress => offset + 1; + public long BeginAddress => start; + + public long EndAddress => end; + public void Dispose() { } diff --git a/cs/src/core/Allocator/ScanIteratorBase.cs b/cs/src/core/Allocator/ScanIteratorBase.cs index 37a110ec6..21c0db3bd 100644 --- a/cs/src/core/Allocator/ScanIteratorBase.cs +++ b/cs/src/core/Allocator/ScanIteratorBase.cs @@ -52,6 +52,16 @@ public abstract class ScanIteratorBase /// public long NextAddress => nextAddress; + /// + /// The starting address of the scan + /// + public long BeginAddress => beginAddress; + + /// + /// The ending address of the scan + /// + public long EndAddress => endAddress; + /// /// Constructor /// diff --git a/cs/src/core/Allocator/WorkQueueLIFO.cs b/cs/src/core/Allocator/WorkQueueLIFO.cs index 42eb76a62..852a7dfc7 100644 --- a/cs/src/core/Allocator/WorkQueueLIFO.cs +++ b/cs/src/core/Allocator/WorkQueueLIFO.cs @@ -54,7 +54,7 @@ public void EnqueueAndTryWork(T work, bool asTask) private void ProcessQueue() { - // Process items in qork queue + // Process items in work queue while (true) { while (_queue.TryPop(out var workItem)) diff --git a/cs/src/core/Index/Common/CompletedOutput.cs b/cs/src/core/Index/Common/CompletedOutput.cs index 107853595..5e3d8025c 100644 --- a/cs/src/core/Index/Common/CompletedOutput.cs +++ b/cs/src/core/Index/Common/CompletedOutput.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. using System; -using System.Collections.Generic; namespace FASTER.core { @@ -94,7 +93,7 @@ public struct CompletedOutput public ref TInput Input => ref inputContainer.Get(); /// - /// The output for this pending operation. + /// The output for this pending operation. It is the caller's responsibility to dispose this if necessary; will not try to dispose this member. /// public TOutput Output; diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 80ba150bf..37e9cde55 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -18,7 +18,6 @@ internal enum OperationType READ, RMW, UPSERT, - INSERT, DELETE } @@ -266,7 +265,7 @@ public struct HybridLogRecoveryInfo /// public int nextVersion; /// - /// Flushed logical address + /// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time. /// public long flushedLogicalAddress; /// @@ -588,37 +587,39 @@ public HybridLogCheckpointInfo Transfer() } public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits, - bool scanDelta, long recoverTo) + bool scanDelta = false, long recoverTo = -1) { deltaFileDevice = checkpointManager.GetDeltaLogDevice(token); - deltaFileDevice.Initialize(-1); - if (deltaFileDevice.GetFileSize(0) > 0) + if (deltaFileDevice is not null) { - deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); - deltaLog.InitializeForReads(); - info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo); - } - else - { - info.Recover(token, checkpointManager, null); + deltaFileDevice.Initialize(-1); + if (deltaFileDevice.GetFileSize(0) > 0) + { + deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); + deltaLog.InitializeForReads(); + info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo); + return; + } } + info.Recover(token, checkpointManager, null); } public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits, out byte[] commitCookie, bool scanDelta = false, long recoverTo = -1) { deltaFileDevice = checkpointManager.GetDeltaLogDevice(token); - deltaFileDevice.Initialize(-1); - if (deltaFileDevice.GetFileSize(0) > 0) + if (deltaFileDevice is not null) { - deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); - deltaLog.InitializeForReads(); - info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo); - } - else - { - info.Recover(token, checkpointManager, out commitCookie); + deltaFileDevice.Initialize(-1); + if (deltaFileDevice.GetFileSize(0) > 0) + { + deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); + deltaLog.InitializeForReads(); + info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo); + return; + } } + info.Recover(token, checkpointManager, out commitCookie); } public bool IsDefault() diff --git a/cs/src/core/Index/Common/HeapContainer.cs b/cs/src/core/Index/Common/HeapContainer.cs index c46d94940..77f1cb0a8 100644 --- a/cs/src/core/Index/Common/HeapContainer.cs +++ b/cs/src/core/Index/Common/HeapContainer.cs @@ -12,9 +12,8 @@ namespace FASTER.core public interface IHeapContainer : IDisposable { /// - /// Get object + /// Get a reference to the contained object /// - /// ref T Get(); } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index f4b30578f..0c6c428a9 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -35,11 +35,15 @@ public partial class FasterKV : FasterBase, { internal readonly AllocatorBase hlog; private readonly AllocatorBase readcache; - private readonly IFasterEqualityComparer comparer; + + /// + /// Compares two keys + /// + protected readonly IFasterEqualityComparer comparer; internal readonly bool UseReadCache; private readonly CopyReadsToTail CopyReadsToTail; - private readonly bool FoldOverSnapshot; + private readonly bool UseFoldOverCheckpoint; internal readonly int sectorSize; private readonly bool WriteDefaultOnDelete; internal bool RelaxedCPR; @@ -145,7 +149,7 @@ public FasterKV(long size, LogSettings logSettings, if (checkpointSettings.CheckpointManager == null) disposeCheckpointManager = true; - FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; + UseFoldOverCheckpoint = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; CopyReadsToTail = logSettings.CopyReadsToTail; if (logSettings.ReadCacheSettings != null) @@ -244,7 +248,7 @@ public FasterKV(long size, LogSettings logSettings, /// operation such as growing the index). Use CompleteCheckpointAsync to wait completion. /// public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) - => TakeFullCheckpoint(out token, this.FoldOverSnapshot ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion); + => TakeFullCheckpoint(out token, this.UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion); /// /// Initiate full checkpoint @@ -353,17 +357,7 @@ public bool TakeIndexCheckpoint(out Guid token) /// /// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion. public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) - { - ISynchronizationTask backend; - if (FoldOverSnapshot) - backend = new FoldOverCheckpointTask(); - else - backend = new SnapshotCheckpointTask(); - - var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion)); - token = _hybridLogCheckpointToken; - return result; - } + => TakeHybridLogCheckpoint(out token, UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, tryIncremental: false, targetVersion); /// /// Initiate log-only checkpoint @@ -631,7 +625,6 @@ internal Status ContextUpsert(ref Key key while (internalStatus == OperationStatus.RETRY_NOW); Status status; - if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { status = (Status)internalStatus; diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 2752cd1b3..4c1fc07db 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -396,7 +396,7 @@ internal OperationStatus InternalUpsert( goto CreateNewRecord; } -#region Entry latch operation + #region Entry latch operation if (sessionCtx.phase != Phase.REST) { latchDestination = AcquireLatchUpsert(sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress); @@ -717,8 +717,8 @@ internal OperationStatus InternalRMW( { ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); if (!recordInfo.Tombstone) - { - if (FoldOverSnapshot) + { + if (UseFoldOverCheckpoint) { Debug.Assert(recordInfo.Version == sessionCtx.version); } @@ -780,9 +780,9 @@ internal OperationStatus InternalRMW( } } -#endregion + #endregion -#region Create new record + #region Create new record CreateNewRecord: if (latchDestination != LatchDestination.CreatePendingContext) { @@ -1402,7 +1402,9 @@ internal void InternalContinuePendingReadCopyToTail fht, Functions functions, long unti public long NextAddress => enumerationPhase == 0 ? iter1.NextAddress : iter2.NextAddress; + public long BeginAddress => enumerationPhase == 0 ? iter1.BeginAddress : iter2.BeginAddress; + + public long EndAddress => enumerationPhase == 0 ? iter1.EndAddress : iter2.EndAddress; + public void Dispose() { iter1?.Dispose(); diff --git a/cs/src/core/Index/Interfaces/IFasterSession.cs b/cs/src/core/Index/Interfaces/IFasterSession.cs index aa35804c1..d8b8c4e80 100644 --- a/cs/src/core/Index/Interfaces/IFasterSession.cs +++ b/cs/src/core/Index/Interfaces/IFasterSession.cs @@ -1,4 +1,7 @@ -namespace FASTER.core +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +namespace FASTER.core { /// /// Provides thread management and callback to checkpoint completion (called state machine). diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs index a2ccfc506..291e73123 100644 --- a/cs/src/core/Index/Recovery/ICheckpointManager.cs +++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs @@ -82,7 +82,7 @@ public interface ICheckpointManager : IDisposable /// whether or not to scan through the delta log to acquire latest entry /// version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version. /// Metadata, or null if invalid - byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo); + byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta = false, long recoverTo = -1); /// /// Get list of index checkpoint tokens, in order of usage preference diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 47a52d536..9cf447e10 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -304,6 +304,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck // Recover session information hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress); _recoveredSessions = recoveredHLCInfo.info.continueTokens; + checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid); recoveredHLCInfo.Dispose(); } @@ -348,6 +349,7 @@ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogical hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress); _recoveredSessions = recoveredHLCInfo.info.continueTokens; + checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid); recoveredHLCInfo.Dispose(); } diff --git a/cs/test/BasicFASTERTests.cs b/cs/test/BasicFASTERTests.cs index 3c1eadb8e..1e93526ca 100644 --- a/cs/test/BasicFASTERTests.cs +++ b/cs/test/BasicFASTERTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Diagnostics; using System.Linq; using FASTER.core; using NUnit.Framework; @@ -244,7 +245,12 @@ public unsafe void NativeInMemWriteRead2() public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType) { InputStruct input = default; - int count = 200; + const int RandSeed = 10; + const int RandRange = 10000; + const int NumRecs = 200; + + Random r = new Random(RandSeed); + var sw = Stopwatch.StartNew(); string filename = path + "TestShiftHeadAddress" + deviceType.ToString() + ".log"; log = TestUtils.CreateTestDevice(deviceType, filename); @@ -253,20 +259,21 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType session = fht.For(new Functions()).NewSession(); - Random r = new Random(10); - for (int c = 0; c < count; c++) + for (int c = 0; c < NumRecs; c++) { - var i = r.Next(10000); + var i = r.Next(RandRange); var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; session.Upsert(ref key1, ref value, Empty.Default, 0); } + Console.WriteLine($"Time to insert {NumRecs} records: {sw.ElapsedMilliseconds} ms"); - r = new Random(10); + r = new Random(RandSeed); + sw.Restart(); - for (int c = 0; c < count; c++) + for (int c = 0; c < NumRecs; c++) { - var i = r.Next(10000); + var i = r.Next(RandRange); OutputStruct output = default; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; @@ -279,20 +286,30 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType Assert.AreEqual(value.vfield1, output.value.vfield1); Assert.AreEqual(value.vfield2, output.value.vfield2); } + Console.WriteLine($"Time to read {NumRecs} in-memory records: {sw.ElapsedMilliseconds} ms"); // Shift head and retry - should not find in main memory now fht.Log.FlushAndEvict(true); - r = new Random(10); - for (int c = 0; c < count; c++) + r = new Random(RandSeed); + sw.Restart(); + + for (int c = 0; c < NumRecs; c++) { - var i = r.Next(10000); + var i = r.Next(RandRange); OutputStruct output = default; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; + var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; + Status foundStatus = session.Read(ref key1, ref input, ref output, Empty.Default, 0); Assert.AreEqual(Status.PENDING, foundStatus); - session.CompletePending(true); + session.CompletePendingWithOutputs(out var outputs, wait: true); + Assert.IsTrue(outputs.Next()); + Assert.AreEqual(value.vfield1, outputs.Current.Output.value.vfield1); + outputs.Current.Dispose(); + Assert.IsFalse(outputs.Next()); } + Console.WriteLine($"Time to read {NumRecs} on-disk records: {sw.ElapsedMilliseconds} ms"); } [Test] diff --git a/cs/test/LockTests.cs b/cs/test/LockTests.cs index 35d131579..225da38f9 100644 --- a/cs/test/LockTests.cs +++ b/cs/test/LockTests.cs @@ -15,11 +15,6 @@ internal class LockTests { internal class Functions : AdvancedSimpleFunctions { - public override void ConcurrentReader(ref int key, ref int input, ref int value, ref int dst, ref RecordInfo recordInfo, long address) - { - dst = value; - } - bool Increment(ref int dst) { ++dst; diff --git a/cs/test/ObjectTestTypes.cs b/cs/test/ObjectTestTypes.cs index fd883443c..46a19831f 100644 --- a/cs/test/ObjectTestTypes.cs +++ b/cs/test/ObjectTestTypes.cs @@ -230,7 +230,6 @@ public override void SingleReader(ref MyKey key, ref MyInput input, ref MyValue { if (dst == null) dst = new MyOutput(); - dst.value = value; } @@ -288,7 +287,6 @@ public class MyLargeValue public MyLargeValue() { - } public MyLargeValue(int size) diff --git a/cs/test/RecoveryTestTypes.cs b/cs/test/RecoveryTestTypes.cs index a0f0bf74c..a3f6b649b 100644 --- a/cs/test/RecoveryTestTypes.cs +++ b/cs/test/RecoveryTestTypes.cs @@ -13,6 +13,8 @@ public struct AdId : IFasterEqualityComparer public long GetHashCode64(ref AdId key) => Utility.GetHashCode(key.adId); public bool Equals(ref AdId k1, ref AdId k2) => k1.adId == k2.adId; + + public override string ToString() => adId.ToString(); } public struct AdInput diff --git a/cs/test/SimpleRecoveryTest.cs b/cs/test/SimpleRecoveryTest.cs index b0c39997a..1ae2cd2c8 100644 --- a/cs/test/SimpleRecoveryTest.cs +++ b/cs/test/SimpleRecoveryTest.cs @@ -72,7 +72,7 @@ public async ValueTask LocalDeviceSimpleRecoveryTest([Values] CheckpointType che { checkpointManager = new DeviceLogCommitCheckpointManager( new LocalStorageNamedDeviceFactory(), - new DefaultCheckpointNamingScheme($"{TestUtils.MethodTestDir}/{TestUtils.AzureTestDirectory}")); + new DefaultCheckpointNamingScheme($"{TestUtils.MethodTestDir}/chkpt")); await SimpleRecoveryTest1_Worker(checkpointType, isAsync, testCommitCookie); checkpointManager.PurgeAll(); } @@ -140,9 +140,16 @@ private async ValueTask SimpleRecoveryTest1_Worker(CheckpointType checkpointType var status = session2.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0); if (status == Status.PENDING) - session2.CompletePending(true); + { + session2.CompletePendingWithOutputs(out var outputs, wait: true); + Assert.IsTrue(outputs.Next()); + output = outputs.Current.Output; + Assert.IsFalse(outputs.Next()); + outputs.Current.Dispose(); + } else - Assert.IsTrue(output.value.numClicks == key); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output.value.numClicks); } session2.Dispose(); } @@ -193,7 +200,7 @@ public async ValueTask SimpleRecoveryTest2([Values]CheckpointType checkpointType session2.CompletePending(true); else { - Assert.IsTrue(output.value.numClicks == key); + Assert.AreEqual(key, output.value.numClicks); } } session2.Dispose(); @@ -248,8 +255,8 @@ public class AdSimpleFunctions : FunctionsBase