Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background deletes, FastCommit throttle metadata writes #802

Merged
merged 6 commits into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,10 @@ private protected void VerifyCompatibleSectorSize(IDevice device)
/// <param name="version"></param>
/// <param name="deltaLog"></param>
/// <param name="completedSemaphore"></param>
/// <param name="throttleCheckpointFlush"></param>
/// <param name="throttleCheckpointFlushDelayMs"></param>
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async delta log flush with throtting {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");
logger?.LogTrace("Starting async delta log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;
Expand Down Expand Up @@ -1297,7 +1297,7 @@ public void ShiftBeginAddress(long newBeginAddress, bool truncateLog)
/// <param name="toAddress"></param>
protected virtual void TruncateUntilAddress(long toAddress)
{
device.TruncateUntilAddress(toAddress);
Task.Run(() => device.TruncateUntilAddress(toAddress));
}

internal virtual bool TryComplete()
Expand Down Expand Up @@ -1866,10 +1866,10 @@ public void AsyncFlushPages<TContext>(long flushPageStart, int numPages, DeviceI
/// <param name="device"></param>
/// <param name="objectLogDevice"></param>
/// <param name="completedSemaphore"></param>
/// <param name="throttleCheckpointFlush"></param>
/// <param name="throttleCheckpointFlushDelayMs"></param>
public void AsyncFlushPagesToDevice(long startPage, long endPage, long endLogicalAddress, long fuzzyStartLogicalAddress, IDevice device, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async delta log flush with throtting {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");
logger?.LogTrace("Starting async delta log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;
Expand Down
100 changes: 100 additions & 0 deletions cs/src/core/Allocator/WorkQueueFIFO.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace FASTER.core
{
/// <summary>
/// Shared work queue that ensures one worker at any given time. Uses FIFO ordering of work.
/// </summary>
/// <remarks>
/// The <c>_count</c> field encodes two things at once: the number of items that have been
/// accepted but not yet processed, plus a bias of <c>kMaxQueueSize</c> that is present while
/// some thread owns the drain loop. A value of exactly <c>kMaxQueueSize</c> therefore means
/// "a worker is active and nothing is pending", and a value of 0 means "idle and empty".
/// </remarks>
/// <typeparam name="T">Type of the work items handed to the work callback</typeparam>
class WorkQueueFIFO<T> : IDisposable
{
    // Bias added to _count while a worker owns the queue; also the threshold used to detect an active worker.
    const int kMaxQueueSize = 1 << 30;
    readonly ConcurrentQueue<T> _queue;
    readonly Action<T> _work;
    private int _count;
    private bool _disposed;

    /// <summary>
    /// Create a new work queue
    /// </summary>
    /// <param name="work">Callback invoked once per enqueued item, by a single worker at a time</param>
    public WorkQueueFIFO(Action<T> work)
    {
        _queue = new ConcurrentQueue<T>();
        _work = work;
        _count = 0;
        _disposed = false;
    }

    /// <summary>
    /// Stop accepting new work and wait (spinning with <see cref="Thread.Yield"/>) until all
    /// previously accepted work has been processed. Because this spins until the pending count
    /// reaches zero, it must not be called from inside the work callback itself.
    /// </summary>
    public void Dispose()
    {
        Volatile.Write(ref _disposed, true);

        // Full fence: the store to _disposed must be globally visible before we start reading _count.
        // Without it the store and the following load may be reordered, letting us observe _count == 0
        // while a concurrent EnqueueAndTryWork (which increments _count and then reads _disposed)
        // still sees _disposed == false and goes on to run work after Dispose has returned.
        Interlocked.MemoryBarrier();

        // All future enqueue requests will no longer perform work after _disposed is set to true.
        while (Volatile.Read(ref _count) != 0)
            Thread.Yield();
        // After this point, any previous work must have completed. Even if another enqueue request manipulates the
        // count field, they are guaranteed to see disposed and not enqueue any actual work.
    }

    /// <summary>
    /// Enqueue work item, take ownership of draining the work queue
    /// if needed
    /// </summary>
    /// <param name="work">Work to enqueue</param>
    /// <param name="asTask">Process work as separate task</param>
    /// <returns>Whether the enqueue is successful. Enqueuing into a disposed WorkQueue will fail and the task will not be performed</returns>
    public bool EnqueueAndTryWork(T work, bool asTask)
    {
        // Register the item first (full fence), then check the flag: this ordering pairs with Dispose(),
        // which sets the flag first and then checks the count.
        Interlocked.Increment(ref _count);
        if (Volatile.Read(ref _disposed))
        {
            // Remove self from count in case Dispose() is actively waiting for completion
            Interlocked.Decrement(ref _count);
            return false;
        }

        _queue.Enqueue(work);

        // Try to take over work queue processing if needed
        while (true)
        {
            int count = Volatile.Read(ref _count);

            // Bias already present: another thread owns the drain loop and will pick up our item.
            if (count >= kMaxQueueSize) return true;

            // Add the bias atomically; the winner of this CAS becomes the single worker.
            if (Interlocked.CompareExchange(ref _count, count + kMaxQueueSize, count) == count)
                break;
        }

        if (asTask)
            _ = Task.Run(() => ProcessQueue());
        else
            ProcessQueue();
        return true;
    }

    /// <summary>
    /// Drain loop run by the single owning worker: process items in FIFO order until the queue is
    /// empty and no enqueue is in flight, then release ownership.
    /// </summary>
    private void ProcessQueue()
    {
        // Process items in work queue
        while (true)
        {
            while (_queue.TryDequeue(out var workItem))
            {
                try
                {
                    _work(workItem);
                }
                catch
                {
                    // Deliberately best-effort: a failing item must not abort the drain loop, otherwise
                    // _count would keep its bias forever and no other thread could ever take over.
                }
                Interlocked.Decrement(ref _count);
            }

            // Only the bias left means nothing is pending; otherwise an enqueuer has incremented
            // the count but its item is not visible in the queue yet, so look again.
            int count = Volatile.Read(ref _count);
            if (count != kMaxQueueSize) continue;

            // Release ownership; fails (and retries) if a new item was registered in the meantime.
            if (Interlocked.CompareExchange(ref _count, 0, count) == count)
                break;
        }
    }
}
}
17 changes: 10 additions & 7 deletions cs/src/core/FasterLog/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1844,14 +1844,17 @@ private void WriteCommitMetadata(FasterLogRecoveryInfo recoveryInfo)
if (!fastCommitMode)
UpdateCommittedState(recoveryInfo);
// Issue any potential physical deletes due to shifts in begin address
try
{
epoch.Resume();
allocator.ShiftBeginAddress(recoveryInfo.BeginAddress, true);
}
finally
if (allocator.BeginAddress < recoveryInfo.BeginAddress)
{
epoch.Suspend();
try
{
epoch.Resume();
allocator.ShiftBeginAddress(recoveryInfo.BeginAddress, true);
}
finally
{
epoch.Suspend();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,32 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
private byte indexTokenHistoryOffset, logTokenHistoryOffset, flogCommitHistoryOffset;

readonly ILogger logger;
readonly WorkQueueFIFO<long> deleteQueue;
readonly int fastCommitThrottleFreq;
int commitCount;

/// <summary>
/// Create new instance of log commit manager
/// </summary>
/// <param name="deviceFactory">Factory for getting devices</param>
/// <param name="checkpointNamingScheme">Checkpoint naming helper</param>
/// <param name="removeOutdated">Remove older FASTER log commits</param>
/// <param name="fastCommitThrottleFreq">FastCommit throttle frequency - use only in FastCommit mode</param>
/// <param name="logger">Logger used by the commit manager</param>
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, ILogger logger = null)
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool removeOutdated = true, int fastCommitThrottleFreq = 0, ILogger logger = null)
{
this.logger = logger;
this.deviceFactory = deviceFactory;
this.checkpointNamingScheme = checkpointNamingScheme;
this.fastCommitThrottleFreq = fastCommitThrottleFreq;

this.semaphore = new SemaphoreSlim(0);

this.removeOutdated = removeOutdated;
if (removeOutdated)
{
deleteQueue = new WorkQueueFIFO<long>(prior => deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(prior)));

// We keep two index checkpoints as the latest index might not have a
// later log checkpoint to work with
indexTokenHistory = new Guid[indexTokenCount];
Expand Down Expand Up @@ -104,6 +111,8 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, strin
/// <inheritdoc />
public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMetadata, long commitNum)
{
if (fastCommitThrottleFreq > 0 && (commitCount++ % fastCommitThrottleFreq != 0)) return;

using var device = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));

// Two phase to ensure we write metadata in single Write operation
Expand All @@ -120,7 +129,10 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet
flogCommitHistory[flogCommitHistoryOffset] = commitNum;
flogCommitHistoryOffset = (byte)((flogCommitHistoryOffset + 1) % flogCommitCount);
if (prior != default)
deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(prior));
{
// System.Threading.Tasks.Task.Run(() => deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(prior)));
deleteQueue.EnqueueAndTryWork(prior, true);
}
}
}

Expand Down