diff --git a/cs/src/core/Utilities/SimpleVersionScheme.cs b/cs/src/core/Utilities/SimpleVersionScheme.cs deleted file mode 100644 index 6be2412db..000000000 --- a/cs/src/core/Utilities/SimpleVersionScheme.cs +++ /dev/null @@ -1,114 +0,0 @@ -using System; -using System.Threading; -using FASTER.core; - -namespace FASTER.core -{ - /// - /// SimpleVersionScheme operates like a read-write latch that ensures protected code segments are not interleaved - /// with special, infrequent "version change" code segments that run sequentially. It is more scalable than a - /// typical reader-writer latch by taking advantage of the epoch protection framework and avoiding false sharing - /// in the common case. - /// - internal class SimpleVersionScheme - { - private LightEpoch epoch; - private long version = 1; - private ManualResetEventSlim versionChanged; - - /// - /// Creates a new SimpleVersionScheme - /// - public SimpleVersionScheme() - { - epoch = new LightEpoch(); - } - - /// - /// Returns the current version - /// - /// the current version - public long Version() - { - return Interlocked.Read(ref version); - } - - /// - /// Protects later processing from concurrent version changes until Leave() is called. Method may block if - /// version change is under way. Leave() must eventually be called on the same thread so the rest of the - /// system makes meaningful progress. - /// - /// - /// current version number. This guarantees that any version change logic from smaller version numbers - /// have finished, and not version change logic to larger versions will begin during protection. - /// - public long Enter() - { - epoch.Resume(); - // Temporarily block if a version change is under way --- depending on whether the thread observes - // versionChanged, they are either in the current version or the next - while (true) - { - var ev = versionChanged; - if (ev == null) break; - // Allow version change to complete by leaving this epoch. - epoch.Suspend(); - ev.Wait(); - epoch.Resume(); - } - - return Interlocked.Read(ref version); - } - - /// - /// Drops protection for a thread. - /// - public void Leave() - { - epoch.Suspend(); - } - - /// - /// Attempts to advance the version to the target version, executing the given action in a critical section - /// where no batches are being processed before entering the next version. Each version will be advanced to - /// exactly once. This method may fail and return false if given target version is not larger than the - /// current version (possibly due to concurrent invocations to advance to the same version). - /// After the method returns, subsequent calls to Version() and Enter() will return at least the value of - /// targetVersion. - /// - /// The logic to execute in a critical section - /// The version to advance to, or -1 for the immediate next version - /// Whether the advance was successful - public bool TryAdvanceVersion(Action criticalSection, long targetVersion = -1) - { - var ev = new ManualResetEventSlim(); - // Compare and exchange to install our advance - while (Interlocked.CompareExchange(ref versionChanged, ev, null) != null) {} - - if (targetVersion != -1 && targetVersion <= version) - { - versionChanged.Set(); - versionChanged = null; - return false; - } - - // Any thread that sees ev will be in v + 1, because the bump happens only after ev is set. - var original = Interlocked.Read(ref version); - epoch.BumpCurrentEpoch(() => - { - version = targetVersion == -1 ? original + 1 : targetVersion; - criticalSection(original, version); - versionChanged.Set(); - versionChanged = null; - }); - - // Make sure that even if we are the only thread, we are able to make progress - if (!epoch.ThisInstanceProtected()) - { - epoch.Resume(); - epoch.Suspend(); - } - return true; - } - } -} \ No newline at end of file diff --git a/cs/test/SimpleVersionSchemeTest.cs b/cs/test/SimpleVersionSchemeTest.cs index 4ac088ccd..83afdbe27 100644 --- a/cs/test/SimpleVersionSchemeTest.cs +++ b/cs/test/SimpleVersionSchemeTest.cs @@ -13,12 +13,12 @@ internal class SimpleVersionSchemeTest [Category("FasterLog")] public void SimpleTest() { - var tested = new SimpleVersionScheme(); + var tested = new EpochProtectedVersionScheme(new LightEpoch()); var protectedVal = 0; var v = tested.Enter(); - Assert.AreEqual(1, v); - tested.TryAdvanceVersion((_, _) => protectedVal = 1); + Assert.AreEqual(1, v.Version); + tested.TryAdvanceVersionWithCriticalSection((_, _) => protectedVal = 1); Thread.Sleep(10); // because of ongoing protection, nothing should happen yet tested.Leave(); @@ -27,7 +27,7 @@ public void SimpleTest() // Next thread sees new version v = tested.Enter(); - Assert.AreEqual(v, 2); + Assert.AreEqual(v.Version, 2); tested.Leave(); } @@ -35,21 +35,21 @@ public void SimpleTest() [Category("FasterLog")] public void SingleThreadTest() { - var tested = new SimpleVersionScheme(); + var tested = new EpochProtectedVersionScheme(new LightEpoch()); var protectedVal = 0; var v = tested.Enter(); - Assert.AreEqual(1, v); + Assert.AreEqual(1, v.Version); tested.Leave(); - tested.TryAdvanceVersion((_, _) => protectedVal = 1); + tested.TryAdvanceVersionWithCriticalSection((_, _) => protectedVal = 1); Assert.AreEqual(1, protectedVal); - tested.TryAdvanceVersion((_, _) => protectedVal = 2, 4); + tested.TryAdvanceVersionWithCriticalSection((_, _) => protectedVal = 2, 4); Assert.AreEqual(2, protectedVal); v = tested.Enter(); - Assert.AreEqual(4, v); + Assert.AreEqual(4, v.Version); tested.Leave(); } @@ -57,7 +57,7 @@ public void SingleThreadTest() [Category("FasterLog")] public void LargeConcurrentTest() { - var tested = new SimpleVersionScheme(); + var tested = new EpochProtectedVersionScheme(new LightEpoch()); var protectedVal = 1L; var termination = new ManualResetEventSlim(); @@ -71,7 +71,7 @@ public void LargeConcurrentTest() while (!termination.IsSet) { var v = tested.Enter(); - Assert.AreEqual(v, Interlocked.Read(ref protectedVal)); + Assert.AreEqual(v.Version, Interlocked.Read(ref protectedVal)); tested.Leave(); } }); @@ -81,7 +81,7 @@ public void LargeConcurrentTest() for (var i = 0; i < 1000; i++) { - tested.TryAdvanceVersion((vOld, vNew) => + tested.TryAdvanceVersionWithCriticalSection((vOld, vNew) => { Assert.AreEqual(vOld, Interlocked.Read(ref protectedVal)); // Flip sign to simulate critical section processing diff --git a/cs/test/SimulatedFlakyDevice.cs b/cs/test/SimulatedFlakyDevice.cs index 43d1ee74c..876910ef8 100644 --- a/cs/test/SimulatedFlakyDevice.cs +++ b/cs/test/SimulatedFlakyDevice.cs @@ -19,7 +19,7 @@ public class SimulatedFlakyDevice : StorageDeviceBase private ErrorSimulationOptions options; private ThreadLocal random; private List permanentlyFailedRangesStart, permanentlyFailedRangesEnd; - private SimpleVersionScheme versionScheme; + private EpochProtectedVersionScheme versionScheme; public SimulatedFlakyDevice(IDevice underlying, ErrorSimulationOptions options) : base(underlying.FileName, underlying.SectorSize, underlying.Capacity) { @@ -27,7 +27,7 @@ public SimulatedFlakyDevice(IDevice underlying, ErrorSimulationOptions options) this.options = options; permanentlyFailedRangesStart = new List(); permanentlyFailedRangesEnd = new List(); - versionScheme = new SimpleVersionScheme(); + versionScheme = new EpochProtectedVersionScheme(new LightEpoch()); random = new ThreadLocal(() => new Random()); } @@ -73,7 +73,7 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti else if (random.Value.NextDouble() < options.writePermanentErrorRate) { callback(42, numBytesToWrite, context); - versionScheme.TryAdvanceVersion((_, _) => + versionScheme.TryAdvanceVersionWithCriticalSection((_, _) => { var index = permanentlyFailedRangesStart.BinarySearch(logicalDestStart); if (index >= 0) @@ -129,7 +129,7 @@ public override void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destin { callback(42, readLength, context); - versionScheme.TryAdvanceVersion((_, _) => + versionScheme.TryAdvanceVersionWithCriticalSection((_, _) => { var index = permanentlyFailedRangesStart.BinarySearch(logicalSrcStart); if (index >= 0)