Skip to content

Commit

Permalink
Background deletes, FastCommit throttle metadata writes (#802)
Browse files Browse the repository at this point in the history
* Try work queue for deletes

* run truncate in its own task

* add fastcommit throttling

* nit
  • Loading branch information
badrishc authored Mar 11, 2023
1 parent 519533b commit 3d09097
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 14 deletions.
10 changes: 5 additions & 5 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,10 @@ private protected void VerifyCompatibleSectorSize(IDevice device)
/// <param name="version"></param>
/// <param name="deltaLog"></param>
/// <param name="completedSemaphore"></param>
/// <param name="throttleCheckpointFlush"></param>
/// <param name="throttleCheckpointFlushDelayMs"></param>
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async delta log flush with throtting {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");
logger?.LogTrace("Starting async delta log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;
Expand Down Expand Up @@ -1297,7 +1297,7 @@ public void ShiftBeginAddress(long newBeginAddress, bool truncateLog)
/// <param name="toAddress"></param>
protected virtual void TruncateUntilAddress(long toAddress)
{
device.TruncateUntilAddress(toAddress);
Task.Run(() => device.TruncateUntilAddress(toAddress));
}

internal virtual bool TryComplete()
Expand Down Expand Up @@ -1866,10 +1866,10 @@ public void AsyncFlushPages<TContext>(long flushPageStart, int numPages, DeviceI
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
/// <param name="completedSemaphore"></param>
/// <param name="throttleCheckpointFlush"></param>
/// <param name="throttleCheckpointFlushDelayMs"></param>
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, long fuzzyStartLogicalAddress, IDevice device, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async delta log flush with throtting {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");
logger?.LogTrace("Starting async delta log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;
Expand Down
100 changes: 100 additions & 0 deletions cs/src/core/Allocator/WorkQueueFIFO.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace FASTER.core
{
/// <summary>
/// Shared work queue that ensures one worker at any given time. Uses FIFO ordering of work.
/// </summary>
/// <typeparam name="T"></typeparam>
class WorkQueueFIFO<T> : IDisposable
{
const int kMaxQueueSize = 1 << 30;
readonly ConcurrentQueue<T> _queue;
readonly Action<T> _work;
private int _count;
private bool _disposed;

public WorkQueueFIFO(Action<T> work)
{
_queue = new ConcurrentQueue<T>();
_work = work;
_count = 0;
_disposed = false;
}

public void Dispose()
{
_disposed = true;
// All future enqueue requests will no longer perform work after _disposed is set to true.
while (_count != 0)
Thread.Yield();
// After this point, any previous work must have completed. Even if another enqueue request manipulates the
// count field, they are guaranteed to see disposed and not enqueue any actual work.
}

/// <summary>
/// Enqueue work item, take ownership of draining the work queue
/// if needed
/// </summary>
/// <param name="work">Work to enqueue</param>
/// <param name="asTask">Process work as separate task</param>
/// <returns> whether the enqueue is successful. Enqueuing into a disposed WorkQueue will fail and the task will not be performed</returns>>
public bool EnqueueAndTryWork(T work, bool asTask)
{
Interlocked.Increment(ref _count);
if (_disposed)
{
// Remove self from count in case Dispose() is actively waiting for completion
Interlocked.Decrement(ref _count);
return false;
}

_queue.Enqueue(work);

// Try to take over work queue processing if needed
while (true)
{
int count = _count;
if (count >= kMaxQueueSize) return true;
if (Interlocked.CompareExchange(ref _count, count + kMaxQueueSize, count) == count)
break;
}

if (asTask)
_ = Task.Run(() => ProcessQueue());
else
ProcessQueue();
return true;
}

private void ProcessQueue()
{
// Process items in work queue
while (true)
{
while (_queue.TryDequeue(out var workItem))
{
try
{
_work(workItem);
}
catch { }
Interlocked.Decrement(ref _count);
}

int count = _count;
if (count != kMaxQueueSize) continue;
if (Interlocked.CompareExchange(ref _count, 0, count) == count)
break;
}
}
}
}
17 changes: 10 additions & 7 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1844,14 +1844,17 @@ private void WriteCommitMetadata(FasterLogRecoveryInfo recoveryInfo)
if (!fastCommitMode)
UpdateCommittedState(recoveryInfo);
// Issue any potential physical deletes due to shifts in begin address
try
{
epoch.Resume();
allocator.ShiftBeginAddress(recoveryInfo.BeginAddress, true);
}
finally
if (allocator.BeginAddress < recoveryInfo.BeginAddress)
{
epoch.Suspend();
try
{
epoch.Resume();
allocator.ShiftBeginAddress(recoveryInfo.BeginAddress, true);
}
finally
{
epoch.Suspend();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,32 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
private byte indexTokenHistoryOffset, logTokenHistoryOffset, flogCommitHistoryOffset;

readonly ILogger logger;
readonly WorkQueueFIFO<long> deleteQueue;
readonly int fastCommitThrottleFreq;
int commitCount;

/// <summary>
/// Create new instance of log commit manager
/// </summary>
/// <param name="deviceFactory">Factory for getting devices</param>
/// <param name="checkpointNamingScheme">Checkpoint naming helper</param>
/// <param name="removeOutdated">Remote older FASTER log commits</param>
/// <param name="fastCommitThrottleFreq">FastCommit throttle frequency - use only in FastCommit mode</param>
/// <param name="logger">Remote older FASTER log commits</param>
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, ILogger logger = null)
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, int fastCommitThrottleFreq = 0, ILogger logger = null)
{
this.logger = logger;
this.deviceFactory = deviceFactory;
this.checkpointNamingScheme = checkpointNamingScheme;
this.fastCommitThrottleFreq = fastCommitThrottleFreq;

this.semaphore = new SemaphoreSlim(0);

this.removeOutdated = removeOutdated;
if (removeOutdated)
{
deleteQueue = new WorkQueueFIFO<long>(prior => deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(prior)));

// We keep two index checkpoints as the latest index might not have a
// later log checkpoint to work with
indexTokenHistory = new Guid[indexTokenCount];
Expand Down Expand Up @@ -104,6 +111,8 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, strin
/// <inheritdoc />
public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum)
{
if (fastCommitThrottleFreq > 0 && (commitCount++ % fastCommitThrottleFreq != 0)) return;

using var device = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));

// Two phase to ensure we write metadata in single Write operation
Expand All @@ -120,7 +129,10 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet
flogCommitHistory[flogCommitHistoryOffset] = commitNum;
flogCommitHistoryOffset = (byte)((flogCommitHistoryOffset + 1) % flogCommitCount);
if (prior != default)
deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(prior));
{
// System.Threading.Tasks.Task.Run(() => deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(prior)));
deleteQueue.EnqueueAndTryWork(prior, true);
}
}
}

Expand Down

0 comments on commit 3d09097

Please sign in to comment.