Skip to content

Commit

Permalink
Support true async CompleteCheckpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 7, 2019
1 parent 2eafe9b commit 0bd752e
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 25 deletions.
22 changes: 18 additions & 4 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -865,13 +865,18 @@ public void CheckForAllocateComplete(ref long address)
/// Used by applications to make the current state of the database immutable quickly
/// </summary>
/// <param name="tailAddress"></param>
public void ShiftReadOnlyToTail(out long tailAddress)
/// <param name="notifyDone"></param>
public void ShiftReadOnlyToTail(out long tailAddress, out SemaphoreSlim notifyDone)
{
notifyDone = null;
tailAddress = GetTailAddress();
long localTailAddress = tailAddress;
long currentReadOnlyOffset = ReadOnlyAddress;
if (MonotonicUpdate(ref ReadOnlyAddress, tailAddress, out long oldReadOnlyOffset))
{
notifyFlushedUntilAddressSemaphore = new SemaphoreSlim(0);
notifyDone = notifyFlushedUntilAddressSemaphore;
notifyFlushedUntilAddress = localTailAddress;
epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress, false));
}
}
Expand Down Expand Up @@ -1104,11 +1109,18 @@ protected void ShiftFlushedUntilAddress()

if (update)
{
MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress);
if (MonotonicUpdate(ref FlushedUntilAddress, currentFlushedUntilAddress, out long oldFlushedUntilAddress))
{
if ((oldFlushedUntilAddress < notifyFlushedUntilAddress) && (currentFlushedUntilAddress >= notifyFlushedUntilAddress))
{
notifyFlushedUntilAddressSemaphore.Release();
}
}
}
}


public long notifyFlushedUntilAddress;
public SemaphoreSlim notifyFlushedUntilAddressSemaphore;

/// <summary>
/// Used by several functions to update the variable to newValue. Ignores if newValue is smaller or
Expand Down Expand Up @@ -1418,16 +1430,18 @@ public void AsyncFlushPages<TContext>(
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
/// <param name="completed"></param>
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, IDevice device, IDevice objectLogDevice, out CountdownEvent completed)
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, IDevice device, IDevice objectLogDevice, out CountdownEvent completed, out SemaphoreSlim completedSemaphore)
{
int totalNumPages = (int)(endPage - startPage);
completed = new CountdownEvent(totalNumPages);
completedSemaphore = new SemaphoreSlim(0);

for (long flushPage = startPage; flushPage < endPage; flushPage++)
{
var asyncResult = new PageAsyncFlushResult<Empty>
{
handle = completed,
handleSemaphore = completedSemaphore,
count = 1
};

Expand Down
25 changes: 19 additions & 6 deletions cs/src/core/Allocator/MallocFixedPageSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace FASTER.core
{
/// <summary>
/// Memory allocator for objects
/// </summary>
/// <typeparam name="T"></typeparam>
public unsafe class MallocFixedPageSize<T> : IDisposable
public class MallocFixedPageSize<T> : IDisposable
{
private const bool ForceUnpinnedAllocation = false;

Expand Down Expand Up @@ -47,6 +48,7 @@ public unsafe class MallocFixedPageSize<T> : IDisposable
private readonly bool ReturnPhysicalAddress;

private CountdownEvent checkpointEvent;
private SemaphoreSlim checkpointSemaphore;

private ConcurrentQueue<long> freeList;

Expand Down Expand Up @@ -445,15 +447,24 @@ public bool IsCheckpointCompleted(bool waitUntilComplete = false)
return completed;
}

/// <summary>
/// Is checkpoint completed
/// </summary>
/// <returns></returns>
public async ValueTask IsCheckpointCompletedAsync()
{
await checkpointSemaphore.WaitAsync();
checkpointSemaphore.Release();
}

internal void BeginCheckpoint(IDevice device, ulong offset, out ulong numBytesWritten)
internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong numBytesWritten)
{
int localCount = count;
int recordsCountInLastLevel = localCount & PageSizeMask;
int numCompleteLevels = localCount >> PageSizeBits;
int numLevels = numCompleteLevels + (recordsCountInLastLevel > 0 ? 1 : 0);
checkpointEvent = new CountdownEvent(numLevels);

checkpointSemaphore = new SemaphoreSlim(0);
uint alignedPageSize = PageSize * (uint)RecordSize;
uint lastLevelSize = (uint)recordsCountInLastLevel * (uint)RecordSize;

Expand All @@ -470,7 +481,7 @@ internal void BeginCheckpoint(IDevice device, ulong offset, out ulong numBytesWr
}
}

private void AsyncFlushCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap)
private unsafe void AsyncFlushCallback(uint errorCode, uint numBytes, NativeOverlapped* overlap)
{
try
{
Expand All @@ -486,6 +497,8 @@ private void AsyncFlushCallback(uint errorCode, uint numBytes, NativeOverlapped*
finally
{
checkpointEvent.Signal();
if (checkpointEvent.CurrentCount == 0)
checkpointSemaphore.Release();
Overlapped.Free(overlap);
}
}
Expand Down Expand Up @@ -543,7 +556,7 @@ public bool IsRecoveryCompleted(bool waitUntilComplete = false)
// Implementation of asynchronous recovery
private int numLevelsToBeRecovered;

internal void BeginRecovery(IDevice device,
internal unsafe void BeginRecovery(IDevice device,
ulong offset,
int buckets,
ulong numBytesToRead,
Expand Down Expand Up @@ -575,7 +588,7 @@ internal void BeginRecovery(IDevice device,
}
}

private void AsyncPageReadCallback(
private unsafe void AsyncPageReadCallback(
uint errorCode,
uint numBytes,
NativeOverlapped* overlap)
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/Common/Contexts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ internal struct HybridLogCheckpointInfo
public IDevice snapshotFileDevice;
public IDevice snapshotFileObjectLogDevice;
public CountdownEvent flushed;
public SemaphoreSlim flushedSemaphore;
public long started;

public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager)
Expand Down
21 changes: 21 additions & 0 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,27 @@ public bool CompleteCheckpoint(bool wait = false)
return false;
}

/// <summary>
/// Complete the ongoing checkpoint (if any)
/// </summary>
/// <returns></returns>
public async ValueTask CompleteCheckpointAsync()
{
// Thread has an active session.
// So we need to constantly complete pending
// and refresh (done inside CompletePending)
// for the checkpoint to be proceed
do
{
await CompletePendingAsync();
if (_systemState.phase == Phase.REST)
{
await CompletePendingAsync();
return;
}
} while (true);
}

/// <summary>
/// Read
/// </summary>
Expand Down
26 changes: 24 additions & 2 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,28 @@ internal void InternalRefresh()
HandleCheckpointingPhases();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal async ValueTask InternalRefreshAsync()
{
epoch.ProtectAndDrain();

// We check if we are in normal mode
var newPhaseInfo = SystemState.Copy(ref _systemState);
if (threadCtx.Value.phase == Phase.REST && newPhaseInfo.phase == Phase.REST && threadCtx.Value.version == newPhaseInfo.version)
{
return;
}

// Moving to non-checkpointing phases
if (newPhaseInfo.phase == Phase.GC || newPhaseInfo.phase == Phase.PREPARE_GROW || newPhaseInfo.phase == Phase.IN_PROGRESS_GROW)
{
threadCtx.Value.phase = newPhaseInfo.phase;
return;
}

await HandleCheckpointingPhasesAsync();
}

internal void InternalRelease()
{
Debug.Assert(threadCtx.Value.retryRequests.Count == 0 &&
Expand Down Expand Up @@ -211,7 +233,7 @@ internal async ValueTask InternalCompletePendingAsync()
await CompleteIOPendingRequestsAsync(prevThreadCtx.Value);
Debug.Assert(prevThreadCtx.Value.ioPendingRequests.Count == 0);

InternalRefresh();
await InternalRefreshAsync();
CompleteRetryRequests(prevThreadCtx.Value);

done &= (prevThreadCtx.Value.ioPendingRequests.Count == 0);
Expand All @@ -226,7 +248,7 @@ internal async ValueTask InternalCompletePendingAsync()
await CompleteIOPendingRequestsAsync(threadCtx.Value);
Debug.Assert(threadCtx.Value.ioPendingRequests.Count == 0);
}
InternalRefresh();
await InternalRefreshAsync();
CompleteRetryRequests(threadCtx.Value);

done &= (threadCtx.Value.ioPendingRequests.Count == 0);
Expand Down
Loading

0 comments on commit 0bd752e

Please sign in to comment.