Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Sample for read-only secondary KV store continuously recovering to primary's log/checkpoints #334

Merged
merged 2 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogPubSub", "samples\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureBackedStore", "samples\AzureBackedStore\AzureBackedStore.csproj", "{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecondaryReaderStore", "samples\SecondaryReaderStore\SecondaryReaderStore.csproj", "{EBE313E5-22D2-4C74-BA1F-16B60404B335}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -205,6 +207,14 @@ Global
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}.Release|Any CPU.Build.0 = Release|x64
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}.Release|x64.ActiveCfg = Release|x64
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E}.Release|x64.Build.0 = Release|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|Any CPU.ActiveCfg = Debug|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|Any CPU.Build.0 = Debug|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|x64.ActiveCfg = Debug|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Debug|x64.Build.0 = Debug|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|Any CPU.ActiveCfg = Release|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|Any CPU.Build.0 = Release|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|x64.ActiveCfg = Release|x64
{EBE313E5-22D2-4C74-BA1F-16B60404B335}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -230,6 +240,7 @@ Global
{DACB12EB-8A64-4BB4-BFA3-0377AACD28D3} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{F6EA46D5-DD66-47F2-8FAC-370FDD733DD3} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{EBE313E5-22D2-4C74-BA1F-16B60404B335} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
116 changes: 116 additions & 0 deletions cs/samples/SecondaryReaderStore/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;

namespace SecondaryReaderStore
{
class Program
{
static FasterKV<long, long> primaryStore;
static FasterKV<long, long> secondaryStore;
const int numOps = 3000;
const int checkpointFreq = 500;

static void Main()
{
// Create files for storing data
var path = Path.GetTempPath() + "SecondaryReaderStore\\";
if (Directory.Exists(path))
new DirectoryInfo(path).Delete(true);

var log = Devices.CreateLogDevice(path + "hlog.log", deleteOnClose: true);

primaryStore = new FasterKV<long, long>
(1L << 10,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
checkpointSettings: new CheckpointSettings { CheckpointDir = path }
);

secondaryStore = new FasterKV<long, long>
(1L << 10,
logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20 },
checkpointSettings: new CheckpointSettings { CheckpointDir = path }
);

var p = new Thread(new ThreadStart(PrimaryWriter));
var s = new Thread(new ThreadStart(SecondaryReader));

p.Start(); s.Start();
p.Join(); s.Join();

log.Dispose();
new DirectoryInfo(path).Delete(true);
}

static void PrimaryWriter()
{
using var s1 = primaryStore.NewSession(new SimpleFunctions<long, long>());

Console.WriteLine($"Upserting keys at primary starting from key 0");
for (long key=0; key<numOps; key++)
{
if (key > 0 && key % checkpointFreq == 0)
{
Console.WriteLine($"Checkpointing primary until key {key - 1}");
primaryStore.TakeHybridLogCheckpointAsync(CheckpointType.Snapshot).GetAwaiter().GetResult();
Console.WriteLine($"Upserting keys at primary starting from {key}");
}

Thread.Sleep(10);
s1.Upsert(ref key, ref key);

}
Console.WriteLine($"Checkpointing primary until key {numOps - 1}");
primaryStore.TakeHybridLogCheckpointAsync(CheckpointType.Snapshot).GetAwaiter().GetResult();
Console.WriteLine("Shutting down primary");
}

static void SecondaryReader()
{
using var s1 = secondaryStore.NewSession(new SimpleFunctions<long, long>());

long key = 0, output = 0;
while (true)
{
try
{
secondaryStore.Recover(undoFutureVersions: false); // read-only recovery, no writing back undos
}
catch
{
Console.WriteLine("Nothing to recover to at secondary, retrying");
Thread.Sleep(500);
continue;
}

while (true)
{
var status = s1.Read(ref key, ref output);
if (status == Status.NOTFOUND)
{
Console.WriteLine($"Key {key} not found at secondary; performing recovery to catch up");
Thread.Sleep(500);
break;
}
if (key != output)
throw new Exception($"Invalid value {output} found for key {key} at secondary");

Console.WriteLine($"Successfully read key {key}, value {output} at secondary");
key++;
if (key == numOps)
{
Console.WriteLine("Shutting down secondary");
return;
}
}
}

}

}
}
15 changes: 15 additions & 0 deletions cs/samples/SecondaryReaderStore/SecondaryReaderStore.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFramework>netcoreapp3.1</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/BlittableAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ protected override void ReadAsync<TContext>(
/// <param name="callback"></param>
/// <param name="context"></param>
/// <param name="result"></param>
protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int numBytes, DeviceIOCompletionCallback callback, AsyncIOContext<Key, Value> context, SectorAlignedMemory result = default(SectorAlignedMemory))
protected override void AsyncReadRecordObjectsToMemory(long fromLogical, int numBytes, DeviceIOCompletionCallback callback, AsyncIOContext<Key, Value> context, SectorAlignedMemory result = default)
{
throw new InvalidOperationException("AsyncReadRecordObjectsToMemory invalid for BlittableAllocator");
}
Expand Down
17 changes: 10 additions & 7 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,22 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp
/// <summary>
/// Recover from the latest checkpoint (blocking operation)
/// </summary>
/// <param name="numPagesToPreload"></param>
public void Recover(int numPagesToPreload = -1)
/// <param name="numPagesToPreload">Number of pages to preload into memory (beyond what needs to be read for recovery)</param>
/// <param name="undoFutureVersions">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
public void Recover(int numPagesToPreload = -1, bool undoFutureVersions = true)
{
InternalRecoverFromLatestCheckpoints(numPagesToPreload);
InternalRecoverFromLatestCheckpoints(numPagesToPreload, undoFutureVersions);
}

/// <summary>
/// Recover from specific token (blocking operation)
/// </summary>
/// <param name="fullCheckpointToken">Token</param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1)
/// <param name="undoFutureVersions">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true)
{
InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload);
InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoFutureVersions);
}

/// <summary>
Expand All @@ -401,9 +403,10 @@ public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1)
/// <param name="indexCheckpointToken"></param>
/// <param name="hybridLogCheckpointToken"></param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1)
/// <param name="undoFutureVersions">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true)
{
InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload);
InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoFutureVersions);
}

/// <summary>
Expand Down
9 changes: 6 additions & 3 deletions cs/src/core/Index/Interfaces/IFasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,25 @@ ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSession<Input
/// Recover from last successful index and log checkpoint
/// </summary>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
void Recover(int numPagesToPreload = -1);
/// <param name="undoFutureVersions">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
void Recover(int numPagesToPreload = -1, bool undoFutureVersions = true);

/// <summary>
/// Recover using full checkpoint token
/// </summary>
/// <param name="fullcheckpointToken"></param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1);
/// <param name="undoFutureVersions">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true);

/// <summary>
/// Recover using a separate index and log checkpoint token
/// </summary>
/// <param name="indexToken"></param>
/// <param name="hybridLogToken"></param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1);
/// <param name="undoFutureVersions">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoFutureVersions = true);

/// <summary>
/// Complete ongoing checkpoint (spin-wait)
Expand Down
34 changes: 20 additions & 14 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public RecoveryStatus(int capacity,
public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
{

private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload)
private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload, bool undoFutureVersions)
{
Debug.WriteLine("********* Primary Recovery Information ********");

Expand Down Expand Up @@ -110,7 +110,7 @@ private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload)
Debug.WriteLine("No index checkpoint found, recovering from beginning of log");
}

InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload);
InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions);
}

private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryInfo recoveryInfo)
Expand All @@ -120,7 +120,7 @@ private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryIn
return l1 <= l2;
}

private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload)
private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoFutureVersions)
{
Debug.WriteLine("********* Primary Recovery Information ********");
Debug.WriteLine("Index Checkpoint: {0}", indexToken);
Expand Down Expand Up @@ -157,10 +157,10 @@ private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesT
}
}

InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload);
InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions);
}

private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload)
private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoFutureVersions)
{
// Ensure active state machine to null
currentSyncStateMachine = null;
Expand All @@ -186,6 +186,12 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
// Unless we recovered previously until some hlog address
if (hlog.FlushedUntilAddress > fromAddress)
fromAddress = hlog.FlushedUntilAddress;

// Start recovery at least from beginning of fuzzy log region
// Needed if we are recovering to the same checkpoint a second time, with undo
// set to true during the second time.
if (recoveredHLCInfo.info.startLogicalAddress < fromAddress)
fromAddress = recoveredHLCInfo.info.startLogicalAddress;
}
else
{
Expand Down Expand Up @@ -237,7 +243,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
// Make index consistent for version v
if (recoveredHLCInfo.info.useSnapshotFile == 0)
{
RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version);
RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, undoFutureVersions);
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, tailAddress);
}
else
Expand All @@ -246,17 +252,17 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
headAddress = recoveredHLCInfo.info.flushedLogicalAddress;

// First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress)
RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version);
RecoverHybridLog(scanFromAddress, fromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version, undoFutureVersions);
// Then recover snapshot into mutable region
RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid);
RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid, undoFutureVersions);
hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, recoveredHLCInfo.info.flushedLogicalAddress);
}

// Recover session information
_recoveredSessions = recoveredHLCInfo.info.continueTokens;
}

private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int version)
private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int version, bool undoFutureVersions)
{
if (untilAddress < scanFromAddress)
return;
Expand Down Expand Up @@ -304,7 +310,7 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon
}

var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, version))
if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, version, undoFutureVersions))
{
// OS thread flushes current page and issues a read request if necessary
recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
Expand Down Expand Up @@ -342,7 +348,7 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon
}
}

private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddress, int version, Guid guid)
private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddress, int version, Guid guid, bool undoFutureVersions)
{
// Compute startPage and endPage
var startPage = hlog.GetPage(fromAddress);
Expand Down Expand Up @@ -416,7 +422,7 @@ private void RecoverHybridLogFromSnapshotFile(long fromAddress, long untilAddres

var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress,
startLogicalAddress, physicalAddress, version);
startLogicalAddress, physicalAddress, version, undoFutureVersions);

}

Expand Down Expand Up @@ -458,7 +464,7 @@ private bool RecoverFromPage(long startRecoveryAddress,
long untilLogicalAddressInPage,
long pageLogicalAddress,
long pagePhysicalAddress,
int version)
int version, bool undoFutureVersions)
{
bool touched = false;

Expand Down Expand Up @@ -490,7 +496,7 @@ private bool RecoverFromPage(long startRecoveryAddress,
entry = default;
FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);

if (info.Version <= version)
if (info.Version <= version || !undoFutureVersions)
{
entry.Address = pageLogicalAddress + pointer;
entry.Tag = tag;
Expand Down