From 1c2a26d61dd5daa52401c2739631e7ba08530e38 Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Thu, 11 Nov 2021 11:53:48 -0500 Subject: [PATCH 1/2] [C#] Fix error behavior under frequent commit situations (#590) * Ensure that full flush list error callbacks do not break FlushedUntilAddress tracking * nit Co-authored-by: Badrish Chandramouli --- cs/src/core/Allocator/AllocatorBase.cs | 3 ++ cs/src/core/Epochs/LightEpoch.cs | 8 +--- cs/test/LogShiftTailStressTest.cs | 64 ++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 cs/test/LogShiftTailStressTest.cs diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 37f745f50..f2b5e3e7a 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -1761,6 +1761,9 @@ public void AsyncFlushPages(long fromAddress, long untilAddress) } else { + // Because we are invoking the callback away from the usual codepath, need to externally + // ensure that flush address are updated in order + while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield(); // Could not add to pending flush list, treat as a failed write AsyncFlushPageCallback(1, 0, asyncResult); } diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index d856a0530..0b5e07c22 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -309,7 +309,7 @@ public void BumpCurrentEpoch(Action onDrain) { int PriorEpoch = BumpCurrentEpoch() - 1; - int i = 0, j = 0; + int i = 0; while (true) { if (drainList[i].epoch == int.MaxValue) @@ -343,11 +343,7 @@ public void BumpCurrentEpoch(Action onDrain) { ProtectAndDrain(); i = 0; - if (++j == 500) - { - j = 0; - Debug.WriteLine("Delay finding a free entry in the drain list"); - } + Thread.Yield(); } } diff --git a/cs/test/LogShiftTailStressTest.cs b/cs/test/LogShiftTailStressTest.cs new file mode 100644 index 000000000..d5009600a --- /dev/null +++ b/cs/test/LogShiftTailStressTest.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using FASTER.core; +using NUnit.Framework; + +namespace FASTER.test +{ + [TestFixture] + internal class LogShiftTailStressTest : FasterLogTestBase + { + [SetUp] + public void Setup() => base.BaseSetup(); + + [TearDown] + public void TearDown() => base.BaseTearDown(); + + [Test] + [Category("FasterLog")] + public void FasterLogShiftTailStressTest() + { + // Get an excruciatingly slow storage device to maximize chance of clogging the flush pipeline + device = new LocalMemoryDevice(1L << 32, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log"); + var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager}; + log = new FasterLog(logSettings); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + for (int i = 0; i < 5 * numEntries; i++) + log.Enqueue(entry); + + // for comparison, insert some entries without any commit records + var referenceTailLength = log.TailAddress; + + var enqueueDone = new ManualResetEventSlim(); + var commitThreads = new List(); + // Make sure to spin up many commit threads to expose lots of interleavings + for (var i = 0; i < 2 * Math.Max(1, Environment.ProcessorCount - 1); i++) + { + commitThreads.Add(new Thread(() => + { + // Otherwise, absolutely clog the commit pipeline + while (!enqueueDone.IsSet) + log.Commit(); + })); + } + + foreach (var t in commitThreads) + t.Start(); + for (int i = 0; i < 5 * numEntries; i++) + { + log.Enqueue(entry); + } + enqueueDone.Set(); + + foreach (var t in commitThreads) + t.Join(); + + // We expect the test to finish and not get stuck somewhere + } + } +} \ No newline at end of file From ef0775aac44188a35f8141bfda68d359ae6319dd Mon Sep 17 00:00:00 2001 From: Tianyu Li Date: Sat, 13 Nov 2021 13:24:40 -0500 Subject: [PATCH 2/2] reduce mem size to prevent out of mem error on test machine (#592) --- cs/test/LogShiftTailStressTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs/test/LogShiftTailStressTest.cs b/cs/test/LogShiftTailStressTest.cs index d5009600a..4633125b5 100644 --- a/cs/test/LogShiftTailStressTest.cs +++ b/cs/test/LogShiftTailStressTest.cs @@ -20,7 +20,7 @@ internal class LogShiftTailStressTest : FasterLogTestBase public void FasterLogShiftTailStressTest() { // Get an excruciatingly slow storage device to maximize chance of clogging the flush pipeline - device = new LocalMemoryDevice(1L << 32, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log"); + device = new LocalMemoryDevice(1L << 30, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log"); var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager}; log = new FasterLog(logSettings);