From b29043d92744389272127d2bc7fdf60c08373e2b Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 17 Sep 2020 19:01:26 -0700 Subject: [PATCH] * Sample to show read-only secondary reader store constantly catching up to primary store * Add recovery option to recover without writing undo (for read-only secondaries) --- cs/FASTER.sln | 11 ++ cs/samples/SecondaryReaderStore/Program.cs | 116 ++++++++++++++++++ .../SecondaryReaderStore.csproj | 15 +++ cs/src/core/Allocator/BlittableAllocator.cs | 2 +- cs/src/core/Index/FASTER/FASTER.cs | 17 +-- cs/src/core/Index/Interfaces/IFasterKV.cs | 9 +- cs/src/core/Index/Recovery/Recovery.cs | 34 ++--- 7 files changed, 179 insertions(+), 25 deletions(-) create mode 100644 cs/samples/SecondaryReaderStore/Program.cs create mode 100644 cs/samples/SecondaryReaderStore/SecondaryReaderStore.csproj diff --git a/cs/FASTER.sln b/cs/FASTER.sln index 17c2dd5fe..224ce475f 100644 --- a/cs/FASTER.sln +++ b/cs/FASTER.sln @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogPubSub", "samples\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureBackedStore", "samples\AzureBackedStore\AzureBackedStore.csproj", "{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecondaryReaderStore", "samples\SecondaryReaderStore\SecondaryReaderStore.csproj", "{EBE313E5-22D2-4C74-BA1F-16B60404B335}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -205,6 +207,14 @@ Global {E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}.Release|Any CPU.Build.0 = Release|x64 {E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}.Release|x64.ActiveCfg = Release|x64 {E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}.Release|x64.Build.0 = Release|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|Any CPU.ActiveCfg = Debug|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|Any CPU.Build.0 = Debug|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|x64.ActiveCfg = Debug|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|x64.Build.0 = Debug|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|Any CPU.ActiveCfg = Release|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|Any CPU.Build.0 = Release|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|x64.ActiveCfg = Release|x64 + {EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|x64.Build.0 = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -230,6 +240,7 @@ Global {DACB12EB-8A64-4BB4-BFA3-0377AACD28D3} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} {F6EA46D5-DD66-47F2-8FAC-370FDD733DD3} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} {E2A1C205-4D35-448C-A72F-B9A4AE28EB4E} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} + {EBE313E5-22D2-4C74-BA1F-16B60404B335} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC} diff --git a/cs/samples/SecondaryReaderStore/Program.cs b/cs/samples/SecondaryReaderStore/Program.cs new file mode 100644 index 000000000..4e9ccd075 --- /dev/null +++ b/cs/samples/SecondaryReaderStore/Program.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; + +namespace SecondaryReaderStore +{ + class Program + { + static FasterKV primaryStore; + static FasterKV secondaryStore; + const int numOps = 3000; + const int checkpointFreq = 500; + + static void Main() + { + // Create files for storing data + var path = Path.GetTempPath() + "SecondaryReaderStore\\"; + if (Directory.Exists(path)) + new DirectoryInfo(path).Delete(true); + + var log = Devices.CreateLogDevice(path + "hlog.log", deleteOnClose: true); + + primaryStore = new FasterKV + (1L << 10, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 }, + checkpointSettings: new CheckpointSettings { CheckpointDir = path } + ); + + secondaryStore = new FasterKV + (1L << 10, + logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 }, + checkpointSettings: new CheckpointSettings { CheckpointDir = path } + ); + + var p = new Thread(new ThreadStart(PrimaryWriter)); + var s = new Thread(new ThreadStart(SecondaryReader)); + + p.Start(); s.Start(); + p.Join(); s.Join(); + + log.Dispose(); + new DirectoryInfo(path).Delete(true); + } + + static void PrimaryWriter() + { + using var s1 = primaryStore.NewSession(new SimpleFunctions()); + + Console.WriteLine($"Upserting keys at primary starting from key 0"); + for (long key=0; key 0 && key % checkpointFreq == 0) + { + Console.WriteLine($"Checkpointing primary until key {key - 1}"); + primaryStore.TakeHybridLogCheckpointAsync(CheckpointType.Snapshot).GetAwaiter().GetResult(); + Console.WriteLine($"Upserting keys at primary starting from {key}"); + } + + Thread.Sleep(10); + s1.Upsert(ref key, ref key); + + } + Console.WriteLine($"Checkpointing primary until key {numOps - 1}"); + primaryStore.TakeHybridLogCheckpointAsync(CheckpointType.Snapshot).GetAwaiter().GetResult(); + Console.WriteLine("Shutting down primary"); + } + + static void SecondaryReader() + { + using var s1 = secondaryStore.NewSession(new SimpleFunctions()); + + long key = 0, output = 0; + while (true) + { + try + { + secondaryStore.Recover(undoFutureVersions: false); // read-only recovery, no writing back undos + } + catch + { + Console.WriteLine("Nothing to recover to at secondary, retrying"); + Thread.Sleep(500); + continue; + } + + while (true) + { + var status = s1.Read(ref key, ref output); + if (status == Status.NOTFOUND) + { + Console.WriteLine($"Key {key} not found at secondary; performing recovery to catch up"); + Thread.Sleep(500); + break; + } + if (key != output) + throw new Exception($"Invalid value {output} found for key {key} at secondary"); + + Console.WriteLine($"Successfully read key {key}, value {output} at secondary"); + key++; + if (key == numOps) + { + Console.WriteLine("Shutting down secondary"); + return; + } + } + } + + } + + } +} diff --git a/cs/samples/SecondaryReaderStore/SecondaryReaderStore.csproj b/cs/samples/SecondaryReaderStore/SecondaryReaderStore.csproj new file mode 100644 index 000000000..0d269e4a0 --- /dev/null +++ b/cs/samples/SecondaryReaderStore/SecondaryReaderStore.csproj @@ -0,0 +1,15 @@ + + + + Exe + true + netcoreapp3.1 + x64 + win7-x64 + + + + + + + diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index b97e2e10c..f88071a5d 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -262,7 +262,7 @@ protected override void ReadAsync( /// /// /// - protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int numBytes, DeviceIOCompletionCallback callback, AsyncIOContext context, SectorAlignedMemory result = default(SectorAlignedMemory)) + protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int numBytes, DeviceIOCompletionCallback callback, AsyncIOContext context, SectorAlignedMemory result = default) { throw new InvalidOperationException("AsyncReadRecordObjectsToMemory invalid for BlittableAllocator"); } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index ca0196e38..5a0178a8c 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -379,10 +379,11 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp /// /// Recover from the latest checkpoint (blocking operation) /// - /// - public void Recover(int numPagesToPreload = -1) + /// Number of pages to preload into memory (beyond what needs to be read for recovery) + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + public void Recover(int numPagesToPreload = -1, bool undoFutureVersions = true) { - InternalRecoverFromLatestCheckpoints(numPagesToPreload); + InternalRecoverFromLatestCheckpoints(numPagesToPreload, undoFutureVersions); } /// @@ -390,9 +391,10 @@ public void Recover(int numPagesToPreload = -1) /// /// Token /// Number of pages to preload into memory after recovery - public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1) + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true) { - InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload); + InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoFutureVersions); } /// @@ -401,9 +403,10 @@ public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1) /// /// /// Number of pages to preload into memory after recovery - public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1) + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true) { - InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload); + InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoFutureVersions); } /// diff --git a/cs/src/core/Index/Interfaces/IFasterKV.cs b/cs/src/core/Index/Interfaces/IFasterKV.cs index d8572f485..a319aacaf 100644 --- a/cs/src/core/Index/Interfaces/IFasterKV.cs +++ b/cs/src/core/Index/Interfaces/IFasterKV.cs @@ -73,14 +73,16 @@ ClientSession ResumeSession /// Number of pages to preload into memory after recovery - void Recover(int numPagesToPreload = -1); + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + void Recover(int numPagesToPreload = -1, bool undoFutureVersions = true); /// /// Recover using full checkpoint token /// /// /// Number of pages to preload into memory after recovery - void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1); + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true); /// /// Recover using a separate index and log checkpoint token @@ -88,7 +90,8 @@ ClientSession ResumeSession /// /// Number of pages to preload into memory after recovery - void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1); + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoFutureVersions = true); /// /// Complete ongoing checkpoint (spin-wait) diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 207f419db..b0bc46281 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -52,7 +52,7 @@ public RecoveryStatus(int capacity, public unsafe partial class FasterKV : FasterBase, IFasterKV { - private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload) + private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload, bool undoFutureVersions) { Debug.WriteLine("********* Primary Recovery Information ********"); @@ -110,7 +110,7 @@ private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload) Debug.WriteLine("No index checkpoint found, recovering from beginning of log"); } - InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload); + InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions); } private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryInfo recoveryInfo) @@ -120,7 +120,7 @@ private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryIn return l1 <= l2; } - private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload) + private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoFutureVersions) { Debug.WriteLine("********* Primary Recovery Information ********"); Debug.WriteLine("Index Checkpoint: {0}", indexToken); @@ -157,10 +157,10 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesT } } - InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload); + InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions); } - private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload) + private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoFutureVersions) { // Ensure active state machine to null currentSyncStateMachine = null; @@ -186,6 +186,12 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck // Unless we recovered previously until some hlog address if (hlog.FlushedUntilAddress > fromAddress) fromAddress = hlog.FlushedUntilAddress; + + // Start recovery at least from beginning of fuzzy log region + // Needed if we are recovering to the same checkpoint a second time, with undo + // set to true during the second time. + if (recoveredHLCInfo.info.startLogicalAddress < fromAddress) + fromAddress = recoveredHLCInfo.info.startLogicalAddress; } else { @@ -237,7 +243,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck // Make index consistent for version v if (recoveredHLCInfo.info.useSnapshotFile == 0) { - RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version); + RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, undoFutureVersions); hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, tailAddress); } else @@ -246,9 +252,9 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck headAddress = recoveredHLCInfo.info.flushedLogicalAddress; // First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress) - RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version); + RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version, undoFutureVersions); // Then recover snapshot into mutable region - RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid); + RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid, undoFutureVersions); hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, recoveredHLCInfo.info.flushedLogicalAddress); } @@ -256,7 +262,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck _recoveredSessions = recoveredHLCInfo.info.continueTokens; } - private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int version) + private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int version, bool undoFutureVersions) { if (untilAddress < scanFromAddress) return; @@ -304,7 +310,7 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon } var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress); - if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, version)) + if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, version, undoFutureVersions)) { // OS thread flushes current page and issues a read request if necessary recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending; @@ -342,7 +348,7 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon } } - private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddress, int version, Guid guid) + private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddress, int version, Guid guid, bool undoFutureVersions) { // Compute startPage and endPage var startPage = hlog.GetPage(fromAddress); @@ -416,7 +422,7 @@ private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddres var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress); RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress, - startLogicalAddress, physicalAddress, version); + startLogicalAddress, physicalAddress, version, undoFutureVersions); } @@ -458,7 +464,7 @@ private bool RecoverFromPage(long startRecoveryAddress, long untilLogicalAddressInPage, long pageLogicalAddress, long pagePhysicalAddress, - int version) + int version, bool undoFutureVersions) { bool touched = false; @@ -490,7 +496,7 @@ private bool RecoverFromPage(long startRecoveryAddress, entry = default; FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); - if (info.Version <= version) + if (info.Version <= version || !undoFutureVersions) { entry.Address = pageLogicalAddress + pointer; entry.Tag = tag;