From cf0f199ddb1697dea2bd0de69a77e921d2ef2f90 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Sun, 12 Jul 2020 19:34:11 -0700 Subject: [PATCH] Allow checkpoint type to be chosen per instantiation (#285) * Allow checkpoint type to be chosen per instantiation * Add custom checkpoint overloads to legacy API as well. --- cs/src/core/Index/Common/Contexts.cs | 26 +++++----- cs/src/core/Index/FASTER/FASTER.cs | 48 +++++++++++++++++++ cs/src/core/Index/FASTER/FASTERLegacy.cs | 6 +++ .../FullCheckpointStateMachine.cs | 6 +-- .../HybridLogCheckpointTask.cs | 3 +- .../IndexSnapshotStateMachine.cs | 7 ++- 6 files changed, 73 insertions(+), 23 deletions(-) diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index f2103a0a6..3c90c63b7 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -293,14 +293,6 @@ internal void Recover(Guid token, ICheckpointManager checkpointManager) Initialize(s); } - /// - /// Reset - /// - public void Reset() - { - Initialize(default, -1); - } - /// /// Write info to byte array /// @@ -374,29 +366,30 @@ internal struct HybridLogCheckpointInfo public IDevice snapshotFileDevice; public IDevice snapshotFileObjectLogDevice; public SemaphoreSlim flushedSemaphore; - public long started; public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager) { info.Initialize(token, _version); - started = 0; checkpointManager.InitializeLogCheckpoint(token); } public void Recover(Guid token, ICheckpointManager checkpointManager) { info.Recover(token, checkpointManager); - started = 0; } public void Reset() { - started = 0; flushedSemaphore = null; - info.Reset(); + info = default; if (snapshotFileDevice != null) snapshotFileDevice.Close(); if (snapshotFileObjectLogDevice != null) snapshotFileObjectLogDevice.Close(); } + + public bool IsDefault() + { + return info.guid == default; + } } internal struct IndexRecoveryInfo @@ -514,8 +507,13 @@ public void Recover(Guid token, ICheckpointManager checkpointManager) public void Reset() { - info.Reset(); + info = default; main_ht_device.Close(); } + + public bool IsDefault() + { + return info.token == default; + } } } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index 271c20b7d..0fdd122f7 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -204,6 +204,32 @@ public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) return result; } + /// + /// Initiate full checkpoint + /// + /// Checkpoint token + /// Checkpoint type + /// upper limit (inclusive) of the version included + /// + /// Whether we successfully initiated the checkpoint (initiation may + /// fail if we are already taking a checkpoint or performing some other + /// operation such as growing the index). + /// + public bool TakeFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) + { + ISynchronizationTask backend; + if (checkpointType == CheckpointType.FoldOver) + backend = new FoldOverCheckpointTask(); + else if (checkpointType == CheckpointType.Snapshot) + backend = new SnapshotCheckpointTask(); + else + throw new FasterException("Unsupported full checkpoint type"); + + var result = StartStateMachine(new FullCheckpointStateMachine(backend, targetVersion)); + token = _hybridLogCheckpointToken; + return result; + } + /// /// Initiate index checkpoint /// @@ -235,6 +261,28 @@ public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) return result; } + /// + /// Take incremental hybrid log checkpoint + /// + /// Checkpoint token + /// Checkpoint type + /// upper limit (inclusive) of the version included + /// Whether we could initiate the checkpoint + public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) + { + ISynchronizationTask backend; + if (checkpointType == CheckpointType.FoldOver) + backend = new FoldOverCheckpointTask(); + else if (checkpointType == CheckpointType.Snapshot) + backend = new SnapshotCheckpointTask(); + else + throw new FasterException("Unsupported checkpoint type"); + + var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion)); + token = _hybridLogCheckpointToken; + return result; + } + /// /// Recover from the latest checkpoints /// diff --git a/cs/src/core/Index/FASTER/FASTERLegacy.cs b/cs/src/core/Index/FASTER/FASTERLegacy.cs index ceec88e5f..819f1a456 100644 --- a/cs/src/core/Index/FASTER/FASTERLegacy.cs +++ b/cs/src/core/Index/FASTER/FASTERLegacy.cs @@ -271,12 +271,18 @@ public ClientSession ResumeSessio /// public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) => _fasterKV.TakeFullCheckpoint(out token, targetVersion); + /// + public bool TakeFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) => _fasterKV.TakeFullCheckpoint(out token, checkpointType, targetVersion); + /// public bool TakeIndexCheckpoint(out Guid token) => _fasterKV.TakeIndexCheckpoint(out token); /// public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) => _fasterKV.TakeHybridLogCheckpoint(out token, targetVersion); + /// + public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1) => _fasterKV.TakeHybridLogCheckpoint(out token, checkpointType, targetVersion); + /// public void Recover() => _fasterKV.Recover(); diff --git a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs index 00d10473b..5620a80db 100644 --- a/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs +++ b/cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs @@ -20,8 +20,8 @@ public void GlobalBeforeEnteringState( switch (next.phase) { case Phase.PREP_INDEX_CHECKPOINT: - Debug.Assert(faster._indexCheckpointToken == default && - faster._hybridLogCheckpointToken == default); + Debug.Assert(faster._indexCheckpoint.IsDefault() && + faster._hybridLogCheckpoint.IsDefault()); var fullCheckpointToken = Guid.NewGuid(); faster._indexCheckpointToken = fullCheckpointToken; faster._hybridLogCheckpointToken = fullCheckpointToken; @@ -39,7 +39,7 @@ public void GlobalBeforeEnteringState( break; case Phase.PERSISTENCE_CALLBACK: faster.WriteIndexMetaInfo(); - faster._indexCheckpointToken = default; + faster._indexCheckpoint.Reset(); break; } } diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs index 6ccc8695f..00583fb1b 100644 --- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -20,7 +20,7 @@ public virtual void GlobalBeforeEnteringState(SystemState next, switch (next.phase) { case Phase.PREPARE: - if (faster._hybridLogCheckpointToken == default) + if (faster._hybridLogCheckpoint.IsDefault()) { faster._hybridLogCheckpointToken = Guid.NewGuid(); faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version); @@ -52,7 +52,6 @@ public virtual void GlobalBeforeEnteringState(SystemState next, faster.WriteHybridLogMetaInfo(); break; case Phase.REST: - faster._hybridLogCheckpointToken = default; faster._hybridLogCheckpoint.Reset(); var nextTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); faster.checkpointTcs.SetResult(new LinkedCheckpointInfo { NextTask = nextTcs.Task }); diff --git a/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs b/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs index 2f71446a9..076f3a419 100644 --- a/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs +++ b/cs/src/core/Index/Synchronization/IndexSnapshotStateMachine.cs @@ -20,7 +20,7 @@ public void GlobalBeforeEnteringState( switch (next.phase) { case Phase.PREP_INDEX_CHECKPOINT: - if (faster._indexCheckpointToken == default) + if (faster._indexCheckpoint.IsDefault()) { faster._indexCheckpointToken = Guid.NewGuid(); faster.InitializeIndexCheckpoint(faster._indexCheckpointToken); @@ -40,12 +40,11 @@ public void GlobalBeforeEnteringState( // the tail address. if (faster.ObtainCurrentTailAddress(ref faster._indexCheckpoint.info.finalLogicalAddress)) faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress(); - if (faster._indexCheckpointToken != default) + if (!faster._indexCheckpoint.IsDefault()) { faster.WriteIndexMetaInfo(); - faster._indexCheckpointToken = default; + faster._indexCheckpoint.Reset(); } - faster._indexCheckpoint.Reset(); break; }