From 3908c99f3569df83005269b09fdad5e10e51d20d Mon Sep 17 00:00:00 2001 From: Ted Hart <15467143+TedHartMS@users.noreply.github.com> Date: Sun, 31 Jan 2021 18:53:44 -0800 Subject: [PATCH] Record locking (#394) * Add RecordInfo.SpinLock * Pass kInvalidAddress to SingleReader for readcache * IntExclusiveLocker does not require unsafe * Assert logicalAddress >= ReadOnlyAddress in RecordAccessor.SpinLock Co-authored-by: Badrish Chandramouli --- cs/src/core/Index/Common/RecordInfo.cs | 84 +++++++--- cs/src/core/Index/FASTER/FASTERImpl.cs | 16 +- cs/src/core/Index/FASTER/RecordAccessor.cs | 19 +++ cs/src/core/Utilities/IntExclusiveLocker.cs | 4 +- cs/test/LockTests.cs | 166 ++++++++++++++++++++ 5 files changed, 257 insertions(+), 32 deletions(-) create mode 100644 cs/test/LockTests.cs diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index f2fa9841d..0156a18f4 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -6,8 +6,10 @@ //#define RECORD_INFO_WITH_PIN_COUNT using System; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; +using System.Threading; namespace FASTER.core { @@ -18,7 +20,7 @@ namespace FASTER.core #endif public unsafe struct RecordInfo { - public const int kFinalBitOffset = 48; + public const int kLatchBitOffset = 48; public const int kTombstoneBitOffset = 49; @@ -34,7 +36,7 @@ public unsafe struct RecordInfo public const long kPreviousAddressMask = (1L << 48) - 1; - public const long kFinalBitMask = (1L << kFinalBitOffset); + public const long kLatchBitMask = (1L << kLatchBitOffset); public const long kTombstoneMask = (1L << kTombstoneBitOffset); @@ -51,10 +53,9 @@ public unsafe struct RecordInfo [FieldOffset(sizeof(long))] private int access_data; - public static void WriteInfo(RecordInfo* info, int checkpointVersion, bool final, bool tombstone, bool invalidBit, long previousAddress) + public static void WriteInfo(RecordInfo* info, int checkpointVersion, bool tombstone, bool invalidBit, long previousAddress) { info->word = default(long); - info->Final = final; info->Tombstone = tombstone; info->Invalid = invalidBit; info->PreviousAddress = previousAddress; @@ -104,10 +105,9 @@ public void Unpin() [FieldOffset(0)] private long word; - public static void WriteInfo(ref RecordInfo info, int checkpointVersion, bool final, bool tombstone, bool invalidBit, long previousAddress) + public static void WriteInfo(ref RecordInfo info, int checkpointVersion, bool tombstone, bool invalidBit, long previousAddress) { info.word = default; - info.Final = final; info.Tombstone = tombstone; info.Invalid = invalidBit; info.PreviousAddress = previousAddress; @@ -144,46 +144,86 @@ public void Unpin() throw new InvalidOperationException(); } #endif - public bool IsNull() - { - return word == 0; - } + /// + /// The RecordInfo locked by this thread, if any. + /// + [ThreadStatic] + internal static RecordInfo* threadLockedRecord; + + /// + /// The number of times the current thread has (re-)entered the lock. + /// + [ThreadStatic] + internal static int threadLockedRecordEntryCount; - public bool Tombstone + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void SpinLock() { - get + // Check for a re-entrant lock. + if (threadLockedRecord == (RecordInfo*)Unsafe.AsPointer(ref this)) { - return (word & kTombstoneMask) > 0; + Debug.Assert(threadLockedRecordEntryCount > 0); + ++threadLockedRecordEntryCount; + return; } - set + // RecordInfo locking is intended for use in concurrent callbacks only (ConcurrentReader, ConcurrentWriter, InPlaceUpdater), + // so only the RecordInfo for that callback should be locked. A different RecordInfo being locked implies a missing Unlock. + Debug.Assert(threadLockedRecord == null); + Debug.Assert(threadLockedRecordEntryCount == 0); + while (true) { - if (value) + long expected_word = word; + if ((expected_word & kLatchBitMask) == 0) { - word |= kTombstoneMask; + var found_word = Interlocked.CompareExchange(ref word, expected_word | kLatchBitMask, expected_word); + if (found_word == expected_word) + { + threadLockedRecord = (RecordInfo*)Unsafe.AsPointer(ref this); + threadLockedRecordEntryCount = 1; + return; + } } - else + Thread.Yield(); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Unlock() + { + Debug.Assert(threadLockedRecord == (RecordInfo*)Unsafe.AsPointer(ref this)); + if (threadLockedRecord == (RecordInfo*)Unsafe.AsPointer(ref this)) + { + Debug.Assert(threadLockedRecordEntryCount > 0); + if (--threadLockedRecordEntryCount == 0) { - word &= ~kTombstoneMask; + word &= ~kLatchBitMask; + threadLockedRecord = null; } } } - public bool Final + public bool IsNull() + { + return word == 0; + } + + public bool Tombstone { get { - return (word & kFinalBitMask) > 0; + return (word & kTombstoneMask) > 0; } + set { if (value) { - word |= kFinalBitMask; + word |= kTombstoneMask; } else { - word &= ~kFinalBitMask; + word &= ~kTombstoneMask; } } } diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 9edcd1090..69b8f0b76 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -116,7 +116,8 @@ internal OperationStatus InternalRead( } // This is not called when looking up by address, so we do not set pendingContext.recordInfo. - fasterSession.SingleReader(ref key, ref input, ref readcache.GetValue(physicalAddress), ref output, logicalAddress); + // ReadCache addresses are not valid for indexing etc. so pass kInvalidAddress. + fasterSession.SingleReader(ref key, ref input, ref readcache.GetValue(physicalAddress), ref output, Constants.kInvalidAddress); return OperationStatus.SUCCESS; } } @@ -433,7 +434,7 @@ internal OperationStatus InternalUpsert( var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), sessionCtx.version, - true, false, false, + tombstone:false, invalidBit:false, latestLogicalAddress); hlog.Serialize(ref key, newPhysicalAddress); fasterSession.SingleWriter(ref key, ref value, @@ -759,7 +760,7 @@ internal OperationStatus InternalRMW( BlockAllocate(allocatedSize, out long newLogicalAddress, sessionCtx, fasterSession); var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), sessionCtx.version, - true, false, false, + tombstone:false, invalidBit:false, latestLogicalAddress); hlog.Serialize(ref key, newPhysicalAddress); @@ -1057,8 +1058,7 @@ internal OperationStatus InternalDelete( BlockAllocate(allocateSize, out long newLogicalAddress, sessionCtx, fasterSession); var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), - sessionCtx.version, - true, true, false, + sessionCtx.version, tombstone:true, invalidBit:false, latestLogicalAddress); hlog.Serialize(ref key, newPhysicalAddress); @@ -1313,7 +1313,7 @@ internal void InternalContinuePendingReadCopyToTail GetRecordInfo(logicalAddress).Version; + /// + /// Locks the RecordInfo at address + /// + /// The address to examine + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void SpinLock(long logicalAddress) + { + Debug.Assert(logicalAddress >= this.fkv.Log.ReadOnlyAddress); + GetRecordInfo(logicalAddress).SpinLock(); + } + + /// + /// Unlocks the RecordInfo at address + /// + /// The address to examine + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Unlock(long logicalAddress) => GetRecordInfo(logicalAddress).Unlock(); + #endregion public interface } } diff --git a/cs/src/core/Utilities/IntExclusiveLocker.cs b/cs/src/core/Utilities/IntExclusiveLocker.cs index eb425a603..31f0c2d54 100644 --- a/cs/src/core/Utilities/IntExclusiveLocker.cs +++ b/cs/src/core/Utilities/IntExclusiveLocker.cs @@ -6,7 +6,7 @@ namespace FASTER.core /// /// Exclusive lock + marking using 2 MSB bits of int /// - internal unsafe struct IntExclusiveLocker + internal struct IntExclusiveLocker { const int kLatchBitMask = 1 << 31; const int kMarkBitMask = 1 << 30; @@ -30,7 +30,7 @@ public static void SpinLock(ref int value) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static void Unlock(ref int value) { - value = value & ~kLatchBitMask; + value &= ~kLatchBitMask; } public static void Mark(ref int value) diff --git a/cs/test/LockTests.cs b/cs/test/LockTests.cs new file mode 100644 index 000000000..58e8ee748 --- /dev/null +++ b/cs/test/LockTests.cs @@ -0,0 +1,166 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using NUnit.Framework; +using System; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.test +{ + [TestFixture] + internal class LockTests + { + internal class Functions : AdvancedSimpleFunctions + { + private readonly RecordAccessor recordAccessor; + + internal Functions(RecordAccessor accessor) => this.recordAccessor = accessor; + + public override void ConcurrentReader(ref int key, ref int input, ref int value, ref int dst, long address) + { + this.recordAccessor.SpinLock(address); + dst = value; + this.recordAccessor.Unlock(address); + } + + bool LockAndIncrement(ref int dst, long address) + { + this.recordAccessor.SpinLock(address); + ++dst; + this.recordAccessor.Unlock(address); + return true; + } + + public override bool ConcurrentWriter(ref int key, ref int src, ref int dst, long address) => LockAndIncrement(ref dst, address); + + public override bool InPlaceUpdater(ref int key, ref int input, ref int value, long address) => LockAndIncrement(ref value, address); + } + + private FasterKV fkv; + private AdvancedClientSession session; + private IDevice log; + + [SetUp] + public void Setup() + { + log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "/GenericStringTests.log", deleteOnClose: true); + fkv = new FasterKV( 1L << 20, new LogSettings { LogDevice = log, ObjectLogDevice = null } ); + session = fkv.For(new Functions(fkv.RecordAccessor)).NewSession(); + } + + [TearDown] + public void TearDown() + { + session.Dispose(); + session = null; + fkv.Dispose(); + fkv = null; + log.Dispose(); + log = null; + } + + [Test] + public unsafe void RecordInfoLockTest() + { + // Re-entrancy check + static void checkLatch(RecordInfo* ptr, long count) + { + Assert.IsTrue(RecordInfo.threadLockedRecord == ptr); + Assert.IsTrue(RecordInfo.threadLockedRecordEntryCount == count); + } + RecordInfo recordInfo = new RecordInfo(); + RecordInfo* ri = (RecordInfo*)Unsafe.AsPointer(ref recordInfo); + checkLatch(null, 0); + recordInfo.SpinLock(); + checkLatch(ri, 1); + recordInfo.SpinLock(); + checkLatch(ri, 2); + recordInfo.Unlock(); + checkLatch(ri, 1); + recordInfo.Unlock(); + checkLatch(null, 0); + + XLockTest(() => recordInfo.SpinLock(), () => recordInfo.Unlock()); + } + + private void XLockTest(Action locker, Action unlocker) + { + long lockTestValue = 0; + const int numThreads = 50; + const int numIters = 5000; + + var tasks = Enumerable.Range(0, numThreads).Select(ii => Task.Factory.StartNew(XLockTestFunc)).ToArray(); + Task.WaitAll(tasks); + + Assert.AreEqual(numThreads * numIters, lockTestValue); + + void XLockTestFunc() + { + for (int ii = 0; ii < numIters; ++ii) + { + locker(); + var temp = lockTestValue; + Thread.Yield(); + lockTestValue = temp + 1; + unlocker(); + } + } + } + + [Test] + public void IntExclusiveLockerTest() + { + int lockTestValue = 0; + XLockTest(() => IntExclusiveLocker.SpinLock(ref lockTestValue), () => IntExclusiveLocker.Unlock(ref lockTestValue)); + } + + [Test] + public void AdvancedFunctionsLockTest() + { + // Populate + const int numRecords = 100; + const int valueMult = 1000000; + for (int key = 0; key < numRecords; key++) + { + // For this test we should be in-memory, so no pending + Assert.AreNotEqual(Status.PENDING, session.Upsert(key, key * valueMult)); + } + + // Update + const int numThreads = 20; + const int numIters = 500; + var tasks = Enumerable.Range(0, numThreads).Select(ii => Task.Factory.StartNew(() => UpdateFunc((ii & 1) == 0, numRecords, numIters))).ToArray(); + Task.WaitAll(tasks); + + // Verify + for (int key = 0; key < numRecords; key++) + { + var expectedValue = key * valueMult + numThreads * numIters; + Assert.AreNotEqual(Status.PENDING, session.Read(key, out int value)); + Assert.AreEqual(expectedValue, value); + } + } + + void UpdateFunc(bool useRMW, int numRecords, int numIters) + { + for (var key = 0; key < numRecords; ++key) + { + for (int iter = 0; iter < numIters; iter++) + { + if ((iter & 7) == 7) + Assert.AreNotEqual(Status.PENDING, session.Read(key)); + + // These will both just increment the stored value, ignoring the input argument. + if (useRMW) + session.RMW(key, default); + else + session.Upsert(key, default); + } + } + } + } +}