diff --git a/cs/playground/FasterLogSample/Program.cs b/cs/playground/FasterLogSample/Program.cs index 3fdd4d617..631848f44 100644 --- a/cs/playground/FasterLogSample/Program.cs +++ b/cs/playground/FasterLogSample/Program.cs @@ -1,20 +1,16 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using FASTER.core; -using FASTER.core.log; using System; using System.Diagnostics; -using System.Diagnostics.Eventing.Reader; -using System.IO; -using System.Runtime.CompilerServices; using System.Threading; +using FASTER.core; namespace FasterLogSample { public class Program { - const int entryLength = 100; + const int entryLength = 96; static FasterLog log; static void ReportThread() diff --git a/cs/src/core/Epochs/LightEpoch.cs b/cs/src/core/Epochs/LightEpoch.cs index cd47f0aaf..8434f8964 100644 --- a/cs/src/core/Epochs/LightEpoch.cs +++ b/cs/src/core/Epochs/LightEpoch.cs @@ -9,9 +9,8 @@ namespace FASTER.core { - /// - /// + /// Epoch protection /// public unsafe class LightEpoch { @@ -37,6 +36,10 @@ public unsafe class LightEpoch private GCHandle tableHandle; private Entry* tableAligned; + private static Entry[] threadIndex; + private static GCHandle threadIndexHandle; + private static Entry* threadIndexAligned; + /// /// List of action, epoch pairs containing actions to performed /// when an epoch becomes safe to reclaim. @@ -44,11 +47,6 @@ public unsafe class LightEpoch private int drainCount = 0; private readonly EpochActionPair[] drainList = new EpochActionPair[kDrainListSize]; - /// - /// Number of entries in the epoch table - /// - private int numEntries; - /// /// A thread's entry in the epoch table. /// @@ -61,6 +59,9 @@ public unsafe class LightEpoch [ThreadStatic] private static int threadEntryIndexCount; + [ThreadStatic] + static int threadId; + /// /// Global current epoch value /// @@ -72,25 +73,28 @@ public unsafe class LightEpoch public int SafeToReclaimEpoch; /// - /// Instantiate the epoch table + /// Static constructor to setup shared cache-aligned space + /// to store per-entry count of instances using that entry /// - /// - public LightEpoch(int size = kTableSize) + static LightEpoch() { - Initialize(size); + // Over-allocate to do cache-line alignment + threadIndex = new Entry[kTableSize + 2]; + threadIndexHandle = GCHandle.Alloc(threadIndex, GCHandleType.Pinned); + long p = (long)threadIndexHandle.AddrOfPinnedObject(); + + // Force the pointer to align to 64-byte boundaries + long p2 = (p + (Constants.kCacheLineBytes - 1)) & ~(Constants.kCacheLineBytes - 1); + threadIndexAligned = (Entry*)p2; } /// - /// Initialize the epoch table + /// Instantiate the epoch table /// - /// - unsafe void Initialize(int size) + public LightEpoch() { - // threadEntryIndex = new FastThreadLocal(); - numEntries = size; - // Over-allocate to do cache-line alignment - tableRaw = new Entry[size + 2]; + tableRaw = new Entry[kTableSize + 2]; tableHandle = GCHandle.Alloc(tableRaw, GCHandleType.Pinned); long p = (long)tableHandle.AddrOfPinnedObject(); @@ -114,12 +118,8 @@ public void Dispose() tableHandle.Free(); tableAligned = null; tableRaw = null; - - numEntries = 0; CurrentEpoch = 1; SafeToReclaimEpoch = 0; - - // threadEntryIndex.Dispose(); } /// @@ -140,6 +140,7 @@ public int ProtectAndDrain() { int entry = threadEntryIndex; + (*(tableAligned + entry)).threadId = threadEntryIndex; (*(tableAligned + entry)).localCurrentEpoch = CurrentEpoch; if (drainCount > 0) @@ -180,6 +181,7 @@ private void Drain(int nextEpoch) /// /// Thread acquires its epoch entry /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Acquire() { if (threadEntryIndex == kInvalidIndex) @@ -191,20 +193,18 @@ public void Acquire() /// /// Thread releases its epoch entry /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Release() { int entry = threadEntryIndex; - if (kInvalidIndex == entry) - { - return; - } + (*(tableAligned + entry)).localCurrentEpoch = 0; + (*(tableAligned + entry)).threadId = 0; threadEntryIndexCount--; if (threadEntryIndexCount == 0) { + (threadIndexAligned + threadEntryIndex)->threadId = 0; threadEntryIndex = kInvalidIndex; - (*(tableAligned + entry)).localCurrentEpoch = 0; - (*(tableAligned + entry)).threadId = 0; } } @@ -214,7 +214,7 @@ public void Release() [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Suspend() { - (*(tableAligned + threadEntryIndex)).localCurrentEpoch = int.MaxValue; + Release(); } /// @@ -223,8 +223,7 @@ public void Suspend() [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Resume() { - if (threadEntryIndex == kInvalidIndex) - Acquire(); + Acquire(); ProtectAndDrain(); } @@ -307,7 +306,7 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch) { int oldestOngoingCall = currentEpoch; - for (int index = 1; index <= numEntries; ++index) + for (int index = 1; index <= kTableSize; ++index) { int entry_epoch = (*(tableAligned + index)).localCurrentEpoch; if (0 != entry_epoch) @@ -332,20 +331,20 @@ private int ComputeNewSafeToReclaimEpoch(int currentEpoch) /// Start index /// Thread id /// Reserved entry - private int ReserveEntry(int startIndex, int threadId) + private static int ReserveEntry(int startIndex, int threadId) { int current_iteration = 0; for (; ; ) { // Reserve an entry in the table. - for (int i = 0; i < numEntries; ++i) + for (int i = 0; i < kTableSize; ++i) { - int index_to_test = 1 + ((startIndex + i) & (numEntries - 1)); - if (0 == (*(tableAligned + index_to_test)).threadId) + int index_to_test = 1 + ((startIndex + i) & (kTableSize - 1)); + if (0 == (threadIndexAligned + index_to_test)->threadId) { bool success = (0 == Interlocked.CompareExchange( - ref (*(tableAligned + index_to_test)).threadId, + ref (threadIndexAligned+index_to_test)->threadId, threadId, 0)); if (success) @@ -356,7 +355,7 @@ private int ReserveEntry(int startIndex, int threadId) ++current_iteration; } - if (current_iteration > (numEntries * 3)) + if (current_iteration > (kTableSize * 10)) { throw new Exception("Unable to reserve an epoch entry, try increasing the epoch table size (kTableSize)"); } @@ -368,10 +367,13 @@ private int ReserveEntry(int startIndex, int threadId) /// once for a thread. /// /// Reserved entry - private int ReserveEntryForThread() + private static int ReserveEntryForThread() { - // for portability(run on non-windows platform) - int threadId = Environment.OSVersion.Platform == PlatformID.Win32NT ? (int)Native32.GetCurrentThreadId() : Thread.CurrentThread.ManagedThreadId; + if (threadId == 0) // run once per thread for performance + { + // For portability(run on non-windows platform) + threadId = Environment.OSVersion.Platform == PlatformID.Win32NT ? (int)Native32.GetCurrentThreadId() : Thread.CurrentThread.ManagedThreadId; + } int startIndex = Utility.Murmur3(threadId); return ReserveEntry(startIndex, threadId); } @@ -429,7 +431,7 @@ public bool MarkAndCheckIsComplete(int markerIdx, int version) (*(tableAligned + entry)).markers[markerIdx] = version; // check if all threads have reported complete - for (int index = 1; index <= numEntries; ++index) + for (int index = 1; index <= kTableSize; ++index) { int entry_epoch = (*(tableAligned + index)).localCurrentEpoch; int fc_version = (*(tableAligned + index)).markers[markerIdx]; diff --git a/cs/src/core/Index/FasterLog/FasterLog.cs b/cs/src/core/Index/FasterLog/FasterLog.cs index 01d6326ff..5435008ed 100644 --- a/cs/src/core/Index/FasterLog/FasterLog.cs +++ b/cs/src/core/Index/FasterLog/FasterLog.cs @@ -9,15 +9,14 @@ using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; -using FASTER.core; -namespace FASTER.core.log +namespace FASTER.core { /// /// FASTER log /// - public class FasterLog + public class FasterLog : IDisposable { private readonly BlittableAllocator allocator; private readonly LightEpoch epoch; @@ -43,11 +42,20 @@ public class FasterLog /// public FasterLog(FasterLogSettings logSettings) { - this.epoch = new LightEpoch(); + epoch = new LightEpoch(); allocator = new BlittableAllocator(logSettings.GetLogSettings(), null, null, epoch); allocator.Initialize(); } + /// + /// Dispose + /// + public void Dispose() + { + allocator.Dispose(); + epoch.Dispose(); + } + /// /// Append entry to log /// @@ -146,10 +154,21 @@ public FasterLogScanIterator Scan(long beginAddress, long endAddress, ScanBuffer } /// - /// Dispose this thread's epoch entry. Use when you manage your own - /// threads and want to recycle a thread-local epoch entry. + /// Create and pin epoch entry for this thread - use with ReleaseThread + /// if you manage the thread. + /// DO NOT USE WITH ASYNC CODE + /// + public void AcquireThread() + { + epoch.Acquire(); + } + + /// + /// Dispose epoch entry for this thread. Use with AcquireThread + /// if you manage the thread. + /// DO NOT USE WITH ASYNC CODE /// - public void DisposeThread() + public void ReleaseThread() { epoch.Release(); } diff --git a/cs/src/core/Index/FasterLog/FasterLogIterator.cs b/cs/src/core/Index/FasterLog/FasterLogIterator.cs index c97e82c9f..b09db13c3 100644 --- a/cs/src/core/Index/FasterLog/FasterLogIterator.cs +++ b/cs/src/core/Index/FasterLog/FasterLogIterator.cs @@ -5,7 +5,7 @@ using System.Threading; using System.Diagnostics; -namespace FASTER.core.log +namespace FASTER.core { /// /// Scan iterator for hybrid log @@ -36,7 +36,7 @@ public class FasterLogScanIterator : IDisposable /// /// /// - public unsafe FasterLogScanIterator(BlittableAllocator hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch) + internal unsafe FasterLogScanIterator(BlittableAllocator hlog, long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch) { this.allocator = hlog; this.epoch = epoch; diff --git a/cs/src/core/Index/FasterLog/FasterLogSettings.cs b/cs/src/core/Index/FasterLog/FasterLogSettings.cs index dc758b57b..1b70aa656 100644 --- a/cs/src/core/Index/FasterLog/FasterLogSettings.cs +++ b/cs/src/core/Index/FasterLog/FasterLogSettings.cs @@ -3,7 +3,7 @@ #pragma warning disable 0162 -namespace FASTER.core.log +namespace FASTER.core { /// /// FASTER Log Settings diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs new file mode 100644 index 000000000..60542c9be --- /dev/null +++ b/cs/test/FasterLogTests.cs @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Linq; +using FASTER.core; +using NUnit.Framework; + +namespace FASTER.test +{ + + [TestFixture] + internal class FasterLogTests + { + const int entryLength = 100; + const int numEntries = 1000000; + private FasterLog log; + private IDevice device; + + + [SetUp] + public void Setup() + { + device = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log", deleteOnClose: true); + } + + [TearDown] + public void TearDown() + { + device.Close(); + } + + [Test] + public void FasterLogTest1() + { + log = new FasterLog(new FasterLogSettings { LogDevice = device }); + log.AcquireThread(); + + byte[] entry = new byte[entryLength]; + for (int i = 0; i < entryLength; i++) + entry[i] = (byte)i; + + for (int i = 0; i < numEntries; i++) + { + log.Append(entry); + } + log.Flush(true); + + using (var iter = log.Scan(0, long.MaxValue)) + { + int count = 0; + while (iter.GetNext(out Span result)) + { + count++; + Assert.IsTrue(result.SequenceEqual(entry)); + if (count % 100 == 0) + log.TruncateUntil(iter.CurrentAddress); + } + Assert.IsTrue(count == numEntries); + } + + log.ReleaseThread(); + log.Dispose(); + } + } +} diff --git a/cs/test/SimpleRecoveryTest.cs b/cs/test/SimpleRecoveryTest.cs index 4ced1f870..0f5d41689 100644 --- a/cs/test/SimpleRecoveryTest.cs +++ b/cs/test/SimpleRecoveryTest.cs @@ -63,10 +63,13 @@ public void SimpleRecoveryTest1() } fht1.TakeFullCheckpoint(out Guid token); fht1.CompleteCheckpoint(true); + + fht2.StartSession(); + fht1.StopSession(); fht2.Recover(token); - fht2.StartSession(); + for (int key = 0; key < numOps; key++) { var status = fht2.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0);