diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 5b4b8c80a..759bec264 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -1303,23 +1303,21 @@ private void OnPagesClosed(long newSafeHeadAddress) int closePage = (int)(closePageAddress >> LogPageSizeBits); int closePageIndex = closePage % BufferSize; - // Do not free or clear partial page - // Future work: clear partial page + // If the end of the closing range is at the end of the page, free the page if (end == closePageAddress + PageSize) { - FreePage(closePage); - } - - // If start of closing range is not at page beginning, - // spin-wait until adjacent earlier range is closed - if ((start & PageSizeMask) > 0) - { - while (ClosedUntilAddress < start) + // If the start of the closing range is not at the beginning of this page, spin-wait until the adjacent earlier ranges on this page are closed + if ((start & PageSizeMask) > 0) { - epoch.ProtectAndDrain(); - Thread.Yield(); + while (ClosedUntilAddress < start) + { + epoch.ProtectAndDrain(); + Thread.Yield(); + } } + FreePage(closePage); } + Utility.MonotonicUpdate(ref PageStatusIndicator[closePageIndex].LastClosedUntilAddress, end, out _); ShiftClosedUntilAddress(); if (ClosedUntilAddress > FlushedUntilAddress) diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index afbe8fe16..1cb052268 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -567,19 +567,11 @@ internal bool UnsafeCompletePending(FasterSession fasterSession, return result; } - /// - /// Complete all pending synchronous FASTER operations. - /// Async operations must be completed individually. - /// - /// + /// public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) => CompletePendingAsync(false, waitForCommit, token); - /// - /// Complete all pending synchronous FASTER operations, returning outputs for the completed operations. - /// Async operations must be completed individually. - /// - /// Outputs completed by this operation + /// public async ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) { InitializeCompletedOutputs(); diff --git a/cs/src/core/ClientSession/IFasterContext.cs b/cs/src/core/ClientSession/IFasterContext.cs index a7033a097..6dc2c16ba 100644 --- a/cs/src/core/ClientSession/IFasterContext.cs +++ b/cs/src/core/ClientSession/IFasterContext.cs @@ -30,6 +30,20 @@ public interface IFasterContext /// True if all pending operations have completed, false otherwise bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false); + /// + /// Complete all pending synchronous FASTER operations. + /// Async operations must be completed individually. + /// + /// + ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default); + + /// + /// Complete all pending synchronous FASTER operations, returning outputs for the completed operations. + /// Async operations must be completed individually. + /// + /// Outputs completed by this operation + ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default); + /// /// Read operation /// diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index 211b197e5..386665c0e 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -72,20 +72,6 @@ public void SuspendThread() /// public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch; - /// - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) - { - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); - } - - /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - { - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); - } - #region Acquire and Dispose internal void Acquire() { @@ -206,6 +192,27 @@ public void Unlock(ref Key key, LockType lockType) #region IFasterContext + /// + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) + { + Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); + return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); + } + + /// + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + { + Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); + return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); + } + + /// + public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) + => this.clientSession.CompletePendingAsync(waitForCommit, token); + + /// + public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) + => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token); /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) diff --git a/cs/src/core/ClientSession/UnsafeContext.cs b/cs/src/core/ClientSession/UnsafeContext.cs index 1e9ca4ff3..76e035573 100644 --- a/cs/src/core/ClientSession/UnsafeContext.cs +++ b/cs/src/core/ClientSession/UnsafeContext.cs @@ -68,20 +68,6 @@ public void SuspendThread() /// public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch; - /// - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) - { - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); - } - - /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - { - Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); - } - #region Acquire and Dispose internal void Acquire() { @@ -103,6 +89,28 @@ public void Dispose() #region IFasterContext + /// + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) + { + Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); + return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); + } + + /// + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + { + Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); + return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); + } + + /// + public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) + => this.clientSession.CompletePendingAsync(waitForCommit, token); + + /// + public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) + => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token); + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) diff --git a/cs/test/LockableUnsafeContextTests.cs b/cs/test/LockableUnsafeContextTests.cs index 8d1093fea..e16b0bdeb 100644 --- a/cs/test/LockableUnsafeContextTests.cs +++ b/cs/test/LockableUnsafeContextTests.cs @@ -11,6 +11,7 @@ using FASTER.test.ReadCacheTests; using System.Threading.Tasks; using static FASTER.test.TestUtils; +using System.Diagnostics; namespace FASTER.test.LockableUnsafeContext { @@ -179,6 +180,119 @@ void EnsureNoLocks() Assert.Greater(count, numRecords - 10); } + [Test] + [Category("FasterKV")] + [Category("Smoke")] + public async Task TestShiftHeadAddress([Values] SyncMode syncMode) + { + int input = default; + const int RandSeed = 10; + const int RandRange = numRecords; + const int NumRecs = 200; + + Random r = new(RandSeed); + var sw = Stopwatch.StartNew(); + + // Copied from UnsafeContextTests to test Async. + using var luContext = session.GetLockableUnsafeContext(); + luContext.ResumeThread(out var epoch); + + try + { + for (int c = 0; c < NumRecs; c++) + { + var key1 = r.Next(RandRange); + var value = key1 + numRecords; + if (syncMode == SyncMode.Sync) + { + luContext.Upsert(ref key1, ref value, Empty.Default, 0); + } + else + { + luContext.SuspendThread(); + var status = (await luContext.UpsertAsync(ref key1, ref value)).Complete(); + luContext.ResumeThread(); + Assert.IsFalse(status.IsPending); + } + } + + r = new Random(RandSeed); + sw.Restart(); + + for (int c = 0; c < NumRecs; c++) + { + var key1 = r.Next(RandRange); + var value = key1 + numRecords; + int output = 0; + + Status status; + if (syncMode == SyncMode.Sync || (c % 1 == 0)) // in .Async mode, half the ops should be sync to test CompletePendingAsync + { + status = luContext.Read(ref key1, ref input, ref output, Empty.Default, 0); + } + else + { + luContext.SuspendThread(); + (status, output) = (await luContext.ReadAsync(ref key1, ref input)).Complete(); + luContext.ResumeThread(); + } + if (!status.IsPending) + { + Assert.AreEqual(value, output); + } + } + if (syncMode == SyncMode.Sync) + { + luContext.CompletePending(true); + } + else + { + luContext.SuspendThread(); + await luContext.CompletePendingAsync(); + luContext.ResumeThread(); + } + + // Shift head and retry - should not find in main memory now + fht.Log.FlushAndEvict(true); + + r = new Random(RandSeed); + sw.Restart(); + + for (int c = 0; c < NumRecs; c++) + { + var key1 = r.Next(RandRange); + int output = 0; + Status foundStatus = luContext.Read(ref key1, ref input, ref output, Empty.Default, 0); + Assert.IsTrue(foundStatus.IsPending); + } + + CompletedOutputIterator outputs; + if (syncMode == SyncMode.Sync) + { + luContext.CompletePendingWithOutputs(out outputs, wait: true); + } + else + { + luContext.SuspendThread(); + outputs = await luContext.CompletePendingWithOutputsAsync(); + luContext.ResumeThread(); + } + + int count = 0; + while (outputs.Next()) + { + count++; + Assert.AreEqual(outputs.Current.Key + numRecords, outputs.Current.Output); + } + outputs.Dispose(); + Assert.AreEqual(NumRecs, count); + } + finally + { + luContext.SuspendThread(); + } + } + [Test] [Category(LockableUnsafeContextTestCategory)] [Category(SmokeTestCategory)] diff --git a/cs/test/UnsafeContextTests.cs b/cs/test/UnsafeContextTests.cs index 6313d0c94..6aa62e398 100644 --- a/cs/test/UnsafeContextTests.cs +++ b/cs/test/UnsafeContextTests.cs @@ -4,8 +4,10 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading.Tasks; using FASTER.core; using NUnit.Framework; +using static FASTER.test.TestUtils; namespace FASTER.test.UnsafeContext { @@ -19,21 +21,21 @@ internal class BasicUnsafeContextTests private UnsafeContext uContext; private IDevice log; private string path; - TestUtils.DeviceType deviceType; + DeviceType deviceType; [SetUp] public void Setup() { - path = TestUtils.MethodTestDir + "/"; + path = MethodTestDir + "/"; // Clean up log files from previous test runs in case they weren't cleaned up - TestUtils.DeleteDirectory(path, wait: true); + DeleteDirectory(path, wait: true); } - private void Setup(long size, LogSettings logSettings, TestUtils.DeviceType deviceType) + private void Setup(long size, LogSettings logSettings, DeviceType deviceType) { string filename = path + TestContext.CurrentContext.Test.Name + deviceType.ToString() + ".log"; - log = TestUtils.CreateTestDevice(deviceType, filename); + log = CreateTestDevice(deviceType, filename); logSettings.LogDevice = log; fht = new FasterKV(size, logSettings); fullSession = fht.For(new Functions()).NewSession(); @@ -51,7 +53,7 @@ public void TearDown() fht = null; log?.Dispose(); log = null; - TestUtils.DeleteDirectory(path); + DeleteDirectory(path); } private void AssertCompleted(Status expected, Status actual) @@ -64,13 +66,13 @@ private void AssertCompleted(Status expected, Status actual) private (Status status, OutputStruct output) CompletePendingResult() { uContext.CompletePendingWithOutputs(out var completedOutputs); - return TestUtils.GetSinglePendingResult(completedOutputs); + return GetSinglePendingResult(completedOutputs); } [Test] [Category("FasterKV")] [Category("Smoke")] - public void NativeInMemWriteRead([Values] TestUtils.DeviceType deviceType) + public void NativeInMemWriteRead([Values] DeviceType deviceType) { Setup(128, new LogSettings { PageSizeBits = 10, MemorySizeBits = 12, SegmentSizeBits = 22 }, deviceType); uContext.ResumeThread(); @@ -99,7 +101,7 @@ public void NativeInMemWriteRead([Values] TestUtils.DeviceType deviceType) [Test] [Category("FasterKV")] [Category("Smoke")] - public void NativeInMemWriteReadDelete([Values] TestUtils.DeviceType deviceType) + public void NativeInMemWriteReadDelete([Values] DeviceType deviceType) { Setup(128, new LogSettings { PageSizeBits = 10, MemorySizeBits = 12, SegmentSizeBits = 22 }, deviceType); uContext.ResumeThread(); @@ -144,7 +146,7 @@ public void NativeInMemWriteReadDelete([Values] TestUtils.DeviceType deviceType) public void NativeInMemWriteReadDelete2() { // Just set this one since Write Read Delete already does all four devices - deviceType = TestUtils.DeviceType.MLSD; + deviceType = DeviceType.MLSD; const int count = 10; @@ -201,7 +203,7 @@ public void NativeInMemWriteReadDelete2() public unsafe void NativeInMemWriteRead2() { // Just use this one instead of all four devices since InMemWriteRead covers all four devices - deviceType = TestUtils.DeviceType.MLSD; + deviceType = DeviceType.MLSD; int count = 200; @@ -261,7 +263,7 @@ public unsafe void NativeInMemWriteRead2() [Test] [Category("FasterKV")] [Category("Smoke")] - public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType) + public async Task TestShiftHeadAddress([Values] DeviceType deviceType, [Values] SyncMode syncMode) { InputStruct input = default; const int RandSeed = 10; @@ -281,7 +283,17 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType var i = r.Next(RandRange); var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; - uContext.Upsert(ref key1, ref value, Empty.Default, 0); + if (syncMode == SyncMode.Sync) + { + uContext.Upsert(ref key1, ref value, Empty.Default, 0); + } + else + { + uContext.SuspendThread(); + var status = (await uContext.UpsertAsync(ref key1, ref value)).Complete(); + uContext.ResumeThread(); + Assert.IsFalse(status.IsPending); + } } r = new Random(RandSeed); @@ -294,13 +306,33 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; - if (!uContext.Read(ref key1, ref input, ref output, Empty.Default, 0).IsPending) + Status status; + if (syncMode == SyncMode.Sync || (c % 1 == 0)) // in .Async mode, half the ops should be sync to test CompletePendingAsync + { + status = uContext.Read(ref key1, ref input, ref output, Empty.Default, 0); + } + else + { + uContext.SuspendThread(); + (status, output) = (await uContext.ReadAsync(ref key1, ref input)).Complete(); + uContext.ResumeThread(); + } + if (!status.IsPending) { Assert.AreEqual(value.vfield1, output.value.vfield1); Assert.AreEqual(value.vfield2, output.value.vfield2); } } - uContext.CompletePending(true); + if (syncMode == SyncMode.Sync) + { + uContext.CompletePending(true); + } + else + { + uContext.SuspendThread(); + await uContext.CompletePendingAsync(); + uContext.ResumeThread(); + } // Shift head and retry - should not find in main memory now fht.Log.FlushAndEvict(true); @@ -317,7 +349,18 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType Assert.IsTrue(foundStatus.IsPending); } - uContext.CompletePendingWithOutputs(out var outputs, wait: true); + CompletedOutputIterator outputs; + if (syncMode == SyncMode.Sync) + { + uContext.CompletePendingWithOutputs(out outputs, wait: true); + } + else + { + uContext.SuspendThread(); + outputs = await uContext.CompletePendingWithOutputsAsync(); + uContext.ResumeThread(); + } + int count = 0; while (outputs.Next()) { @@ -337,7 +380,7 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType [Test] [Category("FasterKV")] [Category("Smoke")] - public unsafe void NativeInMemRMWRefKeys([Values] TestUtils.DeviceType deviceType) + public unsafe void NativeInMemRMWRefKeys([Values] DeviceType deviceType) { InputStruct input = default; OutputStruct output = default; @@ -410,7 +453,7 @@ public unsafe void NativeInMemRMWRefKeys([Values] TestUtils.DeviceType deviceTyp // Tests the overload where no reference params used: key,input,userContext,serialNo [Test] [Category("FasterKV")] - public unsafe void NativeInMemRMWNoRefKeys([Values] TestUtils.DeviceType deviceType) + public unsafe void NativeInMemRMWNoRefKeys([Values] DeviceType deviceType) { InputStruct input = default; @@ -476,7 +519,7 @@ public unsafe void NativeInMemRMWNoRefKeys([Values] TestUtils.DeviceType deviceT [Test] [Category("FasterKV")] [Category("Smoke")] - public void ReadNoRefKeyInputOutput([Values] TestUtils.DeviceType deviceType) + public void ReadNoRefKeyInputOutput([Values] DeviceType deviceType) { InputStruct input = default; @@ -507,7 +550,7 @@ public void ReadNoRefKeyInputOutput([Values] TestUtils.DeviceType deviceType) // Test the overload call of .Read (key, out output, userContext, serialNo) [Test] [Category("FasterKV")] - public void ReadNoRefKey([Values] TestUtils.DeviceType deviceType) + public void ReadNoRefKey([Values] DeviceType deviceType) { Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); uContext.ResumeThread(); @@ -538,7 +581,7 @@ public void ReadNoRefKey([Values] TestUtils.DeviceType deviceType) [Test] [Category("FasterKV")] [Category("Smoke")] - public void ReadWithoutInput([Values] TestUtils.DeviceType deviceType) + public void ReadWithoutInput([Values] DeviceType deviceType) { Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); uContext.ResumeThread(); @@ -573,7 +616,7 @@ public void ReadWithoutInput([Values] TestUtils.DeviceType deviceType) public void ReadWithoutSerialID() { // Just checking without Serial ID so one device type is enough - deviceType = TestUtils.DeviceType.MLSD; + deviceType = DeviceType.MLSD; Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); uContext.ResumeThread(); @@ -605,7 +648,7 @@ public void ReadWithoutSerialID() [Test] [Category("FasterKV")] [Category("Smoke")] - public void ReadBareMinParams([Values] TestUtils.DeviceType deviceType) + public void ReadBareMinParams([Values] DeviceType deviceType) { Setup(128, new LogSettings { MemorySizeBits = 22, SegmentSizeBits = 22, PageSizeBits = 10 }, deviceType); uContext.ResumeThread(); @@ -638,7 +681,7 @@ public void ReadBareMinParams([Values] TestUtils.DeviceType deviceType) public void ReadAtAddressReadFlagsNone() { // Just functional test of ReadFlag so one device is enough - deviceType = TestUtils.DeviceType.MLSD; + deviceType = DeviceType.MLSD; Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); uContext.ResumeThread();