diff --git a/cs/FASTER.sln b/cs/FASTER.sln index 299bd0af7..35ea1ec57 100644 --- a/cs/FASTER.sln +++ b/cs/FASTER.sln @@ -90,6 +90,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{C60F148B-2 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TstRunner", "playground\TstRunner\TstRunner.csproj", "{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EpvsSample", "samples\EpvsSample\EpvsSample.csproj", "{580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -289,6 +291,14 @@ Global {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|Any CPU.ActiveCfg = Release|x64 {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.ActiveCfg = Release|x64 {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.Build.0 = Release|x64 + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|x64.ActiveCfg = Debug|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|x64.Build.0 = Debug|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|Any CPU.Build.0 = Release|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|x64.ActiveCfg = Release|Any CPU + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -320,6 +330,7 @@ Global {9EFCF8C5-320B-473C-83DE-3815981D465B} = {E6026D6A-01C5-4582-B2C1-64751490DABE} {E8C7FB0F-38B8-468A-B1CA-8793DF8F2693} = {E6026D6A-01C5-4582-B2C1-64751490DABE} {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611} = {E6026D6A-01C5-4582-B2C1-64751490DABE} + {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC} diff --git a/cs/remote/FASTER.remote.sln b/cs/remote/FASTER.remote.sln index 290299e1b..85364f664 100644 --- a/cs/remote/FASTER.remote.sln +++ b/cs/remote/FASTER.remote.sln @@ -153,6 +153,14 @@ Global {2238A430-8D61-40A3-A23B-B1163A4CCBC6}.Release|Any CPU.Build.0 = Release|Any CPU {2238A430-8D61-40A3-A23B-B1163A4CCBC6}.Release|x64.ActiveCfg = Release|x64 {2238A430-8D61-40A3-A23B-B1163A4CCBC6}.Release|x64.Build.0 = Release|x64 + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|Any CPU.Build.0 = Debug|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|x64.ActiveCfg = Debug|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|x64.Build.0 = Debug|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|Any CPU.ActiveCfg = Release|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|Any CPU.Build.0 = Release|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|x64.ActiveCfg = Release|Any CPU + {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -169,6 +177,7 @@ Global {6A49ADD2-DC25-47E1-9D29-5DC6380E880A} = {6B8D1038-C9D5-4111-B5CE-BF64E7D12AE1} {2238A430-8D61-40A3-A23B-B1163A4CCBC6} = {8CF11B91-A6B6-4B81-AD43-2B07CF60F8FF} {8D1F793C-DA1E-4C75-B824-E510BB54534E} = {1065AE7E-DEA5-4E21-AE39-95B93C074B17} + {09E373B7-2C08-4EDD-AE68-8087A03AD490} = {6B8D1038-C9D5-4111-B5CE-BF64E7D12AE1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FB603D60-F72D-4DAD-9349-442A45E20276} diff --git a/cs/samples/EpvsSample/EpvsBench.cs b/cs/samples/EpvsSample/EpvsBench.cs new file mode 100644 index 000000000..ce895ad13 --- /dev/null +++ b/cs/samples/EpvsSample/EpvsBench.cs @@ -0,0 +1,172 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Security.Cryptography; +using System.Threading; +using FASTER.core; + +namespace EpvsSample +{ + internal class EpvsBench + { + internal SemaphoreSlim testedLatch = null!; + internal EpochProtectedVersionScheme tested = null!; + internal byte[] hashBytes = null!; + + internal class Worker + { + private byte[] scratchPad; + + private HashAlgorithm hasher; + + // private long scratchPad; + private EpvsBench parent; + private List versionChangeIndexes; + private int numOps, versionChangeDelay, numaStyle, threadId; + + private byte syncMode; + + internal Worker(EpvsBench parent, Options options, Random random, int threadId) + { + hasher = new SHA256Managed(); + scratchPad = new byte[hasher.HashSize / 8]; + this.parent = parent; + versionChangeIndexes = new List(); + numOps = options.NumOps; + versionChangeDelay = options.VersionChangeDelay; + numaStyle = options.NumaStyle; + this.threadId = threadId; + + for (var i = 0; i < numOps; i++) + { + if (random.NextDouble() < options.VersionChangeProbability) + versionChangeIndexes.Add(i); + } + + switch (options.SynchronizationMode) + { + case "epvs": + syncMode = 1; + break; + case "epvs-refresh": + syncMode = 2; + break; + case "latch": + syncMode = 3; + break; + } + + } + + private void DoWork(int numUnits) + { + for (var i = 0; i < numUnits; i++) + hasher.TryComputeHash(parent.hashBytes, scratchPad, out _); + // scratchPad++; + } + + internal void RunOneThread() + { + if (numaStyle == 0) + Native32.AffinitizeThreadRoundRobin((uint) threadId); + else if (numaStyle == 1) + Native32.AffinitizeThreadShardedNuma((uint) threadId, 2); // assuming two NUMA sockets + + if (syncMode == 2) + parent.tested.Enter(); + + var nextChangeIndex = 0; + for (var i = 0; i < numOps; i++) + { + switch (syncMode) + { + case 1: + if (nextChangeIndex < versionChangeIndexes.Count && + i == versionChangeIndexes[nextChangeIndex]) + { + parent.tested.TryAdvanceVersionWithCriticalSection((_, _) => DoWork(versionChangeDelay)); + nextChangeIndex++; + } + else + { + parent.tested.Enter(); + DoWork(1); + parent.tested.Leave(); + } + break; + case 2: + if (nextChangeIndex < versionChangeIndexes.Count && + i == versionChangeIndexes[nextChangeIndex]) + { + while (parent.tested.TryAdvanceVersionWithCriticalSection((_, _) => + { + DoWork(versionChangeDelay); + }) == StateMachineExecutionStatus.RETRY) + parent.tested.Refresh(); + nextChangeIndex++; + } + else + { + parent.tested.Refresh(); + DoWork(1); + } + + break; + case 3: + parent.testedLatch.Wait(); + if (nextChangeIndex < versionChangeIndexes.Count && + i == versionChangeIndexes[nextChangeIndex]) + { + DoWork(versionChangeDelay); + nextChangeIndex++; + } + else + { + DoWork(1); + } + parent.testedLatch.Release(); + break; + default: + throw new NotImplementedException(); + } + } + + if (syncMode == 2) + parent.tested.Leave(); + } + } + + + internal void RunExperiment(Options options) + { + hashBytes = new byte[8]; + new Random().NextBytes(hashBytes); + tested = new EpochProtectedVersionScheme(new LightEpoch()); + testedLatch = new SemaphoreSlim(1, 1); + + var threads = new List(); + var random = new Random(); + for (var i = 0; i < options.NumThreads; i++) + { + var worker = new Worker(this, options, random, i); + var t = new Thread(() => worker.RunOneThread()); + threads.Add(t); + } + + var sw = Stopwatch.StartNew(); + foreach (var t in threads) + t.Start(); + foreach (var t in threads) + t.Join(); + var timeMilli = sw.ElapsedMilliseconds; + var throughput = options.NumOps * options.NumThreads * 1000.0 / timeMilli; + Console.WriteLine(throughput); + if (!options.OutputFile.Equals("")) + { + using var outputFile = new StreamWriter(options.OutputFile, true); + outputFile.WriteLine(throughput); + } + } + } +} \ No newline at end of file diff --git a/cs/samples/EpvsSample/EpvsSample.csproj b/cs/samples/EpvsSample/EpvsSample.csproj new file mode 100644 index 000000000..791e58a66 --- /dev/null +++ b/cs/samples/EpvsSample/EpvsSample.csproj @@ -0,0 +1,17 @@ + + + + Exe + net5.0 + enable + + + + + + + + + + + diff --git a/cs/samples/EpvsSample/ListExample.cs b/cs/samples/EpvsSample/ListExample.cs new file mode 100644 index 000000000..f25100ab4 --- /dev/null +++ b/cs/samples/EpvsSample/ListExample.cs @@ -0,0 +1,389 @@ +using System; +using System.Threading; +using FASTER.core; + +namespace EpvsSample +{ + public interface IResizableList + { + public int Count(); + + public long Read(int index); + + public void Write(int index, long value); + + public int Push(long value); + } + + + public class SingleThreadedResizableList : IResizableList + { + private long[] list; + private int count; + + public SingleThreadedResizableList() + { + list = new long[16]; + count = 0; + } + + public int Count() => count; + + public long Read(int index) + { + if (index < 0 || index >= count) throw new IndexOutOfRangeException(); + return list[index]; + } + + public void Write(int index, long value) + { + if (index < 0 || index >= count) throw new IndexOutOfRangeException(); + list[index] = value; + } + + public int Push(long value) + { + if (count == list.Length) + { + var newList = new long[2 * count]; + Array.Copy(list, newList, list.Length); + list = newList; + } + + list[count] = value; + return count++; + } + } + + public class LatchedResizableList : IResizableList + { + private ReaderWriterLockSlim rwLatch; + private long[] list; + private int count; + + public LatchedResizableList() + { + rwLatch = new ReaderWriterLockSlim(); + list = new long[16]; + count = 0; + } + + public int Count() => Math.Min(count, list.Length); + + public long Read(int index) + { + if (index < 0) throw new IndexOutOfRangeException(); + try + { + rwLatch.EnterReadLock(); + if (index < list.Length) return list[index]; + if (index < count) return default; + throw new IndexOutOfRangeException(); + } + finally + { + rwLatch.ExitReadLock(); + } + } + + public void Write(int index, long value) + { + rwLatch.EnterReadLock(); + list[index] = value; + rwLatch.ExitReadLock(); + } + + private void Resize() + { + try + { + rwLatch.EnterWriteLock(); + var newList = new long[2 * list.Length]; + Array.Copy(list, newList, list.Length); + list = newList; + } + finally + { + rwLatch.ExitWriteLock(); + } + } + + public int Push(long value) + { + var result = Interlocked.Increment(ref count) - 1; + while (true) + { + if (result == list.Length) + Resize(); + try + { + rwLatch.EnterReadLock(); + if (result >= list.Length) continue; + list[result] = value; + return result; + } + finally + { + rwLatch.ExitReadLock(); + } + } + } + } + + public class SimpleVersionSchemeResizableList : IResizableList + { + private EpochProtectedVersionScheme epvs; + private long[] list; + private int count; + + public SimpleVersionSchemeResizableList() + { + epvs = new EpochProtectedVersionScheme(new LightEpoch()); + list = new long[16]; + count = 0; + } + + public int Count() => Math.Min(count, list.Length); + + public long Read(int index) + { + if (index < 0) throw new IndexOutOfRangeException(); + try + { + epvs.Enter(); + if (index < list.Length) return list[index]; + if (index < count) return default; + throw new IndexOutOfRangeException(); + } + finally + { + epvs.Leave(); + } + } + + public void Write(int index, long value) + { + try + { + epvs.Enter(); + if (index < 0 || index >= count) throw new IndexOutOfRangeException(); + list[index] = value; + } + finally + { + epvs.Leave(); + } + } + + private void Resize() + { + var newList = new long[2 * list.Length]; + Array.Copy(list, newList, list.Length); + list = newList; + } + + public int Push(long value) + { + try + { + var v = epvs.Enter(); + var result = Interlocked.Increment(ref count) - 1; + + while (true) + { + if (result < list.Length) + { + list[result] = value; + return result; + } + + epvs.Leave(); + if (result == list.Length) + epvs.AdvanceVersionWithCriticalSection((_, _) => Resize(), v.Version + 1); + Thread.Yield(); + v = epvs.Enter(); + } + } + finally + { + epvs.Leave(); + } + } + } + + public class ListGrowthStateMachine : VersionSchemeStateMachine + { + public const byte COPYING = 1; + private TwoPhaseResizableList obj; + private volatile bool copyDone = false; + + public ListGrowthStateMachine(TwoPhaseResizableList obj, long toVersion) : base(toVersion) + { + this.obj = obj; + } + + public override bool GetNextStep(VersionSchemeState currentState, out VersionSchemeState nextState) + { + switch (currentState.Phase) + { + case VersionSchemeState.REST: + nextState = VersionSchemeState.Make(COPYING, currentState.Version); + return true; + case COPYING: + nextState = VersionSchemeState.Make(VersionSchemeState.REST, actualToVersion); + return copyDone; + default: + throw new NotImplementedException(); + } + } + + public override void OnEnteringState(VersionSchemeState fromState, VersionSchemeState toState) + { + switch (fromState.Phase) + { + case VersionSchemeState.REST: + obj.newList = new long[obj.list.Length * 2]; + break; + case COPYING: + obj.list = obj.newList; + break; + default: + throw new NotImplementedException(); + } + } + + public override void AfterEnteringState(VersionSchemeState state) + { + if (state.Phase == COPYING) + { + Array.Copy(obj.list, obj.newList, obj.list.Length); + copyDone = true; + obj.epvs.SignalStepAvailable(); + } + } + } + + public class TwoPhaseResizableList : IResizableList + { + internal EpochProtectedVersionScheme epvs; + internal long[] list, newList; + internal int count; + + public TwoPhaseResizableList() + { + epvs = new EpochProtectedVersionScheme(new LightEpoch()); + list = new long[16]; + newList = list; + count = 0; + } + + // TODO(Tianyu): How to ensure this is correct in the face of concurrent pushes? + public int Count() => count; + + public long Read(int index) + { + if (index < 0) throw new IndexOutOfRangeException(); + try + { + var state = epvs.Enter(); + switch (state.Phase) + { + case VersionSchemeState.REST: + if (index < list.Length) return list[index]; + // element allocated but yet to be constructed + if (index < count) return default; + throw new IndexOutOfRangeException(); + case ListGrowthStateMachine.COPYING: + if (index < list.Length) return list[index]; + if (index < newList.Length) return newList[index]; + if (index < count) return default; + throw new IndexOutOfRangeException(); + default: + throw new NotImplementedException(); + } + } + finally + { + epvs.Leave(); + } + } + + public void Write(int index, long value) + { + try + { + var state = epvs.Enter(); + // Write operation is not allowed during copy because we don't know about the copy progress + while (state.Phase == ListGrowthStateMachine.COPYING) + { + state = epvs.Refresh(); + Thread.Yield(); + } + + if (index < 0 || index >= count) throw new IndexOutOfRangeException(); + list[index] = value; + } + finally + { + epvs.Leave(); + } + } + + public int Push(long value) + { + try + { + var result = Interlocked.Increment(ref count) - 1; + + var state = epvs.Enter(); + + // Write the entry into the correct underlying array + while (true) + { + if (state.Phase == VersionSchemeState.REST && result == list.Length) + { + epvs.Leave(); + // Use explicit versioning to prevent multiple list growth resulting from same full list state + epvs.ExecuteStateMachine(new ListGrowthStateMachine(this, state.Version + 1)); + state = epvs.Enter(); + } + + switch (state.Phase) + { + case VersionSchemeState.REST: + if (result >= list.Length) + { + epvs.Leave(); + Thread.Yield(); + state = epvs.Enter(); + continue; + } + + list[result] = value; + return result; + case ListGrowthStateMachine.COPYING: + // This was the copying phase of a previous state machine + if (result >= newList.Length) + { + epvs.Leave(); + Thread.Yield(); + state = epvs.Enter(); + continue; + } + + // Make sure to write update to old list if it belongs there in case copying erases new write + if (result < list.Length) + list[result] = value; + // Also write to new list in case copying was delayed + newList[result] = value; + return result; + } + } + } + finally + { + epvs.Leave(); + } + } + } +} \ No newline at end of file diff --git a/cs/samples/EpvsSample/Program.cs b/cs/samples/EpvsSample/Program.cs new file mode 100644 index 000000000..bff98dbdb --- /dev/null +++ b/cs/samples/EpvsSample/Program.cs @@ -0,0 +1,48 @@ +using System; +using CommandLine; + +namespace EpvsSample +{ + internal class Options + { + [Option('m', "synchronization-mode", Default = "epvs", + HelpText = "synchronization mode options:" + + "\n epvs" + + "\n epvs-refresh" + + "\n latch")] + public string SynchronizationMode { get; set; } = null!; + + [Option('o', "num-ops", Default = 1000000)] + public int NumOps { get; set; } + + [Option('t', "num-threads", Required = true)] + public int NumThreads { get; set; } + + [Option('n', "numa", Required = false, Default = 0, + HelpText = "NUMA options:" + + "\n 0 = No sharding across NUMA sockets" + + "\n 1 = Sharding across NUMA sockets")] + public int NumaStyle { get; set; } + + [Option('p', "probability", Default = 1e-6)] + public double VersionChangeProbability { get; set; } + + [Option('l', "delay", Default = 1)] + public int VersionChangeDelay { get; set; } + + + [Option('u', "output-file", Default = "")] + public string OutputFile { get; set; } = null!; + } + + + internal class Program + { + static void Main(string[] args) + { + var options = Parser.Default.ParseArguments(args).Value; + var bench = new EpvsBench(); + bench.RunExperiment(options); + } + } +} \ No newline at end of file diff --git a/cs/src/core/Epoch/EpochProtectedVersionScheme.cs b/cs/src/core/Epoch/EpochProtectedVersionScheme.cs new file mode 100644 index 000000000..f512444cd --- /dev/null +++ b/cs/src/core/Epoch/EpochProtectedVersionScheme.cs @@ -0,0 +1,482 @@ +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; + +namespace FASTER.core +{ + /// + /// The current state of a state-machine operation such as a checkpoint. + /// + [StructLayout(LayoutKind.Explicit, Size = 8)] + public struct VersionSchemeState + { + /// + /// Special value denoting that the version state machine is at rest in stable state + /// + public const byte REST = 0; + const int kTotalSizeInBytes = 8; + const int kTotalBits = kTotalSizeInBytes * 8; + + // Phase + const int kPhaseBits = 8; + const int kPhaseShiftInWord = kTotalBits - kPhaseBits; + const long kPhaseMaskInWord = ((1L << kPhaseBits) - 1) << kPhaseShiftInWord; + const long kPhaseMaskInInteger = (1L << kPhaseBits) - 1; + + // Version + const int kVersionBits = kPhaseShiftInWord; + const long kVersionMaskInWord = (1L << kVersionBits) - 1; + + /// Internal intermediate state of state machine + private const byte kIntermediateMask = 128; + + [FieldOffset(0)] internal long Word; + + /// + /// Custom phase marker denoting where in a state machine EPVS is in right now + /// + public byte Phase + { + get { return (byte) ((Word >> kPhaseShiftInWord) & kPhaseMaskInInteger); } + set + { + Word &= ~kPhaseMaskInWord; + Word |= (((long) value) & kPhaseMaskInInteger) << kPhaseShiftInWord; + } + } + + /// + /// whether EPVS is in intermediate state now (transitioning between two states) + public bool IsIntermediate() => (Phase & kIntermediateMask) != 0; + + /// + /// Version number of the current state + /// + public long Version + { + get { return Word & kVersionMaskInWord; } + set + { + Word &= ~kVersionMaskInWord; + Word |= value & kVersionMaskInWord; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static VersionSchemeState Copy(ref VersionSchemeState other) + { + var info = default(VersionSchemeState); + info.Word = other.Word; + return info; + } + + /// + /// Make a state with the given phase and version + /// + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static VersionSchemeState Make(byte phase, long version) + { + var info = default(VersionSchemeState); + info.Phase = phase; + info.Version = version; + return info; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static VersionSchemeState MakeIntermediate(VersionSchemeState state) + => Make((byte) (state.Phase | kIntermediateMask), state.Version); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static void RemoveIntermediate(ref VersionSchemeState state) + { + state.Phase = (byte) (state.Phase & ~kIntermediateMask); + } + + internal static bool Equal(VersionSchemeState s1, VersionSchemeState s2) + { + return s1.Word == s2.Word; + } + + /// + public override string ToString() + { + return $"[{Phase},{Version}]"; + } + + /// + /// Compare the current to for equality if obj is also a + /// + public override bool Equals(object obj) + { + return obj is VersionSchemeState other && Equals(other); + } + + /// + public override int GetHashCode() + { + return Word.GetHashCode(); + } + + /// + /// Compare the current to for equality + /// + private bool Equals(VersionSchemeState other) + { + return Word == other.Word; + } + + /// + /// Equals + /// + public static bool operator ==(VersionSchemeState left, VersionSchemeState right) + { + return left.Equals(right); + } + + /// + /// Not Equals + /// + public static bool operator !=(VersionSchemeState left, VersionSchemeState right) + { + return !(left == right); + } + } + + /// + /// A version state machine specifies a sequence of transitions to a new version + /// + public abstract class VersionSchemeStateMachine + { + private long toVersion; + /// + /// The actual version this state machine is advancing to, or -1 if not yet determined + /// + protected internal long actualToVersion = -1; + + /// + /// Constructs a new version state machine for transition to the given version + /// + /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version + protected VersionSchemeStateMachine(long toVersion = -1) + { + this.toVersion = toVersion; + actualToVersion = toVersion; + } + + /// + /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version + public long ToVersion() => toVersion; + + /// + /// Given the current state, compute the next state the version scheme should enter, if any. + /// + /// the current state + /// the next state, if any + /// whether a state transition is possible at this moment + public abstract bool GetNextStep(VersionSchemeState currentState, out VersionSchemeState nextState); + + /// + /// Code block to execute before entering a state. Guaranteed to execute in a critical section with mutual + /// exclusion with other transitions or EPVS-protected code regions + /// + /// the current state + /// the state transitioning to + public abstract void OnEnteringState(VersionSchemeState fromState, VersionSchemeState toState); + + /// + /// Code block to execute after entering the state. Execution here may interleave with other EPVS-protected + /// code regions. This can be used to collaborative perform heavyweight transition work without blocking + /// progress of other threads. + /// + /// the current state + public abstract void AfterEnteringState(VersionSchemeState state); + } + + internal class SimpleVersionSchemeStateMachine : VersionSchemeStateMachine + { + private Action criticalSection; + + public SimpleVersionSchemeStateMachine(Action criticalSection, long toVersion = -1) : base(toVersion) + { + this.criticalSection = criticalSection; + } + + public override bool GetNextStep(VersionSchemeState currentState, out VersionSchemeState nextState) + { + Debug.Assert(currentState.Phase == VersionSchemeState.REST); + nextState = VersionSchemeState.Make(VersionSchemeState.REST, ToVersion() == -1 ? currentState.Version + 1 : ToVersion()); + return true; + } + + public override void OnEnteringState(VersionSchemeState fromState, VersionSchemeState toState) + { + Debug.Assert(fromState.Phase == VersionSchemeState.REST && toState.Phase == VersionSchemeState.REST); + criticalSection(fromState.Version, toState.Version); + } + + public override void AfterEnteringState(VersionSchemeState state) {} + } + + /// + /// Status for state machine execution + /// + public enum StateMachineExecutionStatus + { + /// + /// execution successful + /// + OK, + /// + /// execution unsuccessful but may be retried + /// + RETRY, + /// + /// execution failed and should not be retried + /// + FAIL + } + + /// + /// Epoch Protected Version Scheme + /// + public class EpochProtectedVersionScheme + { + private LightEpoch epoch; + private VersionSchemeState state; + private VersionSchemeStateMachine currentMachine; + + /// + /// Construct a new EPVS backed by the given epoch framework. Multiple EPVS instances can share an underlying + /// epoch framework (WARNING: re-entrance is not yet supported, so nested protection of these shared instances + /// likely leads to error) + /// + /// The backing epoch protection framework + public EpochProtectedVersionScheme(LightEpoch epoch) + { + this.epoch = epoch; + state = VersionSchemeState.Make(VersionSchemeState.REST, 1); + currentMachine = null; + } + + /// + /// the current state + public VersionSchemeState CurrentState() => state; + + // Atomic transition from expectedState -> nextState + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool MakeTransition(VersionSchemeState expectedState, VersionSchemeState nextState) + { + if (Interlocked.CompareExchange(ref state.Word, nextState.Word, expectedState.Word) != + expectedState.Word) return false; + Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version); + return true; + } + + /// + /// Enter protection on the current thread. During protection, no version transition is possible. For the system + /// to make progress, protection must be later relinquished on the same thread using Leave() or Refresh() + /// + /// the state of the EPVS as of protection, which extends until the end of protection + public VersionSchemeState Enter() + { + epoch.Resume(); + TryStepStateMachine(); + + VersionSchemeState result; + while (true) + { + result = state; + if (!result.IsIntermediate()) break; + epoch.Suspend(); + Thread.Yield(); + epoch.Resume(); + } + + return result; + } + + /// + /// Refreshes protection --- equivalent to dropping and immediately reacquiring protection, but more performant. + /// + /// the state of the EPVS as of protection, which extends until the end of protection + public VersionSchemeState Refresh() + { + epoch.ProtectAndDrain(); + VersionSchemeState result = default; + TryStepStateMachine(); + + while (true) + { + result = state; + if (!result.IsIntermediate()) break; + epoch.Suspend(); + Thread.Yield(); + epoch.Resume(); + } + return result; + } + + /// + /// Drop protection of the current thread + /// + public void Leave() + { + epoch.Suspend(); + } + + internal void TryStepStateMachine(VersionSchemeStateMachine expectedMachine = null) + { + var machineLocal = currentMachine; + var oldState = state; + + // Nothing to step + if (machineLocal == null) return; + + // Should exit to avoid stepping infinitely (until stack overflow) + if (expectedMachine != null && machineLocal != expectedMachine) return; + + // Still computing actual to version + if (machineLocal.actualToVersion == -1) return; + + // Machine finished, but not reset yet. Should reset and avoid starting another cycle + if (oldState.Phase == VersionSchemeState.REST && oldState.Version == machineLocal.actualToVersion) + { + Interlocked.CompareExchange(ref currentMachine, null, machineLocal); + return; + } + + // Step is in progress or no step is available + if (oldState.IsIntermediate() || !machineLocal.GetNextStep(oldState, out var nextState)) return; + + var intermediate = VersionSchemeState.MakeIntermediate(oldState); + if (!MakeTransition(oldState, intermediate)) return; + // Avoid upfront memory allocation by going to a function + StepMachineHeavy(machineLocal, oldState, nextState); + + // Ensure that state machine is able to make progress if this thread is the only active thread + if (!epoch.ThisInstanceProtected()) + { + epoch.Resume(); + epoch.Suspend(); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void StepMachineHeavy(VersionSchemeStateMachine machineLocal, VersionSchemeState old, VersionSchemeState next) + { + epoch.BumpCurrentEpoch(() => + { + machineLocal.OnEnteringState(old, next); + var success = MakeTransition(VersionSchemeState.MakeIntermediate(old), next); + machineLocal.AfterEnteringState(next); + Debug.Assert(success); + TryStepStateMachine(machineLocal); + }); + } + + /// + /// Signals to EPVS that a new step is available in the state machine. This is useful when the state machine + /// delays a step (e.g., while waiting on IO to complete) and invoked after the step is available, so the + /// state machine can make progress even without active threads entering and leaving the system. There is no + /// need to invoke this method if steps are always available. + /// + public void SignalStepAvailable() + { + TryStepStateMachine(); + } + + /// + /// Attempt to start executing the given state machine. + /// + /// state machine to execute + /// + /// whether the state machine is successfully started (OK), + /// cannot be started due to an active state machine (RETRY), + /// or cannot be started because the version has advanced past the target version specified (FAIL) + /// + public StateMachineExecutionStatus TryExecuteStateMachine(VersionSchemeStateMachine stateMachine) + { + if (stateMachine.ToVersion() != -1 && stateMachine.ToVersion() <= state.Version) return StateMachineExecutionStatus.FAIL; + var actualStateMachine = Interlocked.CompareExchange(ref currentMachine, stateMachine, null); + if (actualStateMachine == null) + { + // Compute the actual ToVersion of state machine + stateMachine.actualToVersion = + stateMachine.ToVersion() == -1 ? state.Version + 1 : stateMachine.ToVersion(); + // Trigger one initial step to begin the process + TryStepStateMachine(stateMachine); + return StateMachineExecutionStatus.OK; + } + + // Otherwise, need to check that we are not a duplicate attempt to increment version + if (stateMachine.ToVersion() != -1 && currentMachine.actualToVersion >= stateMachine.ToVersion()) + return StateMachineExecutionStatus.FAIL; + + return StateMachineExecutionStatus.RETRY; + } + + + /// + /// Start executing the given state machine + /// + /// state machine to start + /// whether to spin wait until version transition is complete + /// whether the state machine can be executed. If false, EPVS has advanced version past the target version specified + public bool ExecuteStateMachine(VersionSchemeStateMachine stateMachine, bool spin = false) + { + if (epoch.ThisInstanceProtected()) + throw new InvalidOperationException("unsafe to execute a state machine blockingly when under protection"); + StateMachineExecutionStatus status; + do + { + status = TryExecuteStateMachine(stateMachine); + } while (status == StateMachineExecutionStatus.RETRY); + + if (status != StateMachineExecutionStatus.OK) return false; + + if (spin) + { + while (state.Version != stateMachine.actualToVersion || state.Phase != VersionSchemeState.REST) + { + TryStepStateMachine(); + Thread.Yield(); + } + } + + return true; + } + + /// + /// Advance the version with a single critical section to the requested version. + /// + /// critical section to execute, with old version and new (target) version as arguments + /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version + /// + /// whether the state machine is successfully started (OK), + /// cannot be started due to an active state machine (RETRY), + /// or cannot be started because the version has advanced past the target version specified (FAIL) + /// + public StateMachineExecutionStatus TryAdvanceVersionWithCriticalSection(Action criticalSection, long targetVersion = -1) + { + return TryExecuteStateMachine(new SimpleVersionSchemeStateMachine(criticalSection, targetVersion)); + } + + /// + /// Advance the version with a single critical section to the requested version. + /// + /// critical section to execute, with old version and new (target) version as arguments + /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version + /// whether to spin wait until version transition is complete + /// whether the state machine can be executed. If false, EPVS has advanced version past the target version specified + public bool AdvanceVersionWithCriticalSection(Action criticalSection, long targetVersion = -1, bool spin = false) + { + return ExecuteStateMachine(new SimpleVersionSchemeStateMachine(criticalSection, targetVersion), spin); + } + + } +} \ No newline at end of file diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epoch/LightEpoch.cs similarity index 96% rename from cs/src/core/Epochs/LightEpoch.cs rename to cs/src/core/Epoch/LightEpoch.cs index 6df99acb2..b0d7bd6a3 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epoch/LightEpoch.cs @@ -14,6 +14,7 @@ namespace FASTER.core /// public unsafe sealed class LightEpoch { + private const int kCacheLineBytes = 64; /// /// Default invalid index entry. /// @@ -114,7 +115,7 @@ static LightEpoch() p = (long)threadIndexHandle.AddrOfPinnedObject(); #endif // Force the pointer to align to 64-byte boundaries - long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1); + long p2 = (p + (kCacheLineBytes - 1)) & ~(kCacheLineBytes - 1); threadIndexAligned = (Entry*)p2; } @@ -135,7 +136,7 @@ public LightEpoch() p = (long)tableHandle.AddrOfPinnedObject(); #endif // Force the pointer to align to 64-byte boundaries - long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1); + long p2 = (p + (kCacheLineBytes - 1)) & ~(kCacheLineBytes - 1); tableAligned = (Entry*)p2; CurrentEpoch = 1; @@ -500,7 +501,23 @@ static int ReserveEntry() } } } - + + /// + /// A 32-bit murmur3 implementation. + /// + /// + /// + private static int Murmur3(int h) + { + uint a = (uint)h; + a ^= a >> 16; + a *= 0x85ebca6b; + a ^= a >> 13; + a *= 0xc2b2ae35; + a ^= a >> 16; + return (int)a; + } + /// /// Allocate a new entry in epoch table. This is called /// once for a thread. @@ -511,7 +528,7 @@ static int ReserveEntryForThread() if (threadId == 0) // run once per thread for performance { threadId = Environment.CurrentManagedThreadId; - uint code = (uint)Utility.Murmur3(threadId); + uint code = (uint)Murmur3(threadId); startOffset1 = (ushort)(1 + (code % kTableSize)); startOffset2 = (ushort)(1 + ((code >> 16) % kTableSize)); } @@ -521,7 +538,7 @@ static int ReserveEntryForThread() /// /// Epoch table entry (cache line size). /// - [StructLayout(LayoutKind.Explicit, Size = Constants.kCacheLineBytes)] + [StructLayout(LayoutKind.Explicit, Size = kCacheLineBytes)] struct Entry { /// @@ -552,4 +569,4 @@ struct EpochActionPair public override string ToString() => $"epoch = {epoch}, action = {(action is null ? "n/a" : action.Method.ToString())}"; } } -} \ No newline at end of file +} diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs deleted file mode 100644 index 12fe2e791..000000000 --- a/cs/src/core/Epochs/FastThreadLocal.cs +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT license. - -using System; -using System.Threading; - -namespace FASTER.core -{ - /// - /// Fast implementation of instance-thread-local variables - /// - /// - internal sealed class FastThreadLocal - { - // Max instances supported - private const int kMaxInstances = 128; - - [ThreadStatic] - private static T[] tl_values; - [ThreadStatic] - private static int[] tl_iid; - - private readonly int offset; - private readonly int iid; - - private static readonly int[] instances = new int[kMaxInstances]; - private static int instanceId = 0; - - public FastThreadLocal() - { - iid = Interlocked.Increment(ref instanceId); - - for (int i = 0; i < kMaxInstances; i++) - { - if (0 == Interlocked.CompareExchange(ref instances[i], iid, 0)) - { - offset = i; - return; - } - } - throw new FasterException("Unsupported number of simultaneous instances"); - } - - public void InitializeThread() - { - if (tl_values == null) - { - tl_values = new T[kMaxInstances]; - tl_iid = new int[kMaxInstances]; - } - if (tl_iid[offset] != iid) - { - tl_iid[offset] = iid; - tl_values[offset] = default(T); - } - } - - public void DisposeThread() - { - tl_values[offset] = default(T); - tl_iid[offset] = 0; - } - - /// - /// Dispose instance for all threads - /// - public void Dispose() - { - instances[offset] = 0; - } - - public T Value - { - get => tl_values[offset]; - set => tl_values[offset] = value; - } - - public bool IsInitializedForThread => (tl_values != null) && (iid == tl_iid[offset]); - } -} \ No newline at end of file diff --git a/cs/src/core/Utilities/Utility.cs b/cs/src/core/Utilities/Utility.cs index dddf62c95..a62ddef05 100644 --- a/cs/src/core/Utilities/Utility.cs +++ b/cs/src/core/Utilities/Utility.cs @@ -339,24 +339,7 @@ public static bool Is32Bit(long x) { return ((ulong)x < 4294967295ul); } - - - /// - /// A 32-bit murmur3 implementation. - /// - /// - /// - internal static int Murmur3(int h) - { - uint a = (uint)h; - a ^= a >> 16; - a *= 0x85ebca6b; - a ^= a >> 13; - a *= 0xc2b2ae35; - a ^= a >> 16; - return (int)a; - } - + /// /// Updates the variable to newValue only if the current value is smaller than the new value. ///