Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Misc warning and code cleanup #560

Merged
merged 4 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ public FasterKV(long size, LogSettings logSettings,
Initialize(size, sectorSize);

systemState = default;
systemState.phase = Phase.REST;
systemState.version = 1;
systemState.Phase = Phase.REST;
systemState.Version = 1;
}

/// <summary>
Expand Down Expand Up @@ -404,6 +404,11 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp
/// <param name="checkpointType">Checkpoint type</param>
/// <param name="tryIncremental">For snapshot, try to store as incremental delta over last snapshot</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="targetVersion">
/// intended version number of the next version. Checkpoint will not execute if supplied version is not larger
/// than current version. Actual new version may have version number greater than supplied number. If the supplied
/// number is -1, checkpoint will unconditionally create a new version.
/// </param>
/// <returns>
/// (bool success, Guid token)
/// success: Whether we successfully initiated the checkpoint (initiation may
Expand Down Expand Up @@ -508,8 +513,8 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default
while (true)
{
var systemState = this.systemState;
if (systemState.phase == Phase.REST || systemState.phase == Phase.PREPARE_GROW ||
systemState.phase == Phase.IN_PROGRESS_GROW)
if (systemState.Phase == Phase.REST || systemState.Phase == Phase.PREPARE_GROW ||
systemState.Phase == Phase.IN_PROGRESS_GROW)
return;

List<ValueTask> valueTasks = new();
Expand Down Expand Up @@ -718,15 +723,15 @@ public bool GrowIndex()
while (true)
{
SystemState _systemState = SystemState.Copy(ref systemState);
if (_systemState.phase == Phase.IN_PROGRESS_GROW)
if (_systemState.Phase == Phase.IN_PROGRESS_GROW)
{
SplitBuckets(0);
epoch.ProtectAndDrain();
}
else
{
SystemState.RemoveIntermediate(ref _systemState);
if (_systemState.phase != Phase.PREPARE_GROW && _systemState.phase != Phase.IN_PROGRESS_GROW)
if (_systemState.Phase != Phase.PREPARE_GROW && _systemState.Phase != Phase.IN_PROGRESS_GROW)
{
return true;
}
Expand All @@ -752,7 +757,7 @@ public void Dispose()
checkpointManager?.Dispose();
}

private void UpdateVarLen(ref VariableLengthStructSettings<Key, Value> variableLengthStructSettings)
private static void UpdateVarLen(ref VariableLengthStructSettings<Key, Value> variableLengthStructSettings)
{
if (typeof(Key) == typeof(SpanByte))
{
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ private void HeavyEnter<Input, Output, Context, FasterSession>(long hash, Faster
{
// We spin-wait as a simplification
// Could instead do a "heavy operation" here
while (systemState.phase != Phase.IN_PROGRESS_GROW)
while (systemState.Phase != Phase.IN_PROGRESS_GROW)
Thread.SpinWait(100);
InternalRefresh(ctx, session);
}
Expand Down Expand Up @@ -1954,7 +1954,7 @@ internal OperationStatus InternalTryCopyToTail<Input, Output, Context, FasterSes
var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref value);

long newLogicalAddress, newPhysicalAddress;
bool copyToReadCache = noReadCache ? false : UseReadCache;
bool copyToReadCache = !noReadCache && UseReadCache;
if (copyToReadCache)
{
BlockAllocateReadCache(allocatedSize, out newLogicalAddress, currentCtx, fasterSession);
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public bool CompleteCheckpoint(bool spinWait = false)
do
{
CompletePending();
if (_fasterKV.systemState.phase == Phase.REST)
if (_fasterKV.systemState.Phase == Phase.REST)
{
CompletePending();
return true;
Expand All @@ -228,7 +228,7 @@ private Guid InternalAcquire()
{
_fasterKV.epoch.Resume();
_threadCtx.InitializeThread();
Phase phase = _fasterKV.systemState.phase;
Phase phase = _fasterKV.systemState.Phase;
if (phase != Phase.REST)
{
throw new FasterException("Can acquire only in REST phase!");
Expand Down
6 changes: 3 additions & 3 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal CommitPoint InternalContinue<Input, Output, Context>(string guid, out F
// We have recovered the corresponding session.
// Now obtain the session by first locking the rest phase
var currentState = SystemState.Copy(ref systemState);
if (currentState.phase == Phase.REST)
if (currentState.Phase == Phase.REST)
{
var intermediateState = SystemState.MakeIntermediate(currentState);
if (MakeTransition(currentState, intermediateState))
Expand Down Expand Up @@ -70,7 +70,7 @@ internal void InternalRefresh<Input, Output, Context, FasterSession>(FasterExecu

// We check if we are in normal mode
var newPhaseInfo = SystemState.Copy(ref systemState);
if (ctx.phase == Phase.REST && newPhaseInfo.phase == Phase.REST && ctx.version == newPhaseInfo.version)
if (ctx.phase == Phase.REST && newPhaseInfo.Phase == Phase.REST && ctx.version == newPhaseInfo.Version)
{
return;
}
Expand Down Expand Up @@ -177,7 +177,7 @@ internal bool InternalCompletePending<Input, Output, Context, FasterSession>(
}
}

internal bool InRestPhase() => systemState.phase == Phase.REST;
internal bool InRestPhase() => systemState.Phase == Phase.REST;

#region Complete Retry Requests
internal void InternalCompleteRetryRequests<Input, Output, Context, FasterSession>(
Expand Down
13 changes: 12 additions & 1 deletion cs/src/core/Index/Recovery/DeltaLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@

namespace FASTER.core
{
/// <summary>
/// The type of a record in the delta (incremental) log
/// </summary>
public enum DeltaLogEntryType : int
{
DELTA, CHECKPOINT_METADATA
/// <summary>
/// The entry is a delta record
/// </summary>
DELTA,

/// <summary>
/// The entry is checkpoint metadata
/// </summary>
CHECKPOINT_METADATA
}

[StructLayout(LayoutKind.Explicit, Size = DeltaLog.HeaderSize)]
Expand Down
1 change: 0 additions & 1 deletion cs/src/core/Index/Recovery/ICheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public interface ICheckpointManager : IDisposable
/// <param name="version"></param>
/// <param name="commitMetadata"></param>
/// <param name="deltaLog"></param>
/// <param name="cookie">any additional (user-specified) information to persist with the checkpoint</param>
void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog);

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions cs/src/core/Index/Recovery/LocalCheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken)
/// </summary>
/// <param name="logToken">Token</param>
/// <param name="deltaLog">Delta log</param>
/// <param name="scanDelta"> whether or not to scan through the delta log to acquire latest entry</param>
/// <param name="recoverTo"> version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version.</param>
/// <returns>Metadata, or null if invalid</returns>
public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
{
Expand Down
4 changes: 2 additions & 2 deletions cs/src/core/Index/Recovery/Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ private bool RecoverToInitialPage(IndexCheckpointInfo recoveredICInfo, HybridLog
currentSyncStateMachine = null;

// Set new system state after recovery
systemState.phase = Phase.REST;
systemState.version = recoveredHLCInfo.info.version + 1;
systemState.Phase = Phase.REST;
systemState.Version = recoveredHLCInfo.info.version + 1;

if (!recoveredICInfo.IsDefault() && recoveryCountdown != null)
{
Expand Down
48 changes: 24 additions & 24 deletions cs/src/core/Index/Synchronization/FasterStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public partial class FasterKV<Key, Value>
/// <summary>
/// Current version number of the store
/// </summary>
public long CurrentVersion => systemState.version;
public long CurrentVersion => systemState.Version;

/// <summary>
/// Registers the given callback to be invoked for every state machine transition. Not safe to call with
Expand Down Expand Up @@ -76,9 +76,9 @@ private bool StartStateMachine(ISynchronizationStateMachine stateMachine)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool MakeTransition(SystemState expectedState, SystemState nextState)
{
if (Interlocked.CompareExchange(ref systemState.word, nextState.word, expectedState.word) !=
expectedState.word) return false;
Debug.WriteLine("Moved to {0}, {1}", nextState.phase, nextState.version);
if (Interlocked.CompareExchange(ref systemState.Word, nextState.Word, expectedState.Word) !=
expectedState.Word) return false;
Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version);
return true;
}

Expand Down Expand Up @@ -109,26 +109,26 @@ internal void GlobalStateMachineStep(SystemState expectedState)
currentSyncStateMachine.GlobalAfterEnteringState(nextState, this);

// Mark the state machine done as we exit the state machine.
if (nextState.phase == Phase.REST) stateMachineActive = 0;
if (nextState.Phase == Phase.REST) stateMachineActive = 0;
}


// Given the current global state, return the starting point of the state machine cycle
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private SystemState StartOfCurrentCycle(SystemState currentGlobalState)
{
return currentGlobalState.phase < Phase.REST
? SystemState.Make(Phase.REST, currentGlobalState.version - 1)
: SystemState.Make(Phase.REST, currentGlobalState.version);
return currentGlobalState.Phase < Phase.REST
? SystemState.Make(Phase.REST, currentGlobalState.Version - 1)
: SystemState.Make(Phase.REST, currentGlobalState.Version);
}

// Given the current thread state and global state, fast forward the thread state to the
// current state machine cycle if needed
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private SystemState FastForwardToCurrentCycle(SystemState threadState, SystemState targetStartState)
{
if (threadState.version < targetStartState.version ||
threadState.version == targetStartState.version && threadState.phase < targetStartState.phase)
if (threadState.Version < targetStartState.Version ||
threadState.Version == targetStartState.Version && threadState.Phase < targetStartState.Phase)
{
return targetStartState;
}
Expand All @@ -148,7 +148,7 @@ internal bool SameCycle<Input, Output, Context>(FasterExecutionContext<Input, Ou
{
var _systemState = SystemState.Copy(ref systemState);
SystemState.RemoveIntermediate(ref _systemState);
return StartOfCurrentCycle(threadState).version == StartOfCurrentCycle(_systemState).version;
return StartOfCurrentCycle(threadState).Version == StartOfCurrentCycle(_systemState).Version;
}
return ctx.threadStateMachine == currentSyncStateMachine;
}
Expand Down Expand Up @@ -189,7 +189,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
#region Get returning thread to start of current cycle, issuing completion callbacks if needed
if (ctx != null)
{
if (ctx.version < targetStartState.version)
if (ctx.version < targetStartState.Version)
{
// Issue CPR callback for full session
if (ctx.serialNum != -1)
Expand All @@ -214,7 +214,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
fasterSession?.CheckpointCompletionCallback(ctx.guid, commitPoint);
}
}
if ((ctx.version == targetStartState.version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine))
if ((ctx.version == targetStartState.Version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine))
{
IssueCompletionCallback(ctx, fasterSession);
}
Expand All @@ -223,12 +223,12 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(

// No state machine associated with target, or target is in REST phase:
// we can directly fast forward session to target state
if (currentTask == null || targetState.phase == Phase.REST)
if (currentTask == null || targetState.Phase == Phase.REST)
{
if (ctx != null)
{
ctx.phase = targetState.phase;
ctx.version = targetState.version;
ctx.phase = targetState.Phase;
ctx.version = targetState.Version;
ctx.threadStateMachine = currentTask;
}
return;
Expand Down Expand Up @@ -257,22 +257,22 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
do
{
Debug.Assert(
(threadState.version < targetState.version) ||
(threadState.version == targetState.version &&
(threadState.phase <= targetState.phase || currentTask is IndexSnapshotStateMachine)
(threadState.Version < targetState.Version) ||
(threadState.Version == targetState.Version &&
(threadState.Phase <= targetState.Phase || currentTask is IndexSnapshotStateMachine)
));

currentTask.OnThreadEnteringState(threadState, previousState, this, ctx, fasterSession, valueTasks, token);

if (ctx != null)
{
ctx.phase = threadState.phase;
ctx.version = threadState.version;
ctx.phase = threadState.Phase;
ctx.version = threadState.Version;
}

previousState.word = threadState.word;
previousState.Word = threadState.Word;
threadState = currentTask.NextState(threadState);
if (systemState.word != targetState.word)
if (systemState.Word != targetState.Word)
{
var tmp = SystemState.Copy(ref systemState);
if (currentSyncStateMachine == currentTask)
Expand All @@ -281,7 +281,7 @@ private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
SystemState.RemoveIntermediate(ref targetState);
}
}
} while (previousState.word != targetState.word);
} while (previousState.Word != targetState.Word);
#endregion

return;
Expand Down
14 changes: 7 additions & 7 deletions cs/src/core/Index/Synchronization/FullCheckpointStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
SystemState next,
FasterKV<Key, Value> faster)
{
switch (next.phase)
switch (next.Phase)
{
case Phase.PREP_INDEX_CHECKPOINT:
Debug.Assert(faster._indexCheckpoint.IsDefault() &&
Expand All @@ -25,7 +25,7 @@ public void GlobalBeforeEnteringState<Key, Value>(
faster._indexCheckpointToken = fullCheckpointToken;
faster._hybridLogCheckpointToken = fullCheckpointToken;
faster.InitializeIndexCheckpoint(faster._indexCheckpointToken);
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version);
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.Version);
break;
case Phase.WAIT_FLUSH:
faster._indexCheckpoint.info.num_buckets = faster.overflowBucketsAllocator.GetMaxValidAddress();
Expand Down Expand Up @@ -79,19 +79,19 @@ public FullCheckpointStateMachine(ISynchronizationTask checkpointBackend, long t
public override SystemState NextState(SystemState start)
{
var result = SystemState.Copy(ref start);
switch (start.phase)
switch (start.Phase)
{
case Phase.REST:
result.phase = Phase.PREP_INDEX_CHECKPOINT;
result.Phase = Phase.PREP_INDEX_CHECKPOINT;
break;
case Phase.PREP_INDEX_CHECKPOINT:
result.phase = Phase.PREPARE;
result.Phase = Phase.PREPARE;
break;
case Phase.WAIT_PENDING:
result.phase = Phase.WAIT_INDEX_CHECKPOINT;
result.Phase = Phase.WAIT_INDEX_CHECKPOINT;
break;
case Phase.WAIT_INDEX_CHECKPOINT:
result.phase = Phase.WAIT_FLUSH;
result.Phase = Phase.WAIT_FLUSH;
break;
default:
result = base.NextState(start);
Expand Down
Loading