Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Fix error behavior under frequent commit situations #590

Merged
merged 2 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1761,6 +1761,9 @@ public void AsyncFlushPages(long fromAddress, long untilAddress)
}
else
{
// Because we are invoking the callback outside of the usual code path, we need to externally
// ensure that flushed addresses are updated in order
while (FlushedUntilAddress < asyncResult.fromAddress) Thread.Yield();
// Could not add to pending flush list, treat as a failed write
AsyncFlushPageCallback(1, 0, asyncResult);
}
Expand Down
8 changes: 2 additions & 6 deletions cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public void BumpCurrentEpoch(Action onDrain)
{
int PriorEpoch = BumpCurrentEpoch() - 1;

int i = 0, j = 0;
int i = 0;
while (true)
{
if (drainList[i].epoch == int.MaxValue)
Expand Down Expand Up @@ -343,11 +343,7 @@ public void BumpCurrentEpoch(Action onDrain)
{
ProtectAndDrain();
i = 0;
if (++j == 500)
{
j = 0;
Debug.WriteLine("Delay finding a free entry in the drain list");
}
Thread.Yield();
}
}

Expand Down
64 changes: 64 additions & 0 deletions cs/test/LogShiftTailStressTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Threading;
using FASTER.core;
using NUnit.Framework;

namespace FASTER.test
{
    /// <summary>
    /// Stress test that enqueues entries into a <c>FasterLog</c> backed by a deliberately
    /// slow device while many threads call <c>Commit()</c> in a tight loop. The test has
    /// no value assertions: it passes when every thread terminates and the method returns.
    /// </summary>
    /// <remarks>
    /// The members <c>device</c>, <c>log</c>, <c>manager</c>, <c>entryLength</c> and
    /// <c>numEntries</c> are not declared in this file; presumably they are inherited
    /// from <c>FasterLogTestBase</c> (confirm against the base class).
    /// </remarks>
    [TestFixture]
    internal class LogShiftTailStressTest : FasterLogTestBase
    {
        /// <summary>Delegates per-test initialization to the base fixture.</summary>
        [SetUp]
        public void Setup() => base.BaseSetup();

        /// <summary>Delegates per-test cleanup to the base fixture.</summary>
        [TearDown]
        public void TearDown() => base.BaseTearDown();

        /// <summary>
        /// Enqueues a first batch of entries with no concurrent commits, then enqueues a
        /// second batch while a pool of threads continuously commits the log.
        /// </summary>
        [Test]
        [Category("FasterLog")]
        public void FasterLogShiftTailStressTest()
        {
            // Get an excruciatingly slow storage device to maximize the chance of clogging the flush pipeline
            device = new LocalMemoryDevice(1L << 32, 1 << 30, 2, sector_size: 512, latencyMs: 50, fileName: "stress.log");
            var logSettings = new FasterLogSettings { LogDevice = device, LogChecksum = LogChecksumType.None, LogCommitManager = manager };
            log = new FasterLog(logSettings);

            byte[] entry = new byte[entryLength];
            for (int i = 0; i < entryLength; i++)
                entry[i] = (byte)i;

            // First batch: insert entries before any commit thread is running
            for (int i = 0; i < 5 * numEntries; i++)
                log.Enqueue(entry);

            // Make sure to spin up many commit threads to expose lots of interleavings
            int commitThreadCount = 2 * Math.Max(1, Environment.ProcessorCount - 1);
            var commitThreads = new List<Thread>(commitThreadCount);

            using (var enqueueDone = new ManualResetEventSlim())
            {
                try
                {
                    for (var i = 0; i < commitThreadCount; i++)
                    {
                        var committer = new Thread(() =>
                        {
                            // Commit as fast as possible to clog the commit pipeline
                            while (!enqueueDone.IsSet)
                                log.Commit();
                        });
                        committer.Start();
                        // Record only threads that actually started, so the cleanup
                        // below never calls Join on an unstarted thread.
                        commitThreads.Add(committer);
                    }

                    // Second batch: enqueue while the commit threads are running
                    for (int i = 0; i < 5 * numEntries; i++)
                        log.Enqueue(entry);
                }
                finally
                {
                    // Always release and join the commit threads, even if enqueueing threw.
                    // Otherwise the foreground threads would spin forever and the test run
                    // would hang instead of reporting the failure.
                    enqueueDone.Set();
                    foreach (var t in commitThreads)
                        t.Join();
                }
            }

            // We expect the test to finish and not get stuck somewhere
        }
    }
}