Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow checkpoint type to be chosen per instantiation #285

Merged
merged 4 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,6 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager)
Initialize(s);
}

/// <summary>
/// Reset
/// </summary>
public void Reset()
{
Initialize(default, -1);
}

/// <summary>
/// Write info to byte array
/// </summary>
Expand Down Expand Up @@ -374,29 +366,30 @@ internal struct HybridLogCheckpointInfo
public IDevice snapshotFileDevice;
public IDevice snapshotFileObjectLogDevice;
public SemaphoreSlim flushedSemaphore;
public long started;

public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager)
{
info.Initialize(token, _version);
started = 0;
checkpointManager.InitializeLogCheckpoint(token);
}

public void Recover(Guid token, ICheckpointManager checkpointManager)
{
info.Recover(token, checkpointManager);
started = 0;
}

public void Reset()
{
started = 0;
flushedSemaphore = null;
info.Reset();
info = default;
if (snapshotFileDevice != null) snapshotFileDevice.Close();
if (snapshotFileObjectLogDevice != null) snapshotFileObjectLogDevice.Close();
}

public bool IsDefault()
{
return info.guid == default;
}
}

internal struct IndexRecoveryInfo
Expand Down Expand Up @@ -514,8 +507,13 @@ public void Recover(Guid token, ICheckpointManager checkpointManager)

public void Reset()
{
info.Reset();
info = default;
main_ht_device.Close();
}

public bool IsDefault()
{
return info.token == default;
}
}
}
48 changes: 48 additions & 0 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,32 @@ public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1)
return result;
}

/// <summary>
/// Initiate full checkpoint
/// </summary>
/// <param name="token">Checkpoint token</param>
/// <param name="checkpointType">Checkpoint type</param>
/// <param name="targetVersion">upper limit (inclusive) of the version included</param>
/// <returns>
/// Whether we successfully initiated the checkpoint (initiation may
/// fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index).
/// </returns>
public bool TakeFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1)
{
ISynchronizationTask backend;
if (checkpointType == CheckpointType.FoldOver)
backend = new FoldOverCheckpointTask();
else if (checkpointType == CheckpointType.Snapshot)
backend = new SnapshotCheckpointTask();
else
throw new FasterException("Unsupported full checkpoint type");

var result = StartStateMachine(new FullCheckpointStateMachine(backend, targetVersion));
token = _hybridLogCheckpointToken;
return result;
}

/// <summary>
/// Initiate index checkpoint
/// </summary>
Expand Down Expand Up @@ -235,6 +261,28 @@ public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1)
return result;
}

/// <summary>
/// Take incremental hybrid log checkpoint
/// </summary>
/// <param name="token">Checkpoint token</param>
/// <param name="checkpointType">Checkpoint type</param>
/// <param name="targetVersion">upper limit (inclusive) of the version included</param>
/// <returns>Whether we could initiate the checkpoint</returns>
public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1)
{
ISynchronizationTask backend;
if (checkpointType == CheckpointType.FoldOver)
backend = new FoldOverCheckpointTask();
else if (checkpointType == CheckpointType.Snapshot)
backend = new SnapshotCheckpointTask();
else
throw new FasterException("Unsupported checkpoint type");

var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion));
token = _hybridLogCheckpointToken;
return result;
}

/// <summary>
/// Recover from the latest checkpoints
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,18 @@ public ClientSession<Key, Value, Input, Output, Context, Functions> ResumeSessio
/// <inheritdoc />
public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) => _fasterKV.TakeFullCheckpoint(out token, targetVersion);

/// <inheritdoc />
public bool TakeFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) => _fasterKV.TakeFullCheckpoint(out token, checkpointType, targetVersion);

/// <inheritdoc />
public bool TakeIndexCheckpoint(out Guid token) => _fasterKV.TakeIndexCheckpoint(out token);

/// <inheritdoc />
public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) => _fasterKV.TakeHybridLogCheckpoint(out token, targetVersion);

/// <inheritdoc />
public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) => _fasterKV.TakeHybridLogCheckpoint(out token, checkpointType, targetVersion);

/// <inheritdoc />
public void Recover() => _fasterKV.Recover();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public void GlobalBeforeEnteringState<Key, Value>(
switch (next.phase)
{
case Phase.PREP_INDEX_CHECKPOINT:
Debug.Assert(faster._indexCheckpointToken == default &&
faster._hybridLogCheckpointToken == default);
Debug.Assert(faster._indexCheckpoint.IsDefault() &&
faster._hybridLogCheckpoint.IsDefault());
var fullCheckpointToken = Guid.NewGuid();
faster._indexCheckpointToken = fullCheckpointToken;
faster._hybridLogCheckpointToken = fullCheckpointToken;
Expand All @@ -39,7 +39,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
break;
case Phase.PERSISTENCE_CALLBACK:
faster.WriteIndexMetaInfo();
faster._indexCheckpointToken = default;
faster._indexCheckpoint.Reset();
break;
}
}
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public virtual void GlobalBeforeEnteringState<Key, Value>(SystemState next,
switch (next.phase)
{
case Phase.PREPARE:
if (faster._hybridLogCheckpointToken == default)
if (faster._hybridLogCheckpoint.IsDefault())
{
faster._hybridLogCheckpointToken = Guid.NewGuid();
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version);
Expand Down Expand Up @@ -52,7 +52,6 @@ public virtual void GlobalBeforeEnteringState<Key, Value>(SystemState next,
faster.WriteHybridLogMetaInfo();
break;
case Phase.REST:
faster._hybridLogCheckpointToken = default;
faster._hybridLogCheckpoint.Reset();
var nextTcs = new TaskCompletionSource<LinkedCheckpointInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
faster.checkpointTcs.SetResult(new LinkedCheckpointInfo { NextTask = nextTcs.Task });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
switch (next.phase)
{
case Phase.PREP_INDEX_CHECKPOINT:
if (faster._indexCheckpointToken == default)
if (faster._indexCheckpoint.IsDefault())
{
faster._indexCheckpointToken = Guid.NewGuid();
faster.InitializeIndexCheckpoint(faster._indexCheckpointToken);
Expand All @@ -40,12 +40,11 @@ public void GlobalBeforeEnteringState<Key, Value>(
// the tail address.
if (faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.finalLogicalAddress))
faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress();
if (faster._indexCheckpointToken != default)
if (!faster._indexCheckpoint.IsDefault())
{
faster.WriteIndexMetaInfo();
faster._indexCheckpointToken = default;
faster._indexCheckpoint.Reset();
}
faster._indexCheckpoint.Reset();

break;
}
Expand Down