Skip to content

Commit

Permalink
Updates and bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 12, 2019
1 parent 733e4de commit 30adeca
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 309 deletions.
29 changes: 16 additions & 13 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ public void ShiftReadOnlyToTail(out long tailAddress, out SemaphoreSlim notifyDo
notifyFlushedUntilAddressSemaphore = new SemaphoreSlim(0);
notifyDone = notifyFlushedUntilAddressSemaphore;
notifyFlushedUntilAddress = localTailAddress;
epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress, false));
epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress));
}
}

Expand All @@ -889,7 +889,7 @@ public bool ShiftReadOnlyAddress(long newReadOnlyAddress)
{
if (MonotonicUpdate(ref ReadOnlyAddress, newReadOnlyAddress, out long oldReadOnlyOffset))
{
epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(newReadOnlyAddress, false));
epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(newReadOnlyAddress));
return true;
}
return false;
Expand Down Expand Up @@ -937,8 +937,7 @@ protected virtual void DeleteAddressRange(long fromAddress, long toAddress)
/// Flush: send page to secondary store
/// </summary>
/// <param name="newSafeReadOnlyAddress"></param>
/// <param name="waitForPendingFlushComplete"></param>
public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress, bool waitForPendingFlushComplete = false)
public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress)
{
if (MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress))
{
Expand Down Expand Up @@ -1119,7 +1118,13 @@ protected void ShiftFlushedUntilAddress()
}
}

/// <summary>
/// Address for notification of flushed-until
/// </summary>
public long notifyFlushedUntilAddress;
/// <summary>
/// Semaphore for notification of flushed-until
/// </summary>
public SemaphoreSlim notifyFlushedUntilAddressSemaphore;

/// <summary>
Expand Down Expand Up @@ -1429,21 +1434,19 @@ public void AsyncFlushPages<TContext>(
/// <param name="endLogicalAddress"></param>
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
/// <param name="completed"></param>
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, IDevice device, IDevice objectLogDevice, out CountdownEvent completed, out SemaphoreSlim completedSemaphore)
/// <param name="completedSemaphore"></param>
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, IDevice device, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore)
{
int totalNumPages = (int)(endPage - startPage);
completed = new CountdownEvent(totalNumPages);
completedSemaphore = new SemaphoreSlim(0);
var asyncResult = new PageAsyncFlushResult<Empty>
{
completedSemaphore = completedSemaphore,
count = totalNumPages
};

for (long flushPage = startPage; flushPage < endPage; flushPage++)
{
var asyncResult = new PageAsyncFlushResult<Empty>
{
handle = completed,
handleSemaphore = completedSemaphore,
count = 1
};

var pageSize = PageSize;

Expand Down
20 changes: 7 additions & 13 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class MallocFixedPageSize<T> : IDisposable
private readonly bool IsPinned;
private readonly bool ReturnPhysicalAddress;

private CountdownEvent checkpointEvent;
private int checkpointCallbackCount;
private SemaphoreSlim checkpointSemaphore;

private ConcurrentQueue<long> freeList;
Expand Down Expand Up @@ -434,17 +434,10 @@ public void TakeCheckpoint(IDevice device, ulong start_offset, out ulong numByte
/// <summary>
/// Is checkpoint complete
/// </summary>
/// <param name="waitUntilComplete"></param>
/// <returns></returns>
public bool IsCheckpointCompleted(bool waitUntilComplete = false)
public bool IsCheckpointCompleted()
{
bool completed = checkpointEvent.IsSet;
if (!completed && waitUntilComplete)
{
checkpointEvent.Wait();
return true;
}
return completed;
return checkpointCallbackCount == 0;
}

/// <summary>
Expand All @@ -463,7 +456,7 @@ internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong num
int recordsCountInLastLevel = localCount & PageSizeMask;
int numCompleteLevels = localCount >> PageSizeBits;
int numLevels = numCompleteLevels + (recordsCountInLastLevel > 0 ? 1 : 0);
checkpointEvent = new CountdownEvent(numLevels);
checkpointCallbackCount = numLevels;
checkpointSemaphore = new SemaphoreSlim(0);
uint alignedPageSize = PageSize * (uint)RecordSize;
uint lastLevelSize = (uint)recordsCountInLastLevel * (uint)RecordSize;
Expand Down Expand Up @@ -496,9 +489,10 @@ private unsafe void AsyncFlushCallback(uint errorCode, uint numBytes, NativeOver
}
finally
{
checkpointEvent.Signal();
if (checkpointEvent.CurrentCount == 0)
if (Interlocked.Decrement(ref checkpointCallbackCount) == 0)
{
checkpointSemaphore.Release();
}
Overlapped.Free(overlap);
}
}
Expand Down
63 changes: 26 additions & 37 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O
{
return;
}
else
{
throw new Exception("CompletePending loops");
}
} while (true);
}

Expand All @@ -83,6 +87,9 @@ internal async ValueTask CompleteCheckpointAsync(ClientSession<Key, Value, Input
// for the checkpoint to be proceed
do
{
await InternalRefreshAsync(clientSession);
clientSession.ResumeThread();

await CompletePendingAsync(clientSession);
clientSession.ResumeThread();

Expand Down Expand Up @@ -133,46 +140,30 @@ private bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext
return false;
}

private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, bool async = true)
{
var previousState = SystemState.Make(threadCtx.Value.phase, threadCtx.Value.version);
var finalState = SystemState.Copy(ref _systemState);

while (finalState.phase == Phase.INTERMEDIATE)
finalState = SystemState.Copy(ref _systemState);

// We need to move from previousState to finalState one step at a time
do
{
// Coming back - need to complete older checkpoint first
if ((finalState.version > previousState.version + 1) ||
(finalState.version > previousState.version && finalState.phase == Phase.REST))
{
if (lastFullCheckpointVersion > previousState.version)
{
functions.CheckpointCompletionCallback(threadCtx.Value.guid, threadCtx.Value.serialNum);
}

// Get system out of intermediate phase
while (finalState.phase == Phase.INTERMEDIATE)
{
finalState = SystemState.Copy(ref _systemState);
}

// Fast forward to current global state
previousState.version = finalState.version;
previousState.phase = finalState.phase;
}

// Don't play around when system state is being changed
if (finalState.phase == Phase.INTERMEDIATE)
{
return;
}


var currentState = default(SystemState);
if (previousState.word == finalState.word)
{
currentState.word = previousState.word;
}
else if (previousState.version < finalState.version)
{
if (finalState.phase <= Phase.PREPARE)
previousState.version = finalState.version;
else
previousState.version = finalState.version - 1;
currentState = GetNextState(previousState, CheckpointType.FULL);
}
else
{
currentState = GetNextState(previousState, _checkpointType);
Expand Down Expand Up @@ -200,7 +191,7 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
threadCtx.Value.markers[EpochPhaseIdx.PrepareForIndexCheckpt] = false;
}

if (!IsIndexFuzzyCheckpointCompleted())
if (async && !IsIndexFuzzyCheckpointCompleted())
{
clientSession.SuspendThread();
await IsIndexFuzzyCheckpointCompletedAsync();
Expand Down Expand Up @@ -235,13 +226,13 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
{
// Need to be very careful here as threadCtx is changing
FasterExecutionContext ctx;
if (previousState.phase == Phase.PREPARE)
if (previousState.phase == Phase.IN_PROGRESS)
{
ctx = threadCtx.Value;
ctx = prevThreadCtx.Value;
}
else
{
ctx = prevThreadCtx.Value;
ctx = threadCtx.Value;
}

if (!ctx.markers[EpochPhaseIdx.InProgress])
Expand Down Expand Up @@ -288,13 +279,12 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
}
else
{
notify = (_hybridLogCheckpoint.flushed != null) && _hybridLogCheckpoint.flushed.IsSet;
notify = (_hybridLogCheckpoint.flushedSemaphore != null) && _hybridLogCheckpoint.flushedSemaphore.CurrentCount > 0;
}

if (!notify)
if (async && !notify)
{
Debug.Assert(_hybridLogCheckpoint.flushedSemaphore != null);

clientSession.SuspendThread();
await _hybridLogCheckpoint.flushedSemaphore.WaitAsync();
clientSession.ResumeThread();
Expand All @@ -308,7 +298,7 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
{
notify = notify && IsIndexFuzzyCheckpointCompleted();

if (!notify)
if (async && !notify)
{
clientSession.SuspendThread();
await IsIndexFuzzyCheckpointCompletedAsync();
Expand Down Expand Up @@ -340,7 +330,6 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,

if (epoch.MarkAndCheckIsComplete(EpochPhaseIdx.CheckpointCompletionCallback, prevThreadCtx.Value.version))
{
lastFullCheckpointVersion = currentState.version;
GlobalMoveToNextCheckpointState(currentState);
}

Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ internal struct HybridLogCheckpointInfo
public HybridLogRecoveryInfo info;
public IDevice snapshotFileDevice;
public IDevice snapshotFileObjectLogDevice;
public CountdownEvent flushed;
public SemaphoreSlim flushedSemaphore;
public long started;

Expand All @@ -349,7 +348,7 @@ public void Recover(Guid token, ICheckpointManager checkpointManager)
public void Reset()
{
started = 0;
flushed = null;
flushedSemaphore = null;
info.Reset();
if (snapshotFileDevice != null) snapshotFileDevice.Close();
if (snapshotFileObjectLogDevice != null) snapshotFileObjectLogDevice.Close();
Expand Down
3 changes: 1 addition & 2 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ private enum CheckpointType
{
INDEX_ONLY,
HYBRID_LOG_ONLY,
FULL,
NONE
FULL
}

private CheckpointType _checkpointType;
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ internal long InternalContinue(Guid guid)
InitContext(prevThreadCtx.Value, guid);
prevThreadCtx.Value.version--;
threadCtx.Value.serialNum = serialNum;
InternalRefresh();
}
else
{
Expand All @@ -75,6 +74,7 @@ internal long InternalContinue(Guid guid)
}

MakeTransition(intermediateState, currentState);
InternalRefresh();
return serialNum;
}
}
Expand Down
Loading

0 comments on commit 30adeca

Please sign in to comment.