diff --git a/cs/FASTER.sln b/cs/FASTER.sln index 35ea1ec57..299bd0af7 100644 --- a/cs/FASTER.sln +++ b/cs/FASTER.sln @@ -90,8 +90,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{C60F148B-2 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TstRunner", "playground\TstRunner\TstRunner.csproj", "{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EpvsSample", "samples\EpvsSample\EpvsSample.csproj", "{580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -291,14 +289,6 @@ Global {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|Any CPU.ActiveCfg = Release|x64 {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.ActiveCfg = Release|x64 {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.Build.0 = Release|x64 - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|Any CPU.Build.0 = Debug|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|x64.ActiveCfg = Debug|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Debug|x64.Build.0 = Debug|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|Any CPU.ActiveCfg = Release|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|Any CPU.Build.0 = Release|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|x64.ActiveCfg = Release|Any CPU - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -330,7 +320,6 @@ Global {9EFCF8C5-320B-473C-83DE-3815981D465B} = {E6026D6A-01C5-4582-B2C1-64751490DABE} {E8C7FB0F-38B8-468A-B1CA-8793DF8F2693} = {E6026D6A-01C5-4582-B2C1-64751490DABE} {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611} = {E6026D6A-01C5-4582-B2C1-64751490DABE} - {580E4D8A-8FDD-4B3A-8C6E-8F51E270BDB2} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC} diff --git a/cs/remote/FASTER.remote.sln b/cs/remote/FASTER.remote.sln index 85364f664..290299e1b 100644 --- a/cs/remote/FASTER.remote.sln +++ b/cs/remote/FASTER.remote.sln @@ -153,14 +153,6 @@ Global {2238A430-8D61-40A3-A23B-B1163A4CCBC6}.Release|Any CPU.Build.0 = Release|Any CPU {2238A430-8D61-40A3-A23B-B1163A4CCBC6}.Release|x64.ActiveCfg = Release|x64 {2238A430-8D61-40A3-A23B-B1163A4CCBC6}.Release|x64.Build.0 = Release|x64 - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|Any CPU.Build.0 = Debug|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|x64.ActiveCfg = Debug|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Debug|x64.Build.0 = Debug|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|Any CPU.ActiveCfg = Release|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|Any CPU.Build.0 = Release|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|x64.ActiveCfg = Release|Any CPU - {09E373B7-2C08-4EDD-AE68-8087A03AD490}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -177,7 +169,6 @@ Global {6A49ADD2-DC25-47E1-9D29-5DC6380E880A} = {6B8D1038-C9D5-4111-B5CE-BF64E7D12AE1} {2238A430-8D61-40A3-A23B-B1163A4CCBC6} = {8CF11B91-A6B6-4B81-AD43-2B07CF60F8FF} {8D1F793C-DA1E-4C75-B824-E510BB54534E} = {1065AE7E-DEA5-4E21-AE39-95B93C074B17} - {09E373B7-2C08-4EDD-AE68-8087A03AD490} = {6B8D1038-C9D5-4111-B5CE-BF64E7D12AE1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FB603D60-F72D-4DAD-9349-442A45E20276} diff --git a/cs/samples/EpvsSample/EpvsBench.cs b/cs/samples/EpvsSample/EpvsBench.cs deleted file mode 100644 index ce895ad13..000000000 --- a/cs/samples/EpvsSample/EpvsBench.cs +++ /dev/null @@ -1,172 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Security.Cryptography; -using System.Threading; -using FASTER.core; - -namespace EpvsSample -{ - internal class EpvsBench - { - internal SemaphoreSlim testedLatch = null!; - internal EpochProtectedVersionScheme tested = null!; - internal byte[] hashBytes = null!; - - internal class Worker - { - private byte[] scratchPad; - - private HashAlgorithm hasher; - - // private long scratchPad; - private EpvsBench parent; - private List versionChangeIndexes; - private int numOps, versionChangeDelay, numaStyle, threadId; - - private byte syncMode; - - internal Worker(EpvsBench parent, Options options, Random random, int threadId) - { - hasher = new SHA256Managed(); - scratchPad = new byte[hasher.HashSize / 8]; - this.parent = parent; - versionChangeIndexes = new List(); - numOps = options.NumOps; - versionChangeDelay = options.VersionChangeDelay; - numaStyle = options.NumaStyle; - this.threadId = threadId; - - for (var i = 0; i < numOps; i++) - { - if (random.NextDouble() < options.VersionChangeProbability) - versionChangeIndexes.Add(i); - } - - switch (options.SynchronizationMode) - { - case "epvs": - syncMode = 1; - break; - case "epvs-refresh": - syncMode = 2; - break; - case "latch": - syncMode = 3; - break; - } - - } - - private void DoWork(int numUnits) - { - for (var i = 0; i < numUnits; i++) - hasher.TryComputeHash(parent.hashBytes, scratchPad, out _); - // scratchPad++; - } - - internal void RunOneThread() - { - if (numaStyle == 0) - Native32.AffinitizeThreadRoundRobin((uint) threadId); - else if (numaStyle == 1) - Native32.AffinitizeThreadShardedNuma((uint) threadId, 2); // assuming two NUMA sockets - - if (syncMode == 2) - parent.tested.Enter(); - - var nextChangeIndex = 0; - for (var i = 0; i < numOps; i++) - { - switch (syncMode) - { - case 1: - if (nextChangeIndex < versionChangeIndexes.Count && - i == versionChangeIndexes[nextChangeIndex]) - { - parent.tested.TryAdvanceVersionWithCriticalSection((_, _) => DoWork(versionChangeDelay)); - nextChangeIndex++; - } - else - { - parent.tested.Enter(); - DoWork(1); - parent.tested.Leave(); - } - break; - case 2: - if (nextChangeIndex < versionChangeIndexes.Count && - i == versionChangeIndexes[nextChangeIndex]) - { - while (parent.tested.TryAdvanceVersionWithCriticalSection((_, _) => - { - DoWork(versionChangeDelay); - }) == StateMachineExecutionStatus.RETRY) - parent.tested.Refresh(); - nextChangeIndex++; - } - else - { - parent.tested.Refresh(); - DoWork(1); - } - - break; - case 3: - parent.testedLatch.Wait(); - if (nextChangeIndex < versionChangeIndexes.Count && - i == versionChangeIndexes[nextChangeIndex]) - { - DoWork(versionChangeDelay); - nextChangeIndex++; - } - else - { - DoWork(1); - } - parent.testedLatch.Release(); - break; - default: - throw new NotImplementedException(); - } - } - - if (syncMode == 2) - parent.tested.Leave(); - } - } - - - internal void RunExperiment(Options options) - { - hashBytes = new byte[8]; - new Random().NextBytes(hashBytes); - tested = new EpochProtectedVersionScheme(new LightEpoch()); - testedLatch = new SemaphoreSlim(1, 1); - - var threads = new List(); - var random = new Random(); - for (var i = 0; i < options.NumThreads; i++) - { - var worker = new Worker(this, options, random, i); - var t = new Thread(() => worker.RunOneThread()); - threads.Add(t); - } - - var sw = Stopwatch.StartNew(); - foreach (var t in threads) - t.Start(); - foreach (var t in threads) - t.Join(); - var timeMilli = sw.ElapsedMilliseconds; - var throughput = options.NumOps * options.NumThreads * 1000.0 / timeMilli; - Console.WriteLine(throughput); - if (!options.OutputFile.Equals("")) - { - using var outputFile = new StreamWriter(options.OutputFile, true); - outputFile.WriteLine(throughput); - } - } - } -} \ No newline at end of file diff --git a/cs/samples/EpvsSample/EpvsSample.csproj b/cs/samples/EpvsSample/EpvsSample.csproj deleted file mode 100644 index 791e58a66..000000000 --- a/cs/samples/EpvsSample/EpvsSample.csproj +++ /dev/null @@ -1,17 +0,0 @@ - - - - Exe - net5.0 - enable - - - - - - - - - - - diff --git a/cs/samples/EpvsSample/ListExample.cs b/cs/samples/EpvsSample/ListExample.cs deleted file mode 100644 index f25100ab4..000000000 --- a/cs/samples/EpvsSample/ListExample.cs +++ /dev/null @@ -1,389 +0,0 @@ -using System; -using System.Threading; -using FASTER.core; - -namespace EpvsSample -{ - public interface IResizableList - { - public int Count(); - - public long Read(int index); - - public void Write(int index, long value); - - public int Push(long value); - } - - - public class SingleThreadedResizableList : IResizableList - { - private long[] list; - private int count; - - public SingleThreadedResizableList() - { - list = new long[16]; - count = 0; - } - - public int Count() => count; - - public long Read(int index) - { - if (index < 0 || index >= count) throw new IndexOutOfRangeException(); - return list[index]; - } - - public void Write(int index, long value) - { - if (index < 0 || index >= count) throw new IndexOutOfRangeException(); - list[index] = value; - } - - public int Push(long value) - { - if (count == list.Length) - { - var newList = new long[2 * count]; - Array.Copy(list, newList, list.Length); - list = newList; - } - - list[count] = value; - return count++; - } - } - - public class LatchedResizableList : IResizableList - { - private ReaderWriterLockSlim rwLatch; - private long[] list; - private int count; - - public LatchedResizableList() - { - rwLatch = new ReaderWriterLockSlim(); - list = new long[16]; - count = 0; - } - - public int Count() => Math.Min(count, list.Length); - - public long Read(int index) - { - if (index < 0) throw new IndexOutOfRangeException(); - try - { - rwLatch.EnterReadLock(); - if (index < list.Length) return list[index]; - if (index < count) return default; - throw new IndexOutOfRangeException(); - } - finally - { - rwLatch.ExitReadLock(); - } - } - - public void Write(int index, long value) - { - rwLatch.EnterReadLock(); - list[index] = value; - rwLatch.ExitReadLock(); - } - - private void Resize() - { - try - { - rwLatch.EnterWriteLock(); - var newList = new long[2 * list.Length]; - Array.Copy(list, newList, list.Length); - list = newList; - } - finally - { - rwLatch.ExitWriteLock(); - } - } - - public int Push(long value) - { - var result = Interlocked.Increment(ref count) - 1; - while (true) - { - if (result == list.Length) - Resize(); - try - { - rwLatch.EnterReadLock(); - if (result >= list.Length) continue; - list[result] = value; - return result; - } - finally - { - rwLatch.ExitReadLock(); - } - } - } - } - - public class SimpleVersionSchemeResizableList : IResizableList - { - private EpochProtectedVersionScheme epvs; - private long[] list; - private int count; - - public SimpleVersionSchemeResizableList() - { - epvs = new EpochProtectedVersionScheme(new LightEpoch()); - list = new long[16]; - count = 0; - } - - public int Count() => Math.Min(count, list.Length); - - public long Read(int index) - { - if (index < 0) throw new IndexOutOfRangeException(); - try - { - epvs.Enter(); - if (index < list.Length) return list[index]; - if (index < count) return default; - throw new IndexOutOfRangeException(); - } - finally - { - epvs.Leave(); - } - } - - public void Write(int index, long value) - { - try - { - epvs.Enter(); - if (index < 0 || index >= count) throw new IndexOutOfRangeException(); - list[index] = value; - } - finally - { - epvs.Leave(); - } - } - - private void Resize() - { - var newList = new long[2 * list.Length]; - Array.Copy(list, newList, list.Length); - list = newList; - } - - public int Push(long value) - { - try - { - var v = epvs.Enter(); - var result = Interlocked.Increment(ref count) - 1; - - while (true) - { - if (result < list.Length) - { - list[result] = value; - return result; - } - - epvs.Leave(); - if (result == list.Length) - epvs.AdvanceVersionWithCriticalSection((_, _) => Resize(), v.Version + 1); - Thread.Yield(); - v = epvs.Enter(); - } - } - finally - { - epvs.Leave(); - } - } - } - - public class ListGrowthStateMachine : VersionSchemeStateMachine - { - public const byte COPYING = 1; - private TwoPhaseResizableList obj; - private volatile bool copyDone = false; - - public ListGrowthStateMachine(TwoPhaseResizableList obj, long toVersion) : base(toVersion) - { - this.obj = obj; - } - - public override bool GetNextStep(VersionSchemeState currentState, out VersionSchemeState nextState) - { - switch (currentState.Phase) - { - case VersionSchemeState.REST: - nextState = VersionSchemeState.Make(COPYING, currentState.Version); - return true; - case COPYING: - nextState = VersionSchemeState.Make(VersionSchemeState.REST, actualToVersion); - return copyDone; - default: - throw new NotImplementedException(); - } - } - - public override void OnEnteringState(VersionSchemeState fromState, VersionSchemeState toState) - { - switch (fromState.Phase) - { - case VersionSchemeState.REST: - obj.newList = new long[obj.list.Length * 2]; - break; - case COPYING: - obj.list = obj.newList; - break; - default: - throw new NotImplementedException(); - } - } - - public override void AfterEnteringState(VersionSchemeState state) - { - if (state.Phase == COPYING) - { - Array.Copy(obj.list, obj.newList, obj.list.Length); - copyDone = true; - obj.epvs.SignalStepAvailable(); - } - } - } - - public class TwoPhaseResizableList : IResizableList - { - internal EpochProtectedVersionScheme epvs; - internal long[] list, newList; - internal int count; - - public TwoPhaseResizableList() - { - epvs = new EpochProtectedVersionScheme(new LightEpoch()); - list = new long[16]; - newList = list; - count = 0; - } - - // TODO(Tianyu): How to ensure this is correct in the face of concurrent pushes? - public int Count() => count; - - public long Read(int index) - { - if (index < 0) throw new IndexOutOfRangeException(); - try - { - var state = epvs.Enter(); - switch (state.Phase) - { - case VersionSchemeState.REST: - if (index < list.Length) return list[index]; - // element allocated but yet to be constructed - if (index < count) return default; - throw new IndexOutOfRangeException(); - case ListGrowthStateMachine.COPYING: - if (index < list.Length) return list[index]; - if (index < newList.Length) return newList[index]; - if (index < count) return default; - throw new IndexOutOfRangeException(); - default: - throw new NotImplementedException(); - } - } - finally - { - epvs.Leave(); - } - } - - public void Write(int index, long value) - { - try - { - var state = epvs.Enter(); - // Write operation is not allowed during copy because we don't know about the copy progress - while (state.Phase == ListGrowthStateMachine.COPYING) - { - state = epvs.Refresh(); - Thread.Yield(); - } - - if (index < 0 || index >= count) throw new IndexOutOfRangeException(); - list[index] = value; - } - finally - { - epvs.Leave(); - } - } - - public int Push(long value) - { - try - { - var result = Interlocked.Increment(ref count) - 1; - - var state = epvs.Enter(); - - // Write the entry into the correct underlying array - while (true) - { - if (state.Phase == VersionSchemeState.REST && result == list.Length) - { - epvs.Leave(); - // Use explicit versioning to prevent multiple list growth resulting from same full list state - epvs.ExecuteStateMachine(new ListGrowthStateMachine(this, state.Version + 1)); - state = epvs.Enter(); - } - - switch (state.Phase) - { - case VersionSchemeState.REST: - if (result >= list.Length) - { - epvs.Leave(); - Thread.Yield(); - state = epvs.Enter(); - continue; - } - - list[result] = value; - return result; - case ListGrowthStateMachine.COPYING: - // This was the copying phase of a previous state machine - if (result >= newList.Length) - { - epvs.Leave(); - Thread.Yield(); - state = epvs.Enter(); - continue; - } - - // Make sure to write update to old list if it belongs there in case copying erases new write - if (result < list.Length) - list[result] = value; - // Also write to new list in case copying was delayed - newList[result] = value; - return result; - } - } - } - finally - { - epvs.Leave(); - } - } - } -} \ No newline at end of file diff --git a/cs/samples/EpvsSample/Program.cs b/cs/samples/EpvsSample/Program.cs deleted file mode 100644 index bff98dbdb..000000000 --- a/cs/samples/EpvsSample/Program.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using CommandLine; - -namespace EpvsSample -{ - internal class Options - { - [Option('m', "synchronization-mode", Default = "epvs", - HelpText = "synchronization mode options:" + - "\n epvs" + - "\n epvs-refresh" + - "\n latch")] - public string SynchronizationMode { get; set; } = null!; - - [Option('o', "num-ops", Default = 1000000)] - public int NumOps { get; set; } - - [Option('t', "num-threads", Required = true)] - public int NumThreads { get; set; } - - [Option('n', "numa", Required = false, Default = 0, - HelpText = "NUMA options:" + - "\n 0 = No sharding across NUMA sockets" + - "\n 1 = Sharding across NUMA sockets")] - public int NumaStyle { get; set; } - - [Option('p', "probability", Default = 1e-6)] - public double VersionChangeProbability { get; set; } - - [Option('l', "delay", Default = 1)] - public int VersionChangeDelay { get; set; } - - - [Option('u', "output-file", Default = "")] - public string OutputFile { get; set; } = null!; - } - - - internal class Program - { - static void Main(string[] args) - { - var options = Parser.Default.ParseArguments(args).Value; - var bench = new EpvsBench(); - bench.RunExperiment(options); - } - } -} \ No newline at end of file diff --git a/cs/src/core/Epoch/EpochProtectedVersionScheme.cs b/cs/src/core/Epoch/EpochProtectedVersionScheme.cs deleted file mode 100644 index f512444cd..000000000 --- a/cs/src/core/Epoch/EpochProtectedVersionScheme.cs +++ /dev/null @@ -1,482 +0,0 @@ -using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; - -namespace FASTER.core -{ - /// - /// The current state of a state-machine operation such as a checkpoint. - /// - [StructLayout(LayoutKind.Explicit, Size = 8)] - public struct VersionSchemeState - { - /// - /// Special value denoting that the version state machine is at rest in stable state - /// - public const byte REST = 0; - const int kTotalSizeInBytes = 8; - const int kTotalBits = kTotalSizeInBytes * 8; - - // Phase - const int kPhaseBits = 8; - const int kPhaseShiftInWord = kTotalBits - kPhaseBits; - const long kPhaseMaskInWord = ((1L << kPhaseBits) - 1) << kPhaseShiftInWord; - const long kPhaseMaskInInteger = (1L << kPhaseBits) - 1; - - // Version - const int kVersionBits = kPhaseShiftInWord; - const long kVersionMaskInWord = (1L << kVersionBits) - 1; - - /// Internal intermediate state of state machine - private const byte kIntermediateMask = 128; - - [FieldOffset(0)] internal long Word; - - /// - /// Custom phase marker denoting where in a state machine EPVS is in right now - /// - public byte Phase - { - get { return (byte) ((Word >> kPhaseShiftInWord) & kPhaseMaskInInteger); } - set - { - Word &= ~kPhaseMaskInWord; - Word |= (((long) value) & kPhaseMaskInInteger) << kPhaseShiftInWord; - } - } - - /// - /// whether EPVS is in intermediate state now (transitioning between two states) - public bool IsIntermediate() => (Phase & kIntermediateMask) != 0; - - /// - /// Version number of the current state - /// - public long Version - { - get { return Word & kVersionMaskInWord; } - set - { - Word &= ~kVersionMaskInWord; - Word |= value & kVersionMaskInWord; - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static VersionSchemeState Copy(ref VersionSchemeState other) - { - var info = default(VersionSchemeState); - info.Word = other.Word; - return info; - } - - /// - /// Make a state with the given phase and version - /// - /// - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static VersionSchemeState Make(byte phase, long version) - { - var info = default(VersionSchemeState); - info.Phase = phase; - info.Version = version; - return info; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static VersionSchemeState MakeIntermediate(VersionSchemeState state) - => Make((byte) (state.Phase | kIntermediateMask), state.Version); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static void RemoveIntermediate(ref VersionSchemeState state) - { - state.Phase = (byte) (state.Phase & ~kIntermediateMask); - } - - internal static bool Equal(VersionSchemeState s1, VersionSchemeState s2) - { - return s1.Word == s2.Word; - } - - /// - public override string ToString() - { - return $"[{Phase},{Version}]"; - } - - /// - /// Compare the current to for equality if obj is also a - /// - public override bool Equals(object obj) - { - return obj is VersionSchemeState other && Equals(other); - } - - /// - public override int GetHashCode() - { - return Word.GetHashCode(); - } - - /// - /// Compare the current to for equality - /// - private bool Equals(VersionSchemeState other) - { - return Word == other.Word; - } - - /// - /// Equals - /// - public static bool operator ==(VersionSchemeState left, VersionSchemeState right) - { - return left.Equals(right); - } - - /// - /// Not Equals - /// - public static bool operator !=(VersionSchemeState left, VersionSchemeState right) - { - return !(left == right); - } - } - - /// - /// A version state machine specifies a sequence of transitions to a new version - /// - public abstract class VersionSchemeStateMachine - { - private long toVersion; - /// - /// The actual version this state machine is advancing to, or -1 if not yet determined - /// - protected internal long actualToVersion = -1; - - /// - /// Constructs a new version state machine for transition to the given version - /// - /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version - protected VersionSchemeStateMachine(long toVersion = -1) - { - this.toVersion = toVersion; - actualToVersion = toVersion; - } - - /// - /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version - public long ToVersion() => toVersion; - - /// - /// Given the current state, compute the next state the version scheme should enter, if any. - /// - /// the current state - /// the next state, if any - /// whether a state transition is possible at this moment - public abstract bool GetNextStep(VersionSchemeState currentState, out VersionSchemeState nextState); - - /// - /// Code block to execute before entering a state. Guaranteed to execute in a critical section with mutual - /// exclusion with other transitions or EPVS-protected code regions - /// - /// the current state - /// the state transitioning to - public abstract void OnEnteringState(VersionSchemeState fromState, VersionSchemeState toState); - - /// - /// Code block to execute after entering the state. Execution here may interleave with other EPVS-protected - /// code regions. This can be used to collaborative perform heavyweight transition work without blocking - /// progress of other threads. - /// - /// the current state - public abstract void AfterEnteringState(VersionSchemeState state); - } - - internal class SimpleVersionSchemeStateMachine : VersionSchemeStateMachine - { - private Action criticalSection; - - public SimpleVersionSchemeStateMachine(Action criticalSection, long toVersion = -1) : base(toVersion) - { - this.criticalSection = criticalSection; - } - - public override bool GetNextStep(VersionSchemeState currentState, out VersionSchemeState nextState) - { - Debug.Assert(currentState.Phase == VersionSchemeState.REST); - nextState = VersionSchemeState.Make(VersionSchemeState.REST, ToVersion() == -1 ? currentState.Version + 1 : ToVersion()); - return true; - } - - public override void OnEnteringState(VersionSchemeState fromState, VersionSchemeState toState) - { - Debug.Assert(fromState.Phase == VersionSchemeState.REST && toState.Phase == VersionSchemeState.REST); - criticalSection(fromState.Version, toState.Version); - } - - public override void AfterEnteringState(VersionSchemeState state) {} - } - - /// - /// Status for state machine execution - /// - public enum StateMachineExecutionStatus - { - /// - /// execution successful - /// - OK, - /// - /// execution unsuccessful but may be retried - /// - RETRY, - /// - /// execution failed and should not be retried - /// - FAIL - } - - /// - /// Epoch Protected Version Scheme - /// - public class EpochProtectedVersionScheme - { - private LightEpoch epoch; - private VersionSchemeState state; - private VersionSchemeStateMachine currentMachine; - - /// - /// Construct a new EPVS backed by the given epoch framework. Multiple EPVS instances can share an underlying - /// epoch framework (WARNING: re-entrance is not yet supported, so nested protection of these shared instances - /// likely leads to error) - /// - /// The backing epoch protection framework - public EpochProtectedVersionScheme(LightEpoch epoch) - { - this.epoch = epoch; - state = VersionSchemeState.Make(VersionSchemeState.REST, 1); - currentMachine = null; - } - - /// - /// the current state - public VersionSchemeState CurrentState() => state; - - // Atomic transition from expectedState -> nextState - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool MakeTransition(VersionSchemeState expectedState, VersionSchemeState nextState) - { - if (Interlocked.CompareExchange(ref state.Word, nextState.Word, expectedState.Word) != - expectedState.Word) return false; - Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version); - return true; - } - - /// - /// Enter protection on the current thread. During protection, no version transition is possible. For the system - /// to make progress, protection must be later relinquished on the same thread using Leave() or Refresh() - /// - /// the state of the EPVS as of protection, which extends until the end of protection - public VersionSchemeState Enter() - { - epoch.Resume(); - TryStepStateMachine(); - - VersionSchemeState result; - while (true) - { - result = state; - if (!result.IsIntermediate()) break; - epoch.Suspend(); - Thread.Yield(); - epoch.Resume(); - } - - return result; - } - - /// - /// Refreshes protection --- equivalent to dropping and immediately reacquiring protection, but more performant. - /// - /// the state of the EPVS as of protection, which extends until the end of protection - public VersionSchemeState Refresh() - { - epoch.ProtectAndDrain(); - VersionSchemeState result = default; - TryStepStateMachine(); - - while (true) - { - result = state; - if (!result.IsIntermediate()) break; - epoch.Suspend(); - Thread.Yield(); - epoch.Resume(); - } - return result; - } - - /// - /// Drop protection of the current thread - /// - public void Leave() - { - epoch.Suspend(); - } - - internal void TryStepStateMachine(VersionSchemeStateMachine expectedMachine = null) - { - var machineLocal = currentMachine; - var oldState = state; - - // Nothing to step - if (machineLocal == null) return; - - // Should exit to avoid stepping infinitely (until stack overflow) - if (expectedMachine != null && machineLocal != expectedMachine) return; - - // Still computing actual to version - if (machineLocal.actualToVersion == -1) return; - - // Machine finished, but not reset yet. Should reset and avoid starting another cycle - if (oldState.Phase == VersionSchemeState.REST && oldState.Version == machineLocal.actualToVersion) - { - Interlocked.CompareExchange(ref currentMachine, null, machineLocal); - return; - } - - // Step is in progress or no step is available - if (oldState.IsIntermediate() || !machineLocal.GetNextStep(oldState, out var nextState)) return; - - var intermediate = VersionSchemeState.MakeIntermediate(oldState); - if (!MakeTransition(oldState, intermediate)) return; - // Avoid upfront memory allocation by going to a function - StepMachineHeavy(machineLocal, oldState, nextState); - - // Ensure that state machine is able to make progress if this thread is the only active thread - if (!epoch.ThisInstanceProtected()) - { - epoch.Resume(); - epoch.Suspend(); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void StepMachineHeavy(VersionSchemeStateMachine machineLocal, VersionSchemeState old, VersionSchemeState next) - { - epoch.BumpCurrentEpoch(() => - { - machineLocal.OnEnteringState(old, next); - var success = MakeTransition(VersionSchemeState.MakeIntermediate(old), next); - machineLocal.AfterEnteringState(next); - Debug.Assert(success); - TryStepStateMachine(machineLocal); - }); - } - - /// - /// Signals to EPVS that a new step is available in the state machine. This is useful when the state machine - /// delays a step (e.g., while waiting on IO to complete) and invoked after the step is available, so the - /// state machine can make progress even without active threads entering and leaving the system. There is no - /// need to invoke this method if steps are always available. - /// - public void SignalStepAvailable() - { - TryStepStateMachine(); - } - - /// - /// Attempt to start executing the given state machine. - /// - /// state machine to execute - /// - /// whether the state machine is successfully started (OK), - /// cannot be started due to an active state machine (RETRY), - /// or cannot be started because the version has advanced past the target version specified (FAIL) - /// - public StateMachineExecutionStatus TryExecuteStateMachine(VersionSchemeStateMachine stateMachine) - { - if (stateMachine.ToVersion() != -1 && stateMachine.ToVersion() <= state.Version) return StateMachineExecutionStatus.FAIL; - var actualStateMachine = Interlocked.CompareExchange(ref currentMachine, stateMachine, null); - if (actualStateMachine == null) - { - // Compute the actual ToVersion of state machine - stateMachine.actualToVersion = - stateMachine.ToVersion() == -1 ? state.Version + 1 : stateMachine.ToVersion(); - // Trigger one initial step to begin the process - TryStepStateMachine(stateMachine); - return StateMachineExecutionStatus.OK; - } - - // Otherwise, need to check that we are not a duplicate attempt to increment version - if (stateMachine.ToVersion() != -1 && currentMachine.actualToVersion >= stateMachine.ToVersion()) - return StateMachineExecutionStatus.FAIL; - - return StateMachineExecutionStatus.RETRY; - } - - - /// - /// Start executing the given state machine - /// - /// state machine to start - /// whether to spin wait until version transition is complete - /// whether the state machine can be executed. If false, EPVS has advanced version past the target version specified - public bool ExecuteStateMachine(VersionSchemeStateMachine stateMachine, bool spin = false) - { - if (epoch.ThisInstanceProtected()) - throw new InvalidOperationException("unsafe to execute a state machine blockingly when under protection"); - StateMachineExecutionStatus status; - do - { - status = TryExecuteStateMachine(stateMachine); - } while (status == StateMachineExecutionStatus.RETRY); - - if (status != StateMachineExecutionStatus.OK) return false; - - if (spin) - { - while (state.Version != stateMachine.actualToVersion || state.Phase != VersionSchemeState.REST) - { - TryStepStateMachine(); - Thread.Yield(); - } - } - - return true; - } - - /// - /// Advance the version with a single critical section to the requested version. - /// - /// critical section to execute, with old version and new (target) version as arguments - /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version - /// - /// whether the state machine is successfully started (OK), - /// cannot be started due to an active state machine (RETRY), - /// or cannot be started because the version has advanced past the target version specified (FAIL) - /// - public StateMachineExecutionStatus TryAdvanceVersionWithCriticalSection(Action criticalSection, long targetVersion = -1) - { - return TryExecuteStateMachine(new SimpleVersionSchemeStateMachine(criticalSection, targetVersion)); - } - - /// - /// Advance the version with a single critical section to the requested version. - /// - /// critical section to execute, with old version and new (target) version as arguments - /// version to transition to, or -1 if unconditionally transitioning to an unspecified next version - /// whether to spin wait until version transition is complete - /// whether the state machine can be executed. If false, EPVS has advanced version past the target version specified - public bool AdvanceVersionWithCriticalSection(Action criticalSection, long targetVersion = -1, bool spin = false) - { - return ExecuteStateMachine(new SimpleVersionSchemeStateMachine(criticalSection, targetVersion), spin); - } - - } -} \ No newline at end of file diff --git a/cs/src/core/Epochs/FastThreadLocal.cs b/cs/src/core/Epochs/FastThreadLocal.cs new file mode 100644 index 000000000..12fe2e791 --- /dev/null +++ b/cs/src/core/Epochs/FastThreadLocal.cs @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Threading; + +namespace FASTER.core +{ + /// + /// Fast implementation of instance-thread-local variables + /// + /// + internal sealed class FastThreadLocal + { + // Max instances supported + private const int kMaxInstances = 128; + + [ThreadStatic] + private static T[] tl_values; + [ThreadStatic] + private static int[] tl_iid; + + private readonly int offset; + private readonly int iid; + + private static readonly int[] instances = new int[kMaxInstances]; + private static int instanceId = 0; + + public FastThreadLocal() + { + iid = Interlocked.Increment(ref instanceId); + + for (int i = 0; i < kMaxInstances; i++) + { + if (0 == Interlocked.CompareExchange(ref instances[i], iid, 0)) + { + offset = i; + return; + } + } + throw new FasterException("Unsupported number of simultaneous instances"); + } + + public void InitializeThread() + { + if (tl_values == null) + { + tl_values = new T[kMaxInstances]; + tl_iid = new int[kMaxInstances]; + } + if (tl_iid[offset] != iid) + { + tl_iid[offset] = iid; + tl_values[offset] = default(T); + } + } + + public void DisposeThread() + { + tl_values[offset] = default(T); + tl_iid[offset] = 0; + } + + /// + /// Dispose instance for all threads + /// + public void Dispose() + { + instances[offset] = 0; + } + + public T Value + { + get => tl_values[offset]; + set => tl_values[offset] = value; + } + + public bool IsInitializedForThread => (tl_values != null) && (iid == tl_iid[offset]); + } +} \ No newline at end of file diff --git a/cs/src/core/Epoch/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs similarity index 96% rename from cs/src/core/Epoch/LightEpoch.cs rename to cs/src/core/Epochs/LightEpoch.cs index b0d7bd6a3..6df99acb2 100644 --- a/cs/src/core/Epoch/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -14,7 +14,6 @@ namespace FASTER.core /// public unsafe sealed class LightEpoch { - private const int kCacheLineBytes = 64; /// /// Default invalid index entry. /// @@ -115,7 +114,7 @@ static LightEpoch() p = (long)threadIndexHandle.AddrOfPinnedObject(); #endif // Force the pointer to align to 64-byte boundaries - long p2 = (p + (kCacheLineBytes - 1)) & ~(kCacheLineBytes - 1); + long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1); threadIndexAligned = (Entry*)p2; } @@ -136,7 +135,7 @@ public LightEpoch() p = (long)tableHandle.AddrOfPinnedObject(); #endif // Force the pointer to align to 64-byte boundaries - long p2 = (p + (kCacheLineBytes - 1)) & ~(kCacheLineBytes - 1); + long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1); tableAligned = (Entry*)p2; CurrentEpoch = 1; @@ -501,23 +500,7 @@ static int ReserveEntry() } } } - - /// - /// A 32-bit murmur3 implementation. - /// - /// - /// - private static int Murmur3(int h) - { - uint a = (uint)h; - a ^= a >> 16; - a *= 0x85ebca6b; - a ^= a >> 13; - a *= 0xc2b2ae35; - a ^= a >> 16; - return (int)a; - } - + /// /// Allocate a new entry in epoch table. This is called /// once for a thread. @@ -528,7 +511,7 @@ static int ReserveEntryForThread() if (threadId == 0) // run once per thread for performance { threadId = Environment.CurrentManagedThreadId; - uint code = (uint)Murmur3(threadId); + uint code = (uint)Utility.Murmur3(threadId); startOffset1 = (ushort)(1 + (code % kTableSize)); startOffset2 = (ushort)(1 + ((code >> 16) % kTableSize)); } @@ -538,7 +521,7 @@ static int ReserveEntryForThread() /// /// Epoch table entry (cache line size). /// - [StructLayout(LayoutKind.Explicit, Size = kCacheLineBytes)] + [StructLayout(LayoutKind.Explicit, Size = Constants.kCacheLineBytes)] struct Entry { /// @@ -569,4 +552,4 @@ struct EpochActionPair public override string ToString() => $"epoch = {epoch}, action = {(action is null ? "n/a" : action.Method.ToString())}"; } } -} +} \ No newline at end of file diff --git a/cs/src/core/Utilities/Utility.cs b/cs/src/core/Utilities/Utility.cs index a62ddef05..dddf62c95 100644 --- a/cs/src/core/Utilities/Utility.cs +++ b/cs/src/core/Utilities/Utility.cs @@ -339,7 +339,24 @@ public static bool Is32Bit(long x) { return ((ulong)x < 4294967295ul); } - + + + /// + /// A 32-bit murmur3 implementation. + /// + /// + /// + internal static int Murmur3(int h) + { + uint a = (uint)h; + a ^= a >> 16; + a *= 0x85ebca6b; + a ^= a >> 13; + a *= 0xc2b2ae35; + a ^= a >> 16; + return (int)a; + } + /// /// Updates the variable to newValue only if the current value is smaller than the new value. ///