Skip to content

Commit

Permalink
[C#] Fix OnPagesClosed partial-page handling (#712)
Browse files Browse the repository at this point in the history
* Wait to free a partial page until all prior portions of the page have been closed

* Add CompletePendingAsync and CompletePendingWithOutputsAsync to IFasterContext and (Lockable)UnsafeContext

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
TedHartMS and badrishc authored Jun 11, 2022
1 parent 417381c commit 854b5d1
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 74 deletions.
22 changes: 10 additions & 12 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,23 +1303,21 @@ private void OnPagesClosed(long newSafeHeadAddress)
int closePage = (int)(closePageAddress >> LogPageSizeBits);
int closePageIndex = closePage % BufferSize;

// Do not free or clear partial page
// Future work: clear partial page
// If the end of the closing range is at the end of the page, free the page
if (end == closePageAddress + PageSize)
{
FreePage(closePage);
}

// If start of closing range is not at page beginning,
// spin-wait until adjacent earlier range is closed
if ((start & PageSizeMask) > 0)
{
while (ClosedUntilAddress < start)
// If the start of the closing range is not at the beginning of this page, spin-wait until the adjacent earlier ranges on this page are closed
if ((start & PageSizeMask) > 0)
{
epoch.ProtectAndDrain();
Thread.Yield();
while (ClosedUntilAddress < start)
{
epoch.ProtectAndDrain();
Thread.Yield();
}
}
FreePage(closePage);
}

Utility.MonotonicUpdate(ref PageStatusIndicator[closePageIndex].LastClosedUntilAddress, end, out _);
ShiftClosedUntilAddress();
if (ClosedUntilAddress > FlushedUntilAddress)
Expand Down
12 changes: 2 additions & 10 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -567,19 +567,11 @@ internal bool UnsafeCompletePending<FasterSession>(FasterSession fasterSession,
return result;
}

/// <summary>
/// Complete all pending synchronous FASTER operations.
/// Async operations must be completed individually.
/// </summary>
/// <returns></returns>
/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> CompletePendingAsync(false, waitForCommit, token);

/// <summary>
/// Complete all pending synchronous FASTER operations, returning outputs for the completed operations.
/// Async operations must be completed individually.
/// </summary>
/// <returns>Outputs completed by this operation</returns>
/// <inheritdoc/>
public async ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
{
InitializeCompletedOutputs();
Expand Down
14 changes: 14 additions & 0 deletions cs/src/core/ClientSession/IFasterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ public interface IFasterContext<Key, Value, Input, Output, Context>
/// <returns>True if all pending operations have completed, false otherwise</returns>
bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false);

/// <summary>
/// Complete all pending synchronous FASTER operations.
/// Async operations must be completed individually.
/// </summary>
/// <returns></returns>
ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default);

/// <summary>
/// Complete all pending synchronous FASTER operations, returning outputs for the completed operations.
/// Async operations must be completed individually.
/// </summary>
/// <returns>Outputs completed by this operation</returns>
ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default);

/// <summary>
/// Read operation
/// </summary>
Expand Down
35 changes: 21 additions & 14 deletions cs/src/core/ClientSession/LockableUnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,6 @@ public void SuspendThread()
/// </summary>
public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch;

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
}

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit);
}

#region Acquire and Dispose
internal void Acquire()
{
Expand Down Expand Up @@ -206,6 +192,27 @@ public void Unlock(ref Key key, LockType lockType)

#region IFasterContext

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
}

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit);
}

/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> this.clientSession.CompletePendingAsync(waitForCommit, token);

/// <inheritdoc/>
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
=> this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token);
/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
Expand Down
36 changes: 22 additions & 14 deletions cs/src/core/ClientSession/UnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,6 @@ public void SuspendThread()
/// </summary>
public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch;

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
}

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit);
}

#region Acquire and Dispose
internal void Acquire()
{
Expand All @@ -103,6 +89,28 @@ public void Dispose()

#region IFasterContext

/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
}

/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit);
}

/// <inheritdoc/>
public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default)
=> this.clientSession.CompletePendingAsync(waitForCommit, token);

/// <inheritdoc/>
public ValueTask<CompletedOutputIterator<Key, Value, Input, Output, Context>> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default)
=> this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token);

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
Expand Down
114 changes: 114 additions & 0 deletions cs/test/LockableUnsafeContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using FASTER.test.ReadCacheTests;
using System.Threading.Tasks;
using static FASTER.test.TestUtils;
using System.Diagnostics;

namespace FASTER.test.LockableUnsafeContext
{
Expand Down Expand Up @@ -179,6 +180,119 @@ void EnsureNoLocks()
Assert.Greater(count, numRecords - 10);
}

[Test]
[Category("FasterKV")]
[Category("Smoke")]
public async Task TestShiftHeadAddress([Values] SyncMode syncMode)
{
int input = default;
const int RandSeed = 10;
const int RandRange = numRecords;
const int NumRecs = 200;

Random r = new(RandSeed);
var sw = Stopwatch.StartNew();

// Copied from UnsafeContextTests to test Async.
using var luContext = session.GetLockableUnsafeContext();
luContext.ResumeThread(out var epoch);

try
{
for (int c = 0; c < NumRecs; c++)
{
var key1 = r.Next(RandRange);
var value = key1 + numRecords;
if (syncMode == SyncMode.Sync)
{
luContext.Upsert(ref key1, ref value, Empty.Default, 0);
}
else
{
luContext.SuspendThread();
var status = (await luContext.UpsertAsync(ref key1, ref value)).Complete();
luContext.ResumeThread();
Assert.IsFalse(status.IsPending);
}
}

r = new Random(RandSeed);
sw.Restart();

for (int c = 0; c < NumRecs; c++)
{
var key1 = r.Next(RandRange);
var value = key1 + numRecords;
int output = 0;

Status status;
if (syncMode == SyncMode.Sync || (c % 1 == 0)) // in .Async mode, half the ops should be sync to test CompletePendingAsync
{
status = luContext.Read(ref key1, ref input, ref output, Empty.Default, 0);
}
else
{
luContext.SuspendThread();
(status, output) = (await luContext.ReadAsync(ref key1, ref input)).Complete();
luContext.ResumeThread();
}
if (!status.IsPending)
{
Assert.AreEqual(value, output);
}
}
if (syncMode == SyncMode.Sync)
{
luContext.CompletePending(true);
}
else
{
luContext.SuspendThread();
await luContext.CompletePendingAsync();
luContext.ResumeThread();
}

// Shift head and retry - should not find in main memory now
fht.Log.FlushAndEvict(true);

r = new Random(RandSeed);
sw.Restart();

for (int c = 0; c < NumRecs; c++)
{
var key1 = r.Next(RandRange);
int output = 0;
Status foundStatus = luContext.Read(ref key1, ref input, ref output, Empty.Default, 0);
Assert.IsTrue(foundStatus.IsPending);
}

CompletedOutputIterator<int, int, int, int, Empty> outputs;
if (syncMode == SyncMode.Sync)
{
luContext.CompletePendingWithOutputs(out outputs, wait: true);
}
else
{
luContext.SuspendThread();
outputs = await luContext.CompletePendingWithOutputsAsync();
luContext.ResumeThread();
}

int count = 0;
while (outputs.Next())
{
count++;
Assert.AreEqual(outputs.Current.Key + numRecords, outputs.Current.Output);
}
outputs.Dispose();
Assert.AreEqual(NumRecs, count);
}
finally
{
luContext.SuspendThread();
}
}

[Test]
[Category(LockableUnsafeContextTestCategory)]
[Category(SmokeTestCategory)]
Expand Down
Loading

0 comments on commit 854b5d1

Please sign in to comment.