Skip to content

Commit

Permalink
[C#] Recover returns version we recovered to (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored Feb 11, 2022
1 parent f2c025d commit 9975276
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 25 deletions.
25 changes: 15 additions & 10 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp
/// <param name="numPagesToPreload">Number of pages to preload into memory (beyond what needs to be read for recovery)</param>
/// <param name="undoNextVersion">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="recoverTo"> specific version requested or -1 for latest version. FASTER will recover to the largest version number checkpointed that's smaller than the required version. </param>

public void Recover(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1)
/// <returns>Version we actually recovered to</returns>
public long Recover(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1)
{
FindRecoveryInfo(recoverTo, out var recoveredHlcInfo, out var recoveredIcInfo);
InternalRecover(recoveredIcInfo, recoveredHlcInfo, numPagesToPreload, undoNextVersion, recoverTo);
return InternalRecover(recoveredIcInfo, recoveredHlcInfo, numPagesToPreload, undoNextVersion, recoverTo);
}

/// <summary>
Expand All @@ -415,7 +415,8 @@ public void Recover(int numPagesToPreload = -1, bool undoNextVersion = true, lon
/// <param name="undoNextVersion">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="recoverTo"> specific version requested or -1 for latest version. FASTER will recover to the largest version number checkpointed that's smaller than the required version.</param>
/// <param name="cancellationToken">Cancellation token</param>
public ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1,
/// <returns>Version we actually recovered to</returns>
public ValueTask<long> RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1,
CancellationToken cancellationToken = default)
{
FindRecoveryInfo(recoverTo, out var recoveredHlcInfo, out var recoveredIcInfo);
Expand All @@ -428,9 +429,10 @@ public ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion =
/// <param name="fullCheckpointToken">Token</param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true)
/// <returns>Version we actually recovered to</returns>
public long Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true)
{
InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoNextVersion, -1);
return InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoNextVersion, -1);
}

/// <summary>
Expand All @@ -440,7 +442,8 @@ public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool u
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="cancellationToken">Cancellation token</param>
public ValueTask RecoverAsync(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
/// <returns>Version we actually recovered to</returns>
public ValueTask<long> RecoverAsync(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
=> InternalRecoverAsync(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoNextVersion, -1, cancellationToken);

/// <summary>
Expand All @@ -450,9 +453,10 @@ public ValueTask RecoverAsync(Guid fullCheckpointToken, int numPagesToPreload =
/// <param name="hybridLogCheckpointToken"></param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true)
/// <returns>Version we actually recovered to</returns>
public long Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true)
{
InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoNextVersion, -1);
return InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoNextVersion, -1);
}

/// <summary>
Expand Down Expand Up @@ -502,7 +506,8 @@ public void DisposeRecoverableSessions()
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="cancellationToken">Cancellation token</param>
public ValueTask RecoverAsync(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
/// <returns>Version we actually recovered to</returns>
public ValueTask<long> RecoverAsync(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
=> InternalRecoverAsync(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoNextVersion, -1, cancellationToken);

/// <summary>
Expand Down
19 changes: 12 additions & 7 deletions cs/src/core/Index/Interfaces/IFasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ public ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value,
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with version after checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="recoverTo"> specific version requested within the checkpoint, if checkpoint supports multiple versions (e.g. incremental snapshot checkpoints), or -1 for latest version</param>

void Recover(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1);
/// <returns>Version we actually recovered to</returns>
long Recover(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1);

/// <summary>
/// Asynchronously recover from last successful index and log checkpoint
Expand All @@ -154,15 +154,17 @@ public ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value,
/// <param name="undoNextVersion">Whether records with version after checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="recoverTo"> specific version requested within the checkpoint, if checkpoint supports multiple versions (e.g. incremental snapshot checkpoints), or -1 for latest version</param>
/// <param name="cancellationToken">Cancellation token</param>
ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1, CancellationToken cancellationToken = default);
/// <returns>Version we actually recovered to</returns>
ValueTask<long> RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion = true, long recoverTo = -1, CancellationToken cancellationToken = default);

/// <summary>
/// Recover using full checkpoint token
/// </summary>
/// <param name="fullcheckpointToken"></param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with version after checkpoint version need to be undone (and invalidated on log)</param>
void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true);
/// <returns>Version we actually recovered to</returns>
long Recover(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true);

/// <summary>
/// Asynchronously recover using full checkpoint token
Expand All @@ -171,7 +173,8 @@ public ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value,
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with version after checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="cancellationToken">Cancellation token</param>
ValueTask RecoverAsync(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);
/// <returns>Version we actually recovered to</returns>
ValueTask<long> RecoverAsync(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);

/// <summary>
/// Recover using a separate index and log checkpoint token
Expand All @@ -180,7 +183,8 @@ public ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value,
/// <param name="hybridLogToken"></param>
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with version after checkpoint version need to be undone (and invalidated on log)</param>
void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoNextVersion = true);
/// <returns>Version we actually recovered to</returns>
long Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoNextVersion = true);

/// <summary>
/// Asynchronously recover using a separate index and log checkpoint token
Expand All @@ -190,7 +194,8 @@ public ClientSession<Key, Value, Input, Output, Context, IFunctions<Key, Value,
/// <param name="numPagesToPreload">Number of pages to preload into memory after recovery</param>
/// <param name="undoNextVersion">Whether records with version after checkpoint version need to be undone (and invalidated on log)</param>
/// <param name="cancellationToken">Cancellation token</param>
ValueTask RecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);
/// <returns>Version we actually recovered to</returns>
ValueTask<long> RecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);

/// <summary>
/// Complete ongoing checkpoint (spin-wait)
Expand Down
16 changes: 9 additions & 7 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,17 @@ private static bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRec
return l1 <= l2;
}

private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoNextVersion, long recoverTo)
private long InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoNextVersion, long recoverTo)
{
GetRecoveryInfo(indexToken, hybridLogToken, out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo);
if (recoverTo != -1 && recoveredHLCInfo.deltaLog == null)
{
throw new FasterException("Recovering to a specific version within a token is only supported for incremental snapshots");
}
InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion, recoverTo);
return InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion, recoverTo);
}

private ValueTask InternalRecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoNextVersion, long recoverTo, CancellationToken cancellationToken)
private ValueTask<long> InternalRecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoNextVersion, long recoverTo, CancellationToken cancellationToken)
{
GetRecoveryInfo(indexToken, hybridLogToken, out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo);
return InternalRecoverAsync(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion, recoverTo, cancellationToken);
Expand Down Expand Up @@ -299,13 +299,13 @@ private void GetRecoveryInfo(Guid indexToken, Guid hybridLogToken, out HybridLog
}
}

private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoNextVersion, long recoverTo)
private long InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoNextVersion, long recoverTo)
{
if (!RecoverToInitialPage(recoveredICInfo, recoveredHLCInfo, out long recoverFromAddress))
RecoverFuzzyIndex(recoveredICInfo);

if (!SetRecoveryPageRanges(recoveredHLCInfo, numPagesToPreload, recoverFromAddress, out long tailAddress, out long headAddress, out long scanFromAddress))
return;
return -1;
RecoveryOptions options = new(recoveredHLCInfo.info.manualLockingActive, headAddress, tailAddress, undoNextVersion);

long readOnlyAddress;
Expand All @@ -330,15 +330,16 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
}

DoPostRecovery(recoveredICInfo, recoveredHLCInfo, tailAddress, ref headAddress, ref readOnlyAddress);
return recoveredHLCInfo.info.version;
}

private async ValueTask InternalRecoverAsync(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoNextVersion, long recoverTo, CancellationToken cancellationToken)
private async ValueTask<long> InternalRecoverAsync(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoNextVersion, long recoverTo, CancellationToken cancellationToken)
{
if (!RecoverToInitialPage(recoveredICInfo, recoveredHLCInfo, out long recoverFromAddress))
await RecoverFuzzyIndexAsync(recoveredICInfo, cancellationToken).ConfigureAwait(false);

if (!SetRecoveryPageRanges(recoveredHLCInfo, numPagesToPreload, recoverFromAddress, out long tailAddress, out long headAddress, out long scanFromAddress))
return;
return -1;
RecoveryOptions options = new(recoveredHLCInfo.info.manualLockingActive, headAddress, tailAddress, undoNextVersion);

long readOnlyAddress;
Expand All @@ -365,6 +366,7 @@ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogical
}

DoPostRecovery(recoveredICInfo, recoveredHLCInfo, tailAddress, ref headAddress, ref readOnlyAddress);
return recoveredHLCInfo.info.version;
}

private void DoPostRecovery(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, long tailAddress, ref long headAddress, ref long readOnlyAddress)
Expand Down
2 changes: 1 addition & 1 deletion cs/test/FasterLogTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void Consume(ReadOnlySpan<byte> result, long currentAddress, long nextAdd

[Test]
[Category("FasterLog")]
public async ValueTask FasterLogConsumerTest([Values] LogChecksumType logChecksum)
public void FasterLogConsumerTest([Values] LogChecksumType logChecksum)
{
device = Devices.CreateLogDevice(path + "fasterlog.log", deleteOnClose: true);
var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = logChecksum, LogCommitManager = manager, TryRecoverLatest = false };
Expand Down

0 comments on commit 9975276

Please sign in to comment.