Skip to content

Commit

Permalink
[C#] Misc warning and code cleanup (#560)
Browse files Browse the repository at this point in the history
* Adding comments to structures that were made public (Phase, SystemState, DeltaLogEntryType

* updates

* misc minor cleanup

Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
  • Loading branch information
badrishc and TedHartMS authored Sep 15, 2021
1 parent 7fc497b commit 8207000
Show file tree
Hide file tree
Showing 18 changed files with 232 additions and 137 deletions.
19 changes: 12 additions & 7 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ public FasterKV(long size, LogSettings logSettings,
Initialize(size, sectorSize);

systemState = default;
systemState.phase = Phase.REST;
systemState.version = 1;
systemState.Phase = Phase.REST;
systemState.Version = 1;
}

/// <summary>
Expand Down Expand Up @@ -404,6 +404,11 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp
/// <param name="checkpointType">Checkpoint type</param>
/// <param name="tryIncremental">For snapshot, try to store as incremental delta over last snapshot</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="targetVersion">
/// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
/// than current version. Actual new version may have version number greater than supplied number. If the supplied
/// number is -1, checkpoint will unconditionally create a new version.
/// </param>
/// <returns>
/// (bool success, Guid token)
/// success: Whether we successfully initiated the checkpoint (initiation may
Expand Down Expand Up @@ -508,8 +513,8 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default
while (true)
{
var systemState = this.systemState;
if (systemState.phase == Phase.REST || systemState.phase == Phase.PREPARE_GROW ||
systemState.phase == Phase.IN_PROGRESS_GROW)
if (systemState.Phase == Phase.REST || systemState.Phase == Phase.PREPARE_GROW ||
systemState.Phase == Phase.IN_PROGRESS_GROW)
return;

List<ValueTask> valueTasks = new();
Expand Down Expand Up @@ -718,15 +723,15 @@ public bool GrowIndex()
while (true)
{
SystemState _systemState = SystemState.Copy(ref systemState);
if (_systemState.phase == Phase.IN_PROGRESS_GROW)
if (_systemState.Phase == Phase.IN_PROGRESS_GROW)
{
SplitBuckets(0);
epoch.ProtectAndDrain();
}
else
{
SystemState.RemoveIntermediate(ref _systemState);
if (_systemState.phase != Phase.PREPARE_GROW && _systemState.phase != Phase.IN_PROGRESS_GROW)
if (_systemState.Phase != Phase.PREPARE_GROW && _systemState.Phase != Phase.IN_PROGRESS_GROW)
{
return true;
}
Expand All @@ -752,7 +757,7 @@ public void Dispose()
checkpointManager?.Dispose();
}

private void UpdateVarLen(ref VariableLengthStructSettings<Key, Value> variableLengthStructSettings)
private static void UpdateVarLen(ref VariableLengthStructSettings<Key, Value> variableLengthStructSettings)
{
if (typeof(Key) == typeof(SpanByte))
{
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ private void HeavyEnter<Input, Output, Context, FasterSession>(long hash, Faster
{
// We spin-wait as a simplification
// Could instead do a "heavy operation" here
while (systemState.phase != Phase.IN_PROGRESS_GROW)
while (systemState.Phase != Phase.IN_PROGRESS_GROW)
Thread.SpinWait(100);
InternalRefresh(ctx, session);
}
Expand Down Expand Up @@ -1954,7 +1954,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref value);

long newLogicalAddress, newPhysicalAddress;
bool copyToReadCache = noReadCache ? false : UseReadCache;
bool copyToReadCache = !noReadCache && UseReadCache;
if (copyToReadCache)
{
BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public bool CompleteCheckpoint(bool spinWait = false)
do
{
CompletePending();
if (_fasterKV.systemState.phase == Phase.REST)
if (_fasterKV.systemState.Phase == Phase.REST)
{
CompletePending();
return true;
Expand All @@ -228,7 +228,7 @@ private Guid InternalAcquire()
{
_fasterKV.epoch.Resume();
_threadCtx.InitializeThread();
Phase phase = _fasterKV.systemState.phase;
Phase phase = _fasterKV.systemState.Phase;
if (phase != Phase.REST)
{
throw new FasterException("Can acquire only in REST phase!");
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal CommitPoint InternalContinue<Input, Output, Context>(string guid, out F
// We have recovered the corresponding session.
// Now obtain the session by first locking the rest phase
var currentState = SystemState.Copy(ref systemState);
if (currentState.phase == Phase.REST)
if (currentState.Phase == Phase.REST)
{
var intermediateState = SystemState.MakeIntermediate(currentState);
if (MakeTransition(currentState, intermediateState))
Expand Down Expand Up @@ -70,7 +70,7 @@ internal void InternalRefresh<Input, Output, Context, FasterSession>(FasterExecu

// We check if we are in normal mode
var newPhaseInfo = SystemState.Copy(ref systemState);
if (ctx.phase == Phase.REST && newPhaseInfo.phase == Phase.REST && ctx.version == newPhaseInfo.version)
if (ctx.phase == Phase.REST && newPhaseInfo.Phase == Phase.REST && ctx.version == newPhaseInfo.Version)
{
return;
}
Expand Down Expand Up @@ -177,7 +177,7 @@ internal bool InternalCompletePending<Input, Output, Context, FasterSession>(
}
}

internal bool InRestPhase() => systemState.phase == Phase.REST;
internal bool InRestPhase() => systemState.Phase == Phase.REST;

#region Complete Retry Requests
internal void InternalCompleteRetryRequests<Input, Output, Context, FasterSession>(
Expand Down
13 changes: 12 additions & 1 deletion cs/src/core/Index/Recovery/DeltaLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@

namespace FASTER.core
{
/// <summary>
/// The type of a record in the delta (incremental) log
/// </summary>
public enum DeltaLogEntryType : int
{
DELTA, CHECKPOINT_METADATA
/// <summary>
/// The entry is a delta record
/// </summary>
DELTA,

/// <summary>
/// The entry is checkpoint metadata
/// </summary>
CHECKPOINT_METADATA
}

[StructLayout(LayoutKind.Explicit, Size = DeltaLog.HeaderSize)]
Expand Down
1 change: 0 additions & 1 deletion cs/src/core/Index/Recovery/ICheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public interface ICheckpointManager : IDisposable
/// <param name="version"></param>
/// <param name="commitMetadata"></param>
/// <param name="deltaLog"></param>
/// <param name="cookie">any additional (user-specified) information to persist with the checkpoint</param>
void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog);

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/Recovery/LocalCheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken)
/// </summary>
/// <param name="logToken">Token</param>
/// <param name="deltaLog">Delta log</param>
/// <param name="scanDelta"> whether or not to scan through the delta log to acquire latest entry</param>
/// <param name="recoverTo"> version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version.</param>
/// <returns>Metadata, or null if invalid</returns>
public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
{
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ private bool RecoverToInitialPage(IndexCheckpointInfo recoveredICInfo, HybridLog
currentSyncStateMachine = null;

// Set new system state after recovery
systemState.phase = Phase.REST;
systemState.version = recoveredHLCInfo.info.version + 1;
systemState.Phase = Phase.REST;
systemState.Version = recoveredHLCInfo.info.version + 1;

if (!recoveredICInfo.IsDefault() && recoveryCountdown != null)
{
Expand Down
48 changes: 24 additions & 24 deletions cs/src/core/Index/Synchronization/FasterStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public partial class FasterKV<Key, Value>
/// <summary>
/// Current version number of the store
/// </summary>
public long CurrentVersion => systemState.version;
public long CurrentVersion => systemState.Version;

/// <summary>
/// Registers the given callback to be invoked for every state machine transition. Not safe to call with
Expand Down Expand Up @@ -76,9 +76,9 @@ private bool StartStateMachine(ISynchronizationStateMachine stateMachine)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool MakeTransition(SystemState expectedState, SystemState nextState)
{
if (Interlocked.CompareExchange(ref systemState.word, nextState.word, expectedState.word) !=
expectedState.word) return false;
Debug.WriteLine("Moved to {0}, {1}", nextState.phase, nextState.version);
if (Interlocked.CompareExchange(ref systemState.Word, nextState.Word, expectedState.Word) !=
expectedState.Word) return false;
Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version);
return true;
}

Expand Down Expand Up @@ -109,26 +109,26 @@ internal void GlobalStateMachineStep(SystemState expectedState)
currentSyncStateMachine.GlobalAfterEnteringState(nextState, this);

// Mark the state machine done as we exit the state machine.
if (nextState.phase == Phase.REST) stateMachineActive = 0;
if (nextState.Phase == Phase.REST) stateMachineActive = 0;
}


// Given the current global state, return the starting point of the state machine cycle
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private SystemState StartOfCurrentCycle(SystemState currentGlobalState)
{
return currentGlobalState.phase < Phase.REST
? SystemState.Make(Phase.REST, currentGlobalState.version - 1)
: SystemState.Make(Phase.REST, currentGlobalState.version);
return currentGlobalState.Phase < Phase.REST
? SystemState.Make(Phase.REST, currentGlobalState.Version - 1)
: SystemState.Make(Phase.REST, currentGlobalState.Version);
}

// Given the current thread state and global state, fast forward the thread state to the
// current state machine cycle if needed
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private SystemState FastForwardToCurrentCycle(SystemState threadState, SystemState targetStartState)
{
if (threadState.version < targetStartState.version ||
threadState.version == targetStartState.version && threadState.phase < targetStartState.phase)
if (threadState.Version < targetStartState.Version ||
threadState.Version == targetStartState.Version && threadState.Phase < targetStartState.Phase)
{
return targetStartState;
}
Expand All @@ -148,7 +148,7 @@ internal bool SameCycle<Input, Output, Context>(FasterExecutionContext<Input, Ou
{
var _systemState = SystemState.Copy(ref systemState);
SystemState.RemoveIntermediate(ref _systemState);
return StartOfCurrentCycle(threadState).version == StartOfCurrentCycle(_systemState).version;
return StartOfCurrentCycle(threadState).Version == StartOfCurrentCycle(_systemState).Version;
}
return ctx.threadStateMachine == currentSyncStateMachine;
}
Expand Down Expand Up @@ -189,7 +189,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
#region Get returning thread to start of current cycle, issuing completion callbacks if needed
if (ctx != null)
{
if (ctx.version < targetStartState.version)
if (ctx.version < targetStartState.Version)
{
// Issue CPR callback for full session
if (ctx.serialNum != -1)
Expand All @@ -214,7 +214,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
fasterSession?.CheckpointCompletionCallback(ctx.guid, commitPoint);
}
}
if ((ctx.version == targetStartState.version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine))
if ((ctx.version == targetStartState.Version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine))
{
IssueCompletionCallback(ctx, fasterSession);
}
Expand All @@ -223,12 +223,12 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(

// No state machine associated with target, or target is in REST phase:
// we can directly fast forward session to target state
if (currentTask == null || targetState.phase == Phase.REST)
if (currentTask == null || targetState.Phase == Phase.REST)
{
if (ctx != null)
{
ctx.phase = targetState.phase;
ctx.version = targetState.version;
ctx.phase = targetState.Phase;
ctx.version = targetState.Version;
ctx.threadStateMachine = currentTask;
}
return;
Expand Down Expand Up @@ -257,22 +257,22 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
do
{
Debug.Assert(
(threadState.version < targetState.version) ||
(threadState.version == targetState.version &&
(threadState.phase <= targetState.phase || currentTask is IndexSnapshotStateMachine)
(threadState.Version < targetState.Version) ||
(threadState.Version == targetState.Version &&
(threadState.Phase <= targetState.Phase || currentTask is IndexSnapshotStateMachine)
));

currentTask.OnThreadEnteringState(threadState, previousState, this, ctx, fasterSession, valueTasks, token);

if (ctx != null)
{
ctx.phase = threadState.phase;
ctx.version = threadState.version;
ctx.phase = threadState.Phase;
ctx.version = threadState.Version;
}

previousState.word = threadState.word;
previousState.Word = threadState.Word;
threadState = currentTask.NextState(threadState);
if (systemState.word != targetState.word)
if (systemState.Word != targetState.Word)
{
var tmp = SystemState.Copy(ref systemState);
if (currentSyncStateMachine == currentTask)
Expand All @@ -281,7 +281,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
SystemState.RemoveIntermediate(ref targetState);
}
}
} while (previousState.word != targetState.word);
} while (previousState.Word != targetState.Word);
#endregion

return;
Expand Down
14 changes: 7 additions & 7 deletions cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
SystemState next,
FasterKV<Key, Value> faster)
{
switch (next.phase)
switch (next.Phase)
{
case Phase.PREP_INDEX_CHECKPOINT:
Debug.Assert(faster._indexCheckpoint.IsDefault() &&
Expand All @@ -25,7 +25,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
faster._indexCheckpointToken = fullCheckpointToken;
faster._hybridLogCheckpointToken = fullCheckpointToken;
faster.InitializeIndexCheckpoint(faster._indexCheckpointToken);
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version);
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.Version);
break;
case Phase.WAIT_FLUSH:
faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress();
Expand Down Expand Up @@ -79,19 +79,19 @@ public FullCheckpointStateMachine(ISynchronizationTask checkpointBackend, long t
public override SystemState NextState(SystemState start)
{
var result = SystemState.Copy(ref start);
switch (start.phase)
switch (start.Phase)
{
case Phase.REST:
result.phase = Phase.PREP_INDEX_CHECKPOINT;
result.Phase = Phase.PREP_INDEX_CHECKPOINT;
break;
case Phase.PREP_INDEX_CHECKPOINT:
result.phase = Phase.PREPARE;
result.Phase = Phase.PREPARE;
break;
case Phase.WAIT_PENDING:
result.phase = Phase.WAIT_INDEX_CHECKPOINT;
result.Phase = Phase.WAIT_INDEX_CHECKPOINT;
break;
case Phase.WAIT_INDEX_CHECKPOINT:
result.phase = Phase.WAIT_FLUSH;
result.Phase = Phase.WAIT_FLUSH;
break;
default:
result = base.NextState(start);
Expand Down
Loading

0 comments on commit 8207000

Please sign in to comment.