From f0bf05e5d0dc209cf279543b2cbb28ccbac52f59 Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Sun, 7 Nov 2021 15:29:15 +0000 Subject: [PATCH 1/2] Ensure that full flush list error callbacks do not break FlushedUntilAddress tracking --- cs/src/core/Allocator/AllocatorBase.cs | 3 ++ cs/src/core/Epochs/LightEpoch.cs | 2 +- cs/test/LogShiftTailStressTest.cs | 64 ++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 cs/test/LogShiftTailStressTest.cs diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 37f745f50..f2b5e3e7a 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -1761,6 +1761,9 @@ public void AsyncFlushPages(long fromAddress, long untilAddress) } else { + // Because we are invoking the callback away from the usual codepath, need to externally + // ensure that flush addresses are updated in order + while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield(); // Could not add to pending flush list, treat as a failed write AsyncFlushPageCallback(1, 0, asyncResult); } diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index d856a0530..e2773a8a5 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -345,8 +345,8 @@ public void BumpCurrentEpoch(Action onDrain) i = 0; if (++j == 500) { + // Spin until there is a free entry in the drain list j = 0; - Debug.WriteLine("Delay finding a free entry in the drain list"); } } } diff --git a/cs/test/LogShiftTailStressTest.cs b/cs/test/LogShiftTailStressTest.cs new file mode 100644 index 000000000..d5009600a --- /dev/null +++ b/cs/test/LogShiftTailStressTest.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using FASTER.core; +using NUnit.Framework; + +namespace FASTER.test +{ + [TestFixture] + internal class LogShiftTailStressTest : FasterLogTestBase + { + [SetUp] + public void
Setup() => base.BaseSetup(); + + [TearDown] + public void TearDown() => base.BaseTearDown(); + + [Test] + [Category("FasterLog")] + public void FasterLogShiftTailStressTest() + { + // Get an excruciatingly slow storage device to maximize chance of clogging the flush pipeline + device = new LocalMemoryDevice(1L << 32, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log"); + var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager}; + log = new FasterLog(logSettings); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + for (int i = 0; i < 5 * numEntries; i++) + log.Enqueue(entry); + + // for comparison, insert some entries without any commit records + var referenceTailLength = log.TailAddress; + + var enqueueDone = new ManualResetEventSlim(); + var commitThreads = new List<Thread>(); + // Make sure to spin up many commit threads to expose lots of interleavings + for (var i = 0; i < 2 * Math.Max(1, Environment.ProcessorCount - 1); i++) + { + commitThreads.Add(new Thread(() => + { + // Otherwise, absolutely clog the commit pipeline + while (!enqueueDone.IsSet) + log.Commit(); + })); + } + + foreach (var t in commitThreads) + t.Start(); + for (int i = 0; i < 5 * numEntries; i++) + { + log.Enqueue(entry); + } + enqueueDone.Set(); + + foreach (var t in commitThreads) + t.Join(); + + // We expect the test to finish and not get stuck somewhere + } + } +} \ No newline at end of file From ebafb5cfea5872bb7768656530760ed2a1b5f863 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 10 Nov 2021 19:19:05 -0800 Subject: [PATCH 2/2] nit --- cs/src/core/Epochs/LightEpoch.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index e2773a8a5..0b5e07c22 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -309,7 +309,7 @@
public void BumpCurrentEpoch(Action onDrain) { int PriorEpoch = BumpCurrentEpoch() - 1; - int i = 0, j = 0; + int i = 0; while (true) { if (drainList[i].epoch == int.MaxValue) @@ -343,11 +343,7 @@ public void BumpCurrentEpoch(Action onDrain) { ProtectAndDrain(); i = 0; - if (++j == 500) - { - // Spin until there is a free entry in the drain list - j = 0; - } + Thread.Yield(); } }