From 12aa82f4ede7c2aee8b5c8e4c362289ffd228d32 Mon Sep 17 00:00:00 2001
From: Badrish Chandramouli
Date: Tue, 15 Sep 2020 18:14:42 -0700
Subject: [PATCH] [C#] Recovery improvements (#330)

* Avoid writing back untouched pages during recovery
* Allow calling Recover multiple times with newer hlog checkpoints (see the usage sketch below)
* Fix bug in index fast-forward when using snapshot checkpoints with separate index checkpoints
* Recovery from a snapshot now leaves the log mutable after recovery
* Avoid rescanning the log during recovery - only a single scan of the log is now needed
* Only write back pages that have records that need to be marked as Invalid
* Do not write back from the snapshot file to the main log during recovery, as it should remain as mutable unpersisted state
* Code cleanup
* Add test cases for recovery combinations
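For illustration only, a minimal sketch of the repeated-recovery flow this enables, adapted
from the new RecoveryChecks tests added below. The generic arguments, the SimpleFunctions
helper, and the log/path variables are taken from the test fixture and are assumptions here,
not part of the patch itself:

    // Primary and secondary stores share the same log device and checkpoint
    // directory, mirroring the test setup in RecoveryChecks.cs.
    using var primary = new FasterKV<long, long>(1L << 10,
        logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
        checkpointSettings: new CheckpointSettings { CheckpointDir = path });
    using var secondary = new FasterKV<long, long>(1L << 10,
        logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
        checkpointSettings: new CheckpointSettings { CheckpointDir = path });

    using var s1 = primary.NewSession(new SimpleFunctions<long, long>());
    for (int i = 0; i < 5; i++)
    {
        for (long key = 1000 * i; key < 1000 * i + 1000; key++)
            s1.Upsert(ref key, ref key);

        // Take a newer hybrid log checkpoint on the primary ...
        primary.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver).GetAwaiter().GetResult();

        // ... and the same secondary instance can now call Recover() again to catch up.
        secondary.Recover();
    }

The sketch mirrors RecoveryCheck2: each Recover() call fast-forwards the secondary store to
the latest hybrid log checkpoint without recreating it.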
---
 cs/src/core/Allocator/AllocatorBase.cs        |   9 +-
 cs/src/core/Index/FASTER/FASTERBase.cs        |  23 +-
 cs/src/core/Index/FasterLog/FasterLog.cs      |   2 +-
 .../Index/Interfaces/NullFasterSession.cs     |   2 +-
 cs/src/core/Index/Recovery/IndexRecovery.cs   |  10 +-
 cs/src/core/Index/Recovery/Recovery.cs        | 244 ++++++++++++------
 cs/test/AsyncLargeObjectTests.cs              |   2 +-
 cs/test/AsyncTests.cs                         |   2 +-
 cs/test/GenericDiskDeleteTests.cs             |   5 -
 cs/test/RecoveryChecks.cs                     | 198 ++++++++++++++
 cs/test/SessionFASTERTests.cs                 | 209 +++++++--------
 11 files changed, 482 insertions(+), 224 deletions(-)
 create mode 100644 cs/test/RecoveryChecks.cs

diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 692e5ed02..d46da3244 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -1118,7 +1118,8 @@ protected void ShiftClosedUntilAddress()
         ///
         ///
         ///
-        public void RecoveryReset(long tailAddress, long headAddress, long beginAddress)
+        ///
+        public void RecoveryReset(long tailAddress, long headAddress, long beginAddress, long readonlyAddress)
         {
             long tailPage = GetPage(tailAddress);
             long offsetInPage = GetOffsetInPage(tailAddress);
@@ -1136,9 +1137,9 @@ public void RecoveryReset(long tailAddress, long headAddress, long beginAddress)
             HeadAddress = headAddress;
             SafeHeadAddress = headAddress;
             ClosedUntilAddress = headAddress;
-            FlushedUntilAddress = tailAddress;
-            ReadOnlyAddress = tailAddress;
-            SafeReadOnlyAddress = tailAddress;
+            FlushedUntilAddress = readonlyAddress;
+            ReadOnlyAddress = readonlyAddress;
+            SafeReadOnlyAddress = readonlyAddress;

             // for the last page which contains tailoffset, it must be open
             pageIndex = GetPageIndexForAddress(tailAddress);
diff --git a/cs/src/core/Index/FASTER/FASTERBase.cs b/cs/src/core/Index/FASTER/FASTERBase.cs
index debef6709..8678ac6c7 100644
--- a/cs/src/core/Index/FASTER/FASTERBase.cs
+++ b/cs/src/core/Index/FASTER/FASTERBase.cs
@@ -257,9 +257,6 @@ public unsafe partial class FasterBase
         // Used as an atomic counter to check if resizing is complete
         internal long numPendingChunksToBeSplit;

-        // Epoch set for resizing
-        internal int resizeEpoch;
-
         internal LightEpoch epoch;

         internal ResizeInfo resizeInfo;
@@ -309,7 +306,7 @@ public void Initialize(long size, int sector_size)
             }

             minTableSize = size;
-            resizeInfo = default(ResizeInfo);
+            resizeInfo = default;
             resizeInfo.status = ResizeOperationStatus.DONE;
             resizeInfo.version = 0;
             Initialize(resizeInfo.version, size, sector_size);
@@ -385,7 +382,7 @@ internal bool FindTag(long hash, ushort tag, ref HashBucket* bucket, ref int slo

                 if (target_entry_word == 0)
                 {
-                    entry = default(HashBucketEntry);
+                    entry = default;
                     return false;
                 }
                 bucket = (HashBucket*)overflowBucketsAllocator.GetPhysicalAddress(target_entry_word);
@@ -408,7 +405,7 @@ internal void FindOrCreateTag(long hash, ushort tag, ref HashBucket* bucket, ref

                     // Install tentative tag in free slot
-                    entry = default(HashBucketEntry);
+                    entry = default;
                     entry.Tag = tag;
                     entry.Address = Constants.kTempInvalidAddress;
                     entry.Pending = false;
@@ -451,7 +448,7 @@ private bool FindTagInternal(long hash, ushort tag, ref HashBucket* bucket, ref
                         continue;
                     }

-                    HashBucketEntry entry = default(HashBucketEntry);
+                    HashBucketEntry entry = default;
                     entry.word = target_entry_word;
                     if (tag == entry.Tag)
                     {
@@ -491,7 +488,7 @@ private bool FindTagMaybeTentativeInternal(long hash, ushort tag, ref HashBucket
                         continue;
                     }

-                    HashBucketEntry entry = default(HashBucketEntry);
+                    HashBucketEntry entry = default;
                     entry.word = target_entry_word;
                     if (tag == entry.Tag)
                     {
@@ -599,7 +596,7 @@ private bool FindTagOrFreeInternal(long hash, ushort tag, ref HashBucket* bucket
                             // Install succeeded
                             bucket = physicalBucketAddress;
                             slot = 0;
-                            entry = default(HashBucketEntry);
+                            entry = default;
                             return recordExists;
                         }
                     }
@@ -609,7 +606,7 @@ private bool FindTagOrFreeInternal(long hash, ushort tag, ref HashBucket* bucket
                     {
                         bucket = entry_slot_bucket;
                     }
-                    entry = default(HashBucketEntry);
+                    entry = default;
                     break;
                 }
             }
@@ -650,7 +647,7 @@ private bool FindOtherTagMaybeTentativeInternal(long hash, ushort tag, ref HashB
                         continue;
                     }

-                    HashBucketEntry entry = default(HashBucketEntry);
+                    HashBucketEntry entry = default;
                     entry.word = target_entry_word;
                     if (tag == entry.Tag)
                     {
@@ -726,7 +723,7 @@ protected virtual long GetEntryCount()
         ///
         ///
         ///
-        protected virtual string _DumpDistribution(int version)
+        protected virtual string DumpDistributionInternal(int version)
         {
             var table_size_ = state[version].size;
             var ptable_ = state[version].tableAligned;
@@ -779,7 +776,7 @@ protected virtual string _DumpDistribution(int version)
         ///
         public string DumpDistribution()
         {
-            return _DumpDistribution(resizeInfo.version);
+            return DumpDistributionInternal(resizeInfo.version);
         }
     }
diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs
index 34650ed41..ca224c021 100644
--- a/cs/src/core/Index/FasterLog/FasterLog.cs
+++ b/cs/src/core/Index/FasterLog/FasterLog.cs
@@ -912,7 +912,7 @@ private void Restore(out Dictionary recoveredIterators)

             recoveredIterators = info.Iterators;

-            allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress);
+            allocator.RestoreHybridLog(info.BeginAddress, headAddress, info.FlushedUntilAddress, info.FlushedUntilAddress);
             CommittedUntilAddress = info.FlushedUntilAddress;
             CommittedBeginAddress = info.BeginAddress;
             SafeTailAddress = info.FlushedUntilAddress;
diff --git a/cs/src/core/Index/Interfaces/NullFasterSession.cs b/cs/src/core/Index/Interfaces/NullFasterSession.cs
index 3c340474d..c17a7ffd8 100644
--- a/cs/src/core/Index/Interfaces/NullFasterSession.cs
+++ b/cs/src/core/Index/Interfaces/NullFasterSession.cs
@@ -2,7 +2,7 @@
 {
     struct NullFasterSession : IFasterSession
     {
-        public static readonly NullFasterSession Instance;
+        public static readonly NullFasterSession Instance = new NullFasterSession();

         public void CheckpointCompletionCallback(string guid, CommitPoint commitPoint)
         {
diff --git a/cs/src/core/Index/Recovery/IndexRecovery.cs b/cs/src/core/Index/Recovery/IndexRecovery.cs
index ed14616fa..16eabd72a 100644
--- a/cs/src/core/Index/Recovery/IndexRecovery.cs
+++ b/cs/src/core/Index/Recovery/IndexRecovery.cs
@@ -68,8 +68,10 @@ internal bool IsFuzzyIndexRecoveryComplete(bool waitUntilComplete = false)
             return completed1 && completed2;
         }

-        //Main Index Recovery Functions
-        private CountdownEvent mainIndexRecoveryEvent;
+        ///
+        /// Main Index Recovery Functions
+        ///
+        protected CountdownEvent mainIndexRecoveryEvent;

         private void BeginMainIndexRecovery(
             int version,
@@ -93,7 +95,7 @@ private void BeginMainIndexRecovery(
             for (int index = 0; index < numChunks; index++)
             {
                 long chunkStartBucket = (long)start + (index * chunkSize);
-                HashIndexPageAsyncReadResult result = default(HashIndexPageAsyncReadResult);
+                HashIndexPageAsyncReadResult result = default;
                 result.chunkIndex = index;
                 device.ReadAsync(numBytesRead, (IntPtr)chunkStartBucket, chunkSize, AsyncPageReadCallback, result);
                 numBytesRead += chunkSize;
@@ -124,7 +126,7 @@ private unsafe void AsyncPageReadCallback(uint errorCode, uint numBytes, object

         internal void DeleteTentativeEntries()
         {
-            HashBucketEntry entry = default(HashBucketEntry);
+            HashBucketEntry entry = default;
             int version = resizeInfo.version;

             var table_size_ = state[version].size;
diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs
index 89d19d5a0..207f419db 100644
--- a/cs/src/core/Index/Recovery/Recovery.cs
+++ b/cs/src/core/Index/Recovery/Recovery.cs
@@ -19,6 +19,7 @@ internal class RecoveryStatus
         public long endPage;
         public long untilAddress;
         public int capacity;
+        public CheckpointType checkpointType;

         public IDevice recoveryDevice;
         public long recoveryDevicePageOffset;
@@ -26,15 +27,17 @@ internal class RecoveryStatus
         public ReadStatus[] readStatus;
         public FlushStatus[] flushStatus;
-
+
         public RecoveryStatus(int capacity, long startPage,
-                              long endPage, long untilAddress)
+                              long endPage, long untilAddress, CheckpointType checkpointType)
         {
             this.capacity = capacity;
             this.startPage = startPage;
             this.endPage = endPage;
             this.untilAddress = untilAddress;
+            this.checkpointType = checkpointType;
+
             readStatus = new ReadStatus[capacity];
             flushStatus = new FlushStatus[capacity];
             for (int i = 0; i < capacity; i++)
@@ -167,12 +170,39 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
             systemState.phase = Phase.REST;
             systemState.version = (v + 1);

-            // Recover fuzzy index from checkpoint
+            long fromAddress;
+
+            if (!recoveredICInfo.IsDefault() && mainIndexRecoveryEvent != null)
+            {
+                Debug.WriteLine("Ignoring index checkpoint as we have already recovered index previously");
+                recoveredICInfo = default;
+            }
+
             if (recoveredICInfo.IsDefault())
-                recoveredICInfo.info.startLogicalAddress = recoveredHLCInfo.info.beginAddress;
+            {
+                // No index checkpoint - recover from begin of log
+                fromAddress = recoveredHLCInfo.info.beginAddress;
+
+                // Unless we recovered previously until some hlog address
+                if (hlog.FlushedUntilAddress > fromAddress)
+                    fromAddress = hlog.FlushedUntilAddress;
+            }
             else
-                RecoverFuzzyIndex(recoveredICInfo);
+            {
+                fromAddress = recoveredHLCInfo.info.beginAddress;
+                if (recoveredICInfo.info.startLogicalAddress > fromAddress)
+                {
+                    // Index checkpoint given - recover to that
+                    RecoverFuzzyIndex(recoveredICInfo);
+                    fromAddress = recoveredICInfo.info.startLogicalAddress;
+                }
+            }
+
+            if ((recoveredHLCInfo.info.useSnapshotFile == 0) && (recoveredHLCInfo.info.finalLogicalAddress <= hlog.GetTailAddress()))
+            {
+                return;
+            }

             // Recover segment offsets for object log
             if (recoveredHLCInfo.info.objectLogSegmentOffsets != null)
@@ -180,41 +210,66 @@
                     hlog.GetSegmentOffsets(), recoveredHLCInfo.info.objectLogSegmentOffsets.Length);

+            long tailAddress = recoveredHLCInfo.info.finalLogicalAddress;
+            long headAddress = recoveredHLCInfo.info.headAddress;
+            if (numPagesToPreload != -1)
+            {
+                var head = (hlog.GetPage(tailAddress) - numPagesToPreload) << hlog.LogPageSizeBits;
+                if (head > headAddress)
+                    headAddress = head;
+            }
+
+            long scanFromAddress = headAddress;
+            if (fromAddress < scanFromAddress)
+                scanFromAddress = fromAddress;
+
+            // Adjust head address if we need to anyway preload
+            if (scanFromAddress < headAddress)
+            {
+                headAddress = scanFromAddress;
+                if (headAddress < recoveredHLCInfo.info.headAddress)
+                    headAddress = recoveredHLCInfo.info.headAddress;
+            }
+
+            if (hlog.FlushedUntilAddress > scanFromAddress)
+                scanFromAddress = hlog.FlushedUntilAddress;

             // Make index consistent for version v
             if (recoveredHLCInfo.info.useSnapshotFile == 0)
             {
-                RecoverHybridLog(recoveredICInfo.info, recoveredHLCInfo.info);
+                RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version);
+                hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, tailAddress);
             }
             else
             {
-                RecoverHybridLogFromSnapshotFile(recoveredICInfo.info, recoveredHLCInfo.info);
+                if (recoveredHLCInfo.info.flushedLogicalAddress < headAddress)
+                    headAddress = recoveredHLCInfo.info.flushedLogicalAddress;
+
+                // First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress)
+                RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version);
+                // Then recover snapshot into mutable region
+                RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid);
+                hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, recoveredHLCInfo.info.flushedLogicalAddress);
             }
-
-            // Read appropriate hybrid log pages into memory
-            hlog.RestoreHybridLog(recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.headAddress, recoveredHLCInfo.info.beginAddress, numPagesToPreload);
-
             // Recover session information
             _recoveredSessions = recoveredHLCInfo.info.continueTokens;
         }

-        private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo,
-                                      HybridLogRecoveryInfo recoveryInfo)
+        private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int version)
         {
-            var fromAddress = indexRecoveryInfo.startLogicalAddress;
-            var untilAddress = recoveryInfo.finalLogicalAddress;
+            if (untilAddress < scanFromAddress)
+                return;

-            var startPage = hlog.GetPage(fromAddress);
+            var startPage = hlog.GetPage(scanFromAddress);
             var endPage = hlog.GetPage(untilAddress);
-            if ((untilAddress > hlog.GetStartLogicalAddress(endPage)) && (untilAddress > fromAddress))
+            if (untilAddress > hlog.GetStartLogicalAddress(endPage))
             {
                 endPage++;
             }

-            // By default first page has one extra record
             var capacity = hlog.GetCapacityNumPages();
-            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress);
+            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress, CheckpointType.FoldOver);

             int totalPagesToRead = (int)(endPage - startPage);
             int numPagesToReadFirst = Math.Min(capacity, totalPagesToRead);
@@ -234,27 +289,40 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo,
                 var startLogicalAddress = hlog.GetStartLogicalAddress(page);
                 var endLogicalAddress = hlog.GetStartLogicalAddress(page + 1);

-                var pageFromAddress = 0L;
-                if (fromAddress > startLogicalAddress && fromAddress < endLogicalAddress)
+                if (recoverFromAddress < endLogicalAddress)
                 {
-                    pageFromAddress = hlog.GetOffsetInPage(fromAddress);
-                }
+                    var pageFromAddress = 0L;
+                    if (recoverFromAddress > startLogicalAddress)
+                    {
+                        pageFromAddress = hlog.GetOffsetInPage(recoverFromAddress);
+                    }

-                var pageUntilAddress = hlog.GetPageSize();
-                if (endLogicalAddress > untilAddress)
-                {
-                    pageUntilAddress = hlog.GetOffsetInPage(untilAddress);
-                }
+                    var pageUntilAddress = hlog.GetPageSize();
+                    if (untilAddress < endLogicalAddress)
+                    {
+                        pageUntilAddress = hlog.GetOffsetInPage(untilAddress);
+                    }

-                var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
-                RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress,
-                                startLogicalAddress, physicalAddress, recoveryInfo.version);
+                    var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
+                    if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, version))
+                    {
+                        // OS thread flushes current page and issues a read request if necessary
+                        recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+                        recoveryStatus.flushStatus[pageIndex] = FlushStatus.Pending;
+
+                        hlog.AsyncFlushPages(page, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
+                        continue;
+                    }
+                }

-                // OS thread flushes current page and issues a read request if necessary
-                recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
-                recoveryStatus.flushStatus[pageIndex] = FlushStatus.Pending;
+                recoveryStatus.flushStatus[pageIndex] = FlushStatus.Done;

-                hlog.AsyncFlushPages(page, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
+                // Issue next read
+                if (page + capacity < endPage)
+                {
+                    recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+                    hlog.AsyncReadPagesFromDevice(page + capacity, 1, untilAddress, hlog.AsyncReadPagesCallbackForRecovery, recoveryStatus);
+                }
             }

             // Assert that all pages have been flushed
@@ -272,20 +340,12 @@ private void RecoverHybridLog(IndexRecoveryInfo indexRecoveryInfo,
                     }
                 }
             }
-
-
         }

-        private void RecoverHybridLogFromSnapshotFile(
-            IndexRecoveryInfo indexRecoveryInfo,
-            HybridLogRecoveryInfo recoveryInfo)
+        private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddress, int version, Guid guid)
         {
-            var fileStartAddress = recoveryInfo.flushedLogicalAddress;
-            var fromAddress = indexRecoveryInfo.startLogicalAddress;
-            var untilAddress = recoveryInfo.finalLogicalAddress;
-
             // Compute startPage and endPage
-            var startPage = hlog.GetPage(fileStartAddress);
+            var startPage = hlog.GetPage(fromAddress);
             var endPage = hlog.GetPage(untilAddress);
             if (untilAddress > hlog.GetStartLogicalAddress(endPage))
             {
@@ -294,11 +354,11 @@ private void RecoverHybridLogFromSnapshotFile(

             // By default first page has one extra record
             var capacity = hlog.GetCapacityNumPages();
-            var recoveryDevice = checkpointManager.GetSnapshotLogDevice(recoveryInfo.guid);
-            var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(recoveryInfo.guid);
+            var recoveryDevice = checkpointManager.GetSnapshotLogDevice(guid);
+            var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(guid);
             recoveryDevice.Initialize(hlog.GetSegmentSize());
             objectLogRecoveryDevice.Initialize(-1);
-            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress)
+            var recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress, CheckpointType.Snapshot)
             {
                 recoveryDevice = recoveryDevice,
                 objectLogRecoveryDevice = objectLogRecoveryDevice,
@@ -319,7 +379,7 @@ private void RecoverHybridLogFromSnapshotFile(

             for (long page = startPage; page < endPage; page++)
             {
-                // Ensure the page is read from file
+                // Ensure the page is read from file or flushed
                 int pageIndex = hlog.GetPageIndexForPage(page);
                 while (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
                 {
@@ -356,16 +416,21 @@ private void RecoverHybridLogFromSnapshotFile(

                     var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
                     RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress,
-                                    startLogicalAddress, physicalAddress, recoveryInfo.version);
+                                    startLogicalAddress, physicalAddress, version);
                 }

-                // OS thread flushes current page and issues a read request if necessary
-                recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
-                recoveryStatus.flushStatus[pageIndex] = FlushStatus.Pending;
+                recoveryStatus.flushStatus[pageIndex] = FlushStatus.Done;

-                // Write back records from snapshot to main hybrid log
-                hlog.AsyncFlushPages(page, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
+                // Issue next read
+                if (page + capacity < endPage)
+                {
+                    recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+                    hlog.AsyncReadPagesFromDevice(page + capacity, 1, untilAddress, hlog.AsyncReadPagesCallbackForRecovery,
+                                                  recoveryStatus,
+                                                  recoveryStatus.recoveryDevicePageOffset,
+                                                  recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice);
+                }
             }

             // Assert and wait until all pages have been flushed
@@ -388,13 +453,15 @@ private void RecoverHybridLogFromSnapshotFile(
             recoveryStatus.objectLogRecoveryDevice.Dispose();
         }

-        private void RecoverFromPage(long startRecoveryAddress,
+        private bool RecoverFromPage(long startRecoveryAddress,
                                      long fromLogicalAddressInPage,
                                      long untilLogicalAddressInPage,
                                      long pageLogicalAddress,
                                      long pagePhysicalAddress,
                                      int version)
         {
+            bool touched = false;
+
             var hash = default(long);
             var tag = default(ushort);
             var pointer = default(long);
@@ -433,6 +500,7 @@ private void RecoverFromPage(long startRecoveryAddress,
                     }
                     else
                     {
+                        touched = true;
                         info.Invalid = true;
                         if (info.PreviousAddress < startRecoveryAddress)
                         {
@@ -446,6 +514,8 @@ private void RecoverFromPage(long startRecoveryAddress,
                 }
                 pointer += hlog.GetRecordSize(recordStart);
             }
+
+            return touched;
         }

@@ -466,7 +536,7 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, ob
                 if (result.page + result.context.capacity < result.context.endPage)
                 {
                     long readPage = result.page + result.context.capacity;
-                    if (FoldOverSnapshot)
+                    if (result.context.checkpointType == CheckpointType.FoldOver)
                     {
                         hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery, result.context);
                     }
@@ -488,11 +558,12 @@ public unsafe abstract partial class AllocatorBase : IDisposable
         ///
         /// Restore log
         ///
-        ///
-        ///
         ///
+        ///
+        ///
+        ///
         /// Number of pages to preload into memory after recovery
-        public void RestoreHybridLog(long untilAddress, long headAddress, long beginAddress, int numPagesToPreload = -1)
+        public void RestoreHybridLog(long beginAddress, long headAddress, long fromAddress, long untilAddress, int numPagesToPreload = -1)
         {
             if (numPagesToPreload != -1)
             {
@@ -514,42 +585,45 @@ public void RestoreHybridLog(long untilAddress, long headAddress, long beginAddr
             }
             else
             {
-                var tailPage = GetPage(untilAddress);
-                var headPage = GetPage(headAddress);
-
-                var recoveryStatus = new RecoveryStatus(GetCapacityNumPages(), headPage, tailPage, untilAddress);
-                for (int i = 0; i < recoveryStatus.capacity; i++)
+                if (headAddress < fromAddress)
                 {
-                    recoveryStatus.readStatus[i] = ReadStatus.Done;
-                }
+                    var tailPage = GetPage(fromAddress);
+                    var headPage = GetPage(headAddress);

-                var numPages = 0;
-                for (var page = headPage; page <= tailPage; page++)
-                {
-                    var pageIndex = GetPageIndexForPage(page);
-                    recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
-                    numPages++;
-                }
+                    var recoveryStatus = new RecoveryStatus(GetCapacityNumPages(), headPage, tailPage, untilAddress, 0);
+                    for (int i = 0; i < recoveryStatus.capacity; i++)
+                    {
+                        recoveryStatus.readStatus[i] = ReadStatus.Done;
+                    }

-                AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);
+                    var numPages = 0;
+                    for (var page = headPage; page <= tailPage; page++)
+                    {
+                        var pageIndex = GetPageIndexForPage(page);
+                        recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+                        numPages++;
+                    }

-                var done = false;
-                while (!done)
-                {
-                    done = true;
-                    for (long page = headPage; page <= tailPage; page++)
+                    AsyncReadPagesFromDevice(headPage, numPages, untilAddress, AsyncReadPagesCallbackForRecovery, recoveryStatus);
+
+                    var done = false;
+                    while (!done)
                     {
-                        int pageIndex = GetPageIndexForPage(page);
-                        if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
+                        done = true;
+                        for (long page = headPage; page <= tailPage; page++)
                         {
-                            done = false;
-                            break;
+                            int pageIndex = GetPageIndexForPage(page);
+                            if (recoveryStatus.readStatus[pageIndex] == ReadStatus.Pending)
+                            {
+                                done = false;
+                                break;
+                            }
                         }
                     }
                 }
             }

-            RecoveryReset(untilAddress, headAddress, beginAddress);
+            RecoveryReset(untilAddress, headAddress, beginAddress, untilAddress);
         }

         internal void AsyncReadPagesCallbackForRecovery(uint errorCode, uint numBytes, object context)
diff --git a/cs/test/AsyncLargeObjectTests.cs b/cs/test/AsyncLargeObjectTests.cs
index 16bdde561..2b40c7567 100644
--- a/cs/test/AsyncLargeObjectTests.cs
+++ b/cs/test/AsyncLargeObjectTests.cs
@@ -21,7 +21,7 @@ internal class LargeObjectTests
         private FasterKV fht2;
         private IDevice log, objlog;
         private string test_path;
-        private MyLargeFunctions functions = new MyLargeFunctions();
+        private readonly MyLargeFunctions functions = new MyLargeFunctions();

         [SetUp]
         public void Setup()
diff --git a/cs/test/AsyncTests.cs b/cs/test/AsyncTests.cs
index 57adb204b..6a48c7a4d 100644
--- a/cs/test/AsyncTests.cs
+++ b/cs/test/AsyncTests.cs
@@ -20,7 +20,7 @@ public class RecoveryTests
     {
         private FasterKV fht1;
         private FasterKV fht2;
-        private SimpleFunctions functions = new SimpleFunctions();
+        private readonly SimpleFunctions functions = new SimpleFunctions();

         private IDevice log;
diff --git a/cs/test/GenericDiskDeleteTests.cs b/cs/test/GenericDiskDeleteTests.cs
index e7c2dc0cf..8a86e6279 100644
--- a/cs/test/GenericDiskDeleteTests.cs
+++ b/cs/test/GenericDiskDeleteTests.cs
@@ -71,11 +71,7 @@ public void GenericDiskDeleteTest1()

             for (int i = 0; i < totalRecords; i++)
             {
-                var input = new MyInput();
-                var output = new MyOutput();
                 var key1 = new MyKey { key = i };
-                var value = new MyValue { value = i };
-
                 session.Delete(ref key1, 0, 0);
             }

@@ -84,7 +80,6 @@ public void GenericDiskDeleteTest1()
                 var input = new MyInput();
                 var output = new MyOutput();
                 var key1 = new MyKey { key = i };
-                var value = new MyValue { value = i };

                 var status = session.Read(ref key1, ref input, ref output, 1, 0);
diff --git a/cs/test/RecoveryChecks.cs b/cs/test/RecoveryChecks.cs
new file mode 100644
index 000000000..317fbc71c
--- /dev/null
+++ b/cs/test/RecoveryChecks.cs
@@ -0,0 +1,198 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Linq;
+using FASTER.core;
+using System.IO;
+using NUnit.Framework;
+using FASTER.test.recovery.sumstore;
+using System.Diagnostics;
+using System.Net;
+
+namespace FASTER.test.recovery
+{
+
+    [TestFixture]
+    public class RecoveryChecks
+    {
+        IDevice log;
+        FasterKV fht1;
+        const int numOps = 5000;
+        AdId[] inputArray;
+        string path;
+
+        [SetUp]
+        public void Setup()
+        {
+            inputArray = new AdId[numOps];
+            for (int i = 0; i < numOps; i++)
+            {
+                inputArray[i].adId = i;
+            }
+
+            path = TestContext.CurrentContext.TestDirectory + "\\RecoveryChecks\\";
+            log = Devices.CreateLogDevice(path + "hlog.log", deleteOnClose: true);
+            Directory.CreateDirectory(path);
+            fht1 = new FasterKV
+                (1L << 10,
+                logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
+                checkpointSettings: new CheckpointSettings { CheckpointDir = path }
+                );
+        }
+
+        [TearDown]
+        public void TearDown()
+        {
+            fht1.Dispose();
+            log.Dispose();
+            new DirectoryInfo(path).Delete(true);
+        }
+
+
+        [Test]
+        public void RecoveryCheck1([Values] CheckpointType checkpointType)
+        {
+            using var s1 = fht1.NewSession(new SimpleFunctions());
+            for (long key = 0; key < 1000; key++)
+            {
+                s1.Upsert(ref key, ref key);
+            }
+            fht1.TakeHybridLogCheckpointAsync(checkpointType).GetAwaiter().GetResult();
+
+            using var fht2 = new FasterKV
+                (1L << 10,
+                logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
+                checkpointSettings: new CheckpointSettings { CheckpointDir = path }
+                );
+            fht2.Recover();
+
+            Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress);
+            Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress);
+            Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress);
+
+            using var s2 = fht2.NewSession(new SimpleFunctions());
+            for (long key = 0; key < 1000; key++)
+            {
+                long output = default;
+                var status = s2.Read(ref key, ref output);
+                Assert.IsTrue(status == Status.OK && output == key);
+            }
+        }
+
+        [Test]
+        public void RecoveryCheck2([Values] CheckpointType checkpointType)
+        {
+            using var s1 = fht1.NewSession(new SimpleFunctions());
+
+            using var fht2 = new FasterKV
+                (1L << 10,
+                logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
+                checkpointSettings: new CheckpointSettings { CheckpointDir = path }
+                );
+
+            for (int i = 0; i < 5; i++)
+            {
+                for (long key = 1000*i; key < 1000 * i + 1000; key++)
+                {
+                    s1.Upsert(ref key, ref key);
+                }
+                fht1.TakeHybridLogCheckpointAsync(checkpointType).GetAwaiter().GetResult();
+
+                fht2.Recover();
+
+                Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress);
+                Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress);
+                Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress);
+
+                using var s2 = fht2.NewSession(new SimpleFunctions());
+                for (long key = 0; key < 1000 * i + 1000; key++)
+                {
+                    long output = default;
+                    var status = s2.Read(ref key, ref output);
+                    Assert.IsTrue(status == Status.OK && output == key);
+                }
+            }
+        }
+
+        [Test]
+        public void RecoveryCheck3([Values] CheckpointType checkpointType)
+        {
+            using var s1 = fht1.NewSession(new SimpleFunctions());
+
+            using var fht2 = new FasterKV
+                (1L << 10,
+                logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
+                checkpointSettings: new CheckpointSettings { CheckpointDir = path }
+                );
+
+            for (int i = 0; i < 5; i++)
+            {
+                for (long key = 1000 * i; key < 1000 * i + 1000; key++)
+                {
+                    s1.Upsert(ref key, ref key);
+                }
+                fht1.TakeFullCheckpointAsync(checkpointType).GetAwaiter().GetResult();
+
+                fht2.Recover();
+
+                Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress);
+                Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress);
+                Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress);
+
+                using var s2 = fht2.NewSession(new SimpleFunctions());
+                for (long key = 0; key < 1000 * i + 1000; key++)
+                {
+                    long output = default;
+                    var status = s2.Read(ref key, ref output);
+                    Assert.IsTrue(status == Status.OK && output == key);
+                }
+            }
+        }
+
+        [Test]
+        public void RecoveryCheck4([Values] CheckpointType checkpointType)
+        {
+            using var s1 = fht1.NewSession(new SimpleFunctions());
+
+            using var fht2 = new FasterKV
+                (1L << 10,
+                logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
+                checkpointSettings: new CheckpointSettings { CheckpointDir = path }
+                );
+
+            for (int i = 0; i < 5; i++)
+            {
+                for (long key = 1000 * i; key < 1000 * i + 1000; key++)
+                {
+                    s1.Upsert(ref key, ref key);
+                }
+
+                if (i == 0)
+                    fht1.TakeIndexCheckpointAsync().GetAwaiter().GetResult();
+
+                fht1.TakeHybridLogCheckpointAsync(checkpointType).GetAwaiter().GetResult();
+
+                fht2.Recover();
+
+                Assert.IsTrue(fht1.Log.HeadAddress == fht2.Log.HeadAddress);
+                Assert.IsTrue(fht1.Log.ReadOnlyAddress == fht2.Log.ReadOnlyAddress);
+                Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress);
+
+                using var s2 = fht2.NewSession(new SimpleFunctions());
+                for (long key = 0; key < 1000 * i + 1000; key++)
+                {
+                    long output = default;
+                    var status = s2.Read(ref key, ref output);
+                    Assert.IsTrue(status == Status.OK && output == key);
+                }
+            }
+        }
+
+
+    }
+}
diff --git a/cs/test/SessionFASTERTests.cs b/cs/test/SessionFASTERTests.cs
index b8a55c495..08441aac8 100644
--- a/cs/test/SessionFASTERTests.cs
+++ b/cs/test/SessionFASTERTests.cs
@@ -41,10 +41,83 @@ public void TearDown()
         [Test]
         public void SessionTest1()
         {
-            using (var session = fht.NewSession(new Functions()))
+            using var session = fht.NewSession(new Functions());
+            InputStruct input = default;
+            OutputStruct output = default;
+
+            var key1 = new KeyStruct { kfield1 = 13, kfield2 = 14 };
+            var value = new ValueStruct { vfield1 = 23, vfield2 = 24 };
+
+            session.Upsert(ref key1, ref value, Empty.Default, 0);
+            var status = session.Read(ref key1, ref input, ref output, Empty.Default, 0);
+
+            if (status == Status.PENDING)
+            {
+                session.CompletePending(true);
+            }
+            else
             {
-                InputStruct input = default(InputStruct);
-                OutputStruct output = default(OutputStruct);
+                Assert.IsTrue(status == Status.OK);
+            }
+
+            Assert.IsTrue(output.value.vfield1 == value.vfield1);
+            Assert.IsTrue(output.value.vfield2 == value.vfield2);
+        }
+
+
+        [Test]
+        public void SessionTest2()
+        {
+            using var session1 = fht.NewSession(new Functions());
+            using var session2 = fht.NewSession(new Functions());
+            InputStruct input = default;
+            OutputStruct output = default;
+
+            var key1 = new KeyStruct { kfield1 = 14, kfield2 = 15 };
+            var value1 = new ValueStruct { vfield1 = 24, vfield2 = 25 };
+            var key2 = new KeyStruct { kfield1 = 15, kfield2 = 16 };
+            var value2 = new ValueStruct { vfield1 = 25, vfield2 = 26 };
+
+            session1.Upsert(ref key1, ref value1, Empty.Default, 0);
+            session2.Upsert(ref key2, ref value2, Empty.Default, 0);
+
+            var status = session1.Read(ref key1, ref input, ref output, Empty.Default, 0);
+
+            if (status == Status.PENDING)
+            {
+                session1.CompletePending(true);
+            }
+            else
+            {
+                Assert.IsTrue(status == Status.OK);
+            }
+
+            Assert.IsTrue(output.value.vfield1 == value1.vfield1);
+            Assert.IsTrue(output.value.vfield2 == value1.vfield2);
+
+            status = session2.Read(ref key2, ref input, ref output, Empty.Default, 0);
+
+            if (status == Status.PENDING)
+            {
+                session2.CompletePending(true);
+            }
+            else
+            {
+                Assert.IsTrue(status == Status.OK);
+            }
+
+            Assert.IsTrue(output.value.vfield1 == value2.vfield1);
+            Assert.IsTrue(output.value.vfield2 == value2.vfield2);
+        }
+
+        [Test]
+        public void SessionTest3()
+        {
+            using var session = fht.NewSession(new Functions());
+            Task.CompletedTask.ContinueWith((t) =>
+            {
+                InputStruct input = default;
+                OutputStruct output = default;

                 var key1 = new KeyStruct { kfield1 = 13, kfield2 = 14 };
                 var value = new ValueStruct { vfield1 = 23, vfield2 = 24 };
@@ -63,27 +136,23 @@ public void SessionTest1()
                 Assert.IsTrue(output.value.vfield1 == value.vfield1);
                 Assert.IsTrue(output.value.vfield2 == value.vfield2);
-            }
+            }).Wait();
         }
-
         [Test]
-        public void SessionTest2()
+        public void SessionTest4()
         {
-            using (var session1 = fht.NewSession(new Functions()))
-            using (var session2 = fht.NewSession(new Functions()))
+            using var session1 = fht.NewSession(new Functions());
+            using var session2 = fht.NewSession(new Functions());
+            var t1 = Task.CompletedTask.ContinueWith((t) =>
             {
-                InputStruct input = default(InputStruct);
-                OutputStruct output = default(OutputStruct);
+                InputStruct input = default;
+                OutputStruct output = default;

                 var key1 = new KeyStruct { kfield1 = 14, kfield2 = 15 };
                 var value1 = new ValueStruct { vfield1 = 24, vfield2 = 25 };
-                var key2 = new KeyStruct { kfield1 = 15, kfield2 = 16 };
-                var value2 = new ValueStruct { vfield1 = 25, vfield2 = 26 };

                 session1.Upsert(ref key1, ref value1, Empty.Default, 0);
-                session2.Upsert(ref key2, ref value2, Empty.Default, 0);
-
                 var status = session1.Read(ref key1, ref input, ref output, Empty.Default, 0);

                 if (status == Status.PENDING)
@@ -97,8 +166,19 @@ public void SessionTest2()

                 Assert.IsTrue(output.value.vfield1 == value1.vfield1);
                 Assert.IsTrue(output.value.vfield2 == value1.vfield2);
+            });

-                status = session2.Read(ref key2, ref input, ref output, Empty.Default, 0);
+            var t2 = Task.CompletedTask.ContinueWith((t) =>
+            {
+                InputStruct input = default;
+                OutputStruct output = default;
+
+                var key2 = new KeyStruct { kfield1 = 15, kfield2 = 16 };
+                var value2 = new ValueStruct { vfield1 = 25, vfield2 = 26 };
+
+                session2.Upsert(ref key2, ref value2, Empty.Default, 0);
+
+                var status = session2.Read(ref key2, ref input, ref output, Empty.Default, 0);

                 if (status == Status.PENDING)
                 {
@@ -111,108 +191,19 @@ public void SessionTest2()

                 Assert.IsTrue(output.value.vfield1 == value2.vfield1);
                 Assert.IsTrue(output.value.vfield2 == value2.vfield2);
-            }
-        }
-
-        [Test]
-        public void SessionTest3()
-        {
-            using (var session = fht.NewSession(new Functions()))
-            {
-                Task.CompletedTask.ContinueWith((t) =>
-                {
-                    InputStruct input = default(InputStruct);
-                    OutputStruct output = default(OutputStruct);
-
-                    var key1 = new KeyStruct { kfield1 = 13, kfield2 = 14 };
-                    var value = new ValueStruct { vfield1 = 23, vfield2 = 24 };
-
-                    session.Upsert(ref key1, ref value, Empty.Default, 0);
-                    var status = session.Read(ref key1, ref input, ref output, Empty.Default, 0);
-
-                    if (status == Status.PENDING)
-                    {
-                        session.CompletePending(true);
-                    }
-                    else
-                    {
-                        Assert.IsTrue(status == Status.OK);
-                    }
-
-                    Assert.IsTrue(output.value.vfield1 == value.vfield1);
-                    Assert.IsTrue(output.value.vfield2 == value.vfield2);
-                }).Wait();
-            }
-        }
-
-        [Test]
-        public void SessionTest4()
-        {
-            using (var session1 = fht.NewSession(new Functions()))
-            using (var session2 = fht.NewSession(new Functions()))
-            {
-                var t1 = Task.CompletedTask.ContinueWith((t) =>
-                {
-                    InputStruct input = default(InputStruct);
-                    OutputStruct output = default(OutputStruct);
-
-                    var key1 = new KeyStruct { kfield1 = 14, kfield2 = 15 };
-                    var value1 = new ValueStruct { vfield1 = 24, vfield2 = 25 };
-
-                    session1.Upsert(ref key1, ref value1, Empty.Default, 0);
-                    var status = session1.Read(ref key1, ref input, ref output, Empty.Default, 0);
-
-                    if (status == Status.PENDING)
-                    {
-                        session1.CompletePending(true);
-                    }
-                    else
-                    {
-                        Assert.IsTrue(status == Status.OK);
-                    }
-
-                    Assert.IsTrue(output.value.vfield1 == value1.vfield1);
-                    Assert.IsTrue(output.value.vfield2 == value1.vfield2);
-                });
-
-                var t2 = Task.CompletedTask.ContinueWith((t) =>
-                {
-                    InputStruct input = default(InputStruct);
-                    OutputStruct output = default(OutputStruct);
-
-                    var key2 = new KeyStruct { kfield1 = 15, kfield2 = 16 };
-                    var value2 = new ValueStruct { vfield1 = 25, vfield2 = 26 };
-
-                    session2.Upsert(ref key2, ref value2, Empty.Default, 0);
-
-                    var status = session2.Read(ref key2, ref input, ref output, Empty.Default, 0);
-
-                    if (status == Status.PENDING)
-                    {
-                        session2.CompletePending(true);
-                    }
-                    else
-                    {
-                        Assert.IsTrue(status == Status.OK);
-                    }
-
-                    Assert.IsTrue(output.value.vfield1 == value2.vfield1);
-                    Assert.IsTrue(output.value.vfield2 == value2.vfield2);
-                });
-
-                t1.Wait();
-                t2.Wait();
-            }
+            });
+
+            t1.Wait();
+            t2.Wait();
         }

         [Test]
         public void SessionTest5()
         {
             var session = fht.NewSession(new Functions());
-            var id = session.ID;

-            InputStruct input = default(InputStruct);
-            OutputStruct output = default(OutputStruct);
+            InputStruct input = default;
+            OutputStruct output = default;

             var key1 = new KeyStruct { kfield1 = 16, kfield2 = 17 };
             var value1 = new ValueStruct { vfield1 = 26, vfield2 = 27 };