diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index 37f745f50..f2b5e3e7a 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -1761,6 +1761,9 @@ public void AsyncFlushPages(long fromAddress, long untilAddress)
                 }
                 else
                 {
+                    // Because we are invoking the callback away from the usual codepath, need to externally
+                    // ensure that flush addresses are updated in order
+                    while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield();
                     // Could not add to pending flush list, treat as a failed write
                     AsyncFlushPageCallback(1, 0, asyncResult);
                 }
diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs
index d856a0530..0b5e07c22 100644
--- a/cs/src/core/Epochs/LightEpoch.cs
+++ b/cs/src/core/Epochs/LightEpoch.cs
@@ -309,7 +309,7 @@ public void BumpCurrentEpoch(Action onDrain)
         {
             int PriorEpoch = BumpCurrentEpoch() - 1;
 
-            int i = 0, j = 0;
+            int i = 0;
             while (true)
             {
                 if (drainList[i].epoch == int.MaxValue)
@@ -343,11 +343,7 @@ public void BumpCurrentEpoch(Action onDrain)
                 {
                     ProtectAndDrain();
                     i = 0;
-                    if (++j == 500)
-                    {
-                        j = 0;
-                        Debug.WriteLine("Delay finding a free entry in the drain list");
-                    }
+                    Thread.Yield();
                 }
             }
 
diff --git a/cs/test/LogShiftTailStressTest.cs b/cs/test/LogShiftTailStressTest.cs
new file mode 100644
index 000000000..d5009600a
--- /dev/null
+++ b/cs/test/LogShiftTailStressTest.cs
@@ -0,0 +1,68 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using FASTER.core;
+using NUnit.Framework;
+
+namespace FASTER.test
+{
+    /// <summary>
+    /// Stress test that enqueues entries into a FasterLog on a deliberately slow device while
+    /// many threads call Commit concurrently. It passes if it runs to completion without hanging.
+    /// </summary>
+    [TestFixture]
+    internal class LogShiftTailStressTest : FasterLogTestBase
+    {
+        [SetUp]
+        public void Setup() => base.BaseSetup();
+
+        [TearDown]
+        public void TearDown() => base.BaseTearDown();
+
+        [Test]
+        [Category("FasterLog")]
+        public void FasterLogShiftTailStressTest()
+        {
+            // Get an excruciatingly slow storage device to maximize chance of clogging the flush pipeline
+            device = new LocalMemoryDevice(1L << 32, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log");
+            var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager};
+            log = new FasterLog(logSettings);
+
+            byte[] entry = new byte[entryLength];
+            for (int i = 0; i < entryLength; i++)
+                entry[i] = (byte)i;
+
+            for (int i = 0; i < 5 * numEntries; i++)
+                log.Enqueue(entry);
+
+            // for comparison, insert some entries without any commit records
+            var referenceTailLength = log.TailAddress;
+
+            var enqueueDone = new ManualResetEventSlim();
+            var commitThreads = new List<Thread>();
+            // Make sure to spin up many commit threads to expose lots of interleavings
+            for (var i = 0; i < 2 * Math.Max(1, Environment.ProcessorCount - 1); i++)
+            {
+                commitThreads.Add(new Thread(() =>
+                {
+                    // Keep committing until enqueueing is done, to absolutely clog the commit pipeline
+                    while (!enqueueDone.IsSet)
+                        log.Commit();
+                }));
+            }
+
+            foreach (var t in commitThreads)
+                t.Start();
+            for (int i = 0; i < 5 * numEntries; i++)
+            {
+                log.Enqueue(entry);
+            }
+            enqueueDone.Set();
+
+            foreach (var t in commitThreads)
+                t.Join();
+
+            // We expect the test to finish and not get stuck somewhere
+        }
+    }
+}
\ No newline at end of file