diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index a7f36e139..dc633d054 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -13,7 +13,6 @@ namespace FASTER.core { - public partial class FasterKV : FasterBase, IFasterKV { @@ -23,7 +22,7 @@ public partial class FasterKV : FasterBase, /// /// Compares two keys /// - internal protected readonly IFasterEqualityComparer comparer; + internal readonly IFasterEqualityComparer comparer; internal readonly bool UseReadCache; private readonly CopyReadsToTail CopyReadsToTail; @@ -71,8 +70,8 @@ public partial class FasterKV : FasterBase, internal ConcurrentDictionary _recoveredSessions; - internal bool DisableLocking; - internal LockTable LockTable; + internal readonly bool DisableLocking; + internal readonly LockTable LockTable; internal long NumActiveLockingSessions = 0; internal void IncrementNumLockingSessions() diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index a8f1d8e8a..1de4cf4ad 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -241,7 +241,7 @@ internal OperationStatus InternalRead( if (CopyReadsToTail == CopyReadsToTail.FromReadOnly && !pendingContext.SkipCopyReadsToTail) { var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress)); - InternalTryCopyToTail(sessionCtx, ref pendingContext, ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx, WriteReason.Upsert); + InternalTryCopyToTail(sessionCtx, ref pendingContext, ref key, ref input, ref container.Get(), ref output, logicalAddress, fasterSession, sessionCtx, WriteReason.CopyToTail); container.Dispose(); } return OperationStatus.SUCCESS; @@ -701,9 +701,9 @@ private OperationStatus CreateNewRecordUpsert( if (!recordInfo.Tombstone) { - if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress, out bool lockFailed)) + if (fasterSession.InPlaceUpdater(ref key, ref input, ref recordValue, ref output, ref recordInfo, logicalAddress, out bool lockFailed)) { hlog.MarkPage(logicalAddress, sessionCtx.version); pendingContext.recordInfo = recordInfo; @@ -1175,7 +1175,6 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress)); else if (LockTable.IsActive) LockTable.TransferToLogRecord(ref key, ref recordInfo); - recordInfo.SetTentativeAtomic(false); // If IU, status will be NOTFOUND; return that. if (status != OperationStatus.SUCCESS) @@ -1186,19 +1185,23 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output, ref recordInfo, newLogicalAddress); pendingContext.recordInfo = recordInfo; pendingContext.logicalAddress = newLogicalAddress; - return status; } - - // Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op. - if (fasterSession.PostCopyUpdater(ref key, - ref input, ref hlog.GetValue(physicalAddress), - ref hlog.GetValue(newPhysicalAddress), - ref output, ref recordInfo, newLogicalAddress)) + else { - pendingContext.recordInfo = recordInfo; - pendingContext.logicalAddress = newLogicalAddress; - return status; + // Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op. + if (fasterSession.PostCopyUpdater(ref key, + ref input, ref hlog.GetValue(physicalAddress), + ref hlog.GetValue(newPhysicalAddress), + ref output, ref recordInfo, newLogicalAddress)) + { + pendingContext.recordInfo = recordInfo; + pendingContext.logicalAddress = newLogicalAddress; + } + else + status = OperationStatus.RETRY_NOW; } + recordInfo.SetTentativeAtomic(false); + return status; } else { @@ -1497,11 +1500,11 @@ internal OperationStatus InternalDelete( recordInfo.CopyLocksFrom(hlog.GetInfo(unsealPhysicalAddress)); else if (LockTable.IsActive) LockTable.TransferToLogRecord(ref key, ref recordInfo); - recordInfo.SetTentativeAtomic(false); // Note that this is the new logicalAddress; we have not retrieved the old one if it was below HeadAddress, and thus // we do not know whether 'logicalAddress' belongs to 'key' or is a collision. fasterSession.PostSingleDeleter(ref key, ref recordInfo, newLogicalAddress); + recordInfo.SetTentativeAtomic(false); pendingContext.recordInfo = recordInfo; pendingContext.logicalAddress = newLogicalAddress; status = OperationStatus.SUCCESS; @@ -2022,7 +2025,6 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), { if (LockTable.IsActive) LockTable.TransferToLogRecord(ref key, ref recordInfo); - recordInfo.SetTentativeAtomic(false); // If IU, status will be NOTFOUND; return that. if (status != OperationStatus.SUCCESS) @@ -2034,20 +2036,26 @@ ref hlog.GetValue(newPhysicalAddress), ref pendingContext.output, ref recordInfo, newLogicalAddress); pendingContext.recordInfo = recordInfo; pendingContext.logicalAddress = newLogicalAddress; - return status; } - - // Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op. - if (fasterSession.PostCopyUpdater(ref key, - ref pendingContext.input.Get(), - ref hlog.GetContextRecordValue(ref request), - ref hlog.GetValue(newPhysicalAddress), - ref pendingContext.output, ref recordInfo, newLogicalAddress)) + else { - pendingContext.recordInfo = recordInfo; - pendingContext.logicalAddress = newLogicalAddress; - return status; + + // Else it was a CopyUpdater so call PCU; if PCU returns true, return success, else retry op. + if (fasterSession.PostCopyUpdater(ref key, + ref pendingContext.input.Get(), + ref hlog.GetContextRecordValue(ref request), + ref hlog.GetValue(newPhysicalAddress), + ref pendingContext.output, ref recordInfo, newLogicalAddress)) + { + pendingContext.recordInfo = recordInfo; + pendingContext.logicalAddress = newLogicalAddress; + } + else + status = OperationStatus.RETRY_NOW; } + + recordInfo.SetTentativeAtomic(false); + return status; } else { @@ -2474,7 +2482,7 @@ internal OperationStatus InternalTryCopyToTail (1L << 10, diff --git a/cs/test/ReadCacheChainTests.cs b/cs/test/ReadCacheChainTests.cs index 2d1e081fa..3072e85ac 100644 --- a/cs/test/ReadCacheChainTests.cs +++ b/cs/test/ReadCacheChainTests.cs @@ -6,7 +6,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading; namespace FASTER.test.ReadCacheTests { @@ -101,7 +100,7 @@ void CreateChain(bool immutable = false) Assert.AreEqual((immutable && key >= immutableSplitKey) ? Status.OK : Status.PENDING, status); session.CompletePending(wait: true); if (ii == 0) - readCacheHighEvictionAddress = fht.readcache.GetTailAddress(); + readCacheHighEvictionAddress = fht.ReadCache.TailAddress; } // Pass2: non-PENDING reads from the cache @@ -131,7 +130,7 @@ internal static unsafe (long logicalAddress, long physicalAddress) GetHashChain( var bucket = default(HashBucket*); var slot = default(int); - var hash = fht.comparer.GetHashCode64(ref key); + var hash = fht.Comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); var entry = default(HashBucketEntry); @@ -155,6 +154,7 @@ internal static (long logicalAddress, long physicalAddress) NextInChain(FasterKV var log = isReadCache ? fht.readcache : fht.hlog; var info = log.GetInfo(physicalAddress); var la = info.PreviousAddress; + isReadCache = new HashBucketEntry { word = la }.ReadCache; log = isReadCache ? fht.readcache : fht.hlog; var pa = log.GetPhysicalAddress(la); @@ -199,9 +199,9 @@ internal static (long logicalAddress, long physicalAddress) NextInChain(FasterKV internal static (long logicalAddress, long physicalAddress) SkipReadCacheChain(FasterKV fht, int key) { - var (la, pa) = ChainTests.GetHashChain(fht, key, out _, out _, out bool isReadCache); + var (la, pa) = GetHashChain(fht, key, out _, out _, out bool isReadCache); while (isReadCache) - (la, pa) = ChainTests.NextInChain(fht, pa, out _, out _, ref isReadCache); + (la, pa) = NextInChain(fht, pa, out _, out _, ref isReadCache); return (la, pa); } @@ -249,7 +249,7 @@ void doTest(int key) doTest(midChainKey); ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: false); - fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress); + fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress); ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: true); } @@ -280,7 +280,7 @@ void doTest(int key) Assert.IsTrue(isReadCache); Assert.IsTrue(invalid); - fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress); + fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress); _ = GetHashChain(lowChainKey, out actualKey, out invalid, out isReadCache); Assert.IsFalse(isReadCache); Assert.IsFalse(invalid); @@ -338,7 +338,7 @@ void doTest(int key) doTest(midChainKey); ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: false); - fht.ReadCacheEvict(fht.readcache.BeginAddress, readCacheHighEvictionAddress); + fht.ReadCacheEvict(fht.ReadCache.BeginAddress, readCacheHighEvictionAddress); ScanReadCacheChain(new[] { lowChainKey, midChainKey, highChainKey }, evicted: true); } diff --git a/cs/test/SingleWriterTests.cs b/cs/test/SingleWriterTests.cs index fa581c70e..3a366c996 100644 --- a/cs/test/SingleWriterTests.cs +++ b/cs/test/SingleWriterTests.cs @@ -10,18 +10,18 @@ namespace FASTER.test.SingleWriter { internal class SingleWriterTestFunctions : SimpleFunctions { - internal int actualExecuted = 0; + internal WriteReason actualReason; public override void SingleWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address, WriteReason reason) { Assert.AreEqual((WriteReason)input, reason); - actualExecuted |= 1 << input; + actualReason = reason; } public override void PostSingleWriter(ref int key, ref int input, ref int src, ref int dst, ref int output, ref RecordInfo recordInfo, long address, WriteReason reason) { Assert.AreEqual((WriteReason)input, reason); - actualExecuted |= 1 << input; + actualReason = reason; } } @@ -29,6 +29,7 @@ class SingleWriterTests { const int numRecords = 1000; const int valueMult = 1_000_000; + const WriteReason NoReason = (WriteReason)(-1); SingleWriterTestFunctions functions; @@ -43,8 +44,17 @@ public void Setup() log = Devices.CreateLogDevice(Path.Combine(MethodTestDir, "test.log"), deleteOnClose: true); functions = new SingleWriterTestFunctions(); + ReadCacheSettings readCacheSettings = default; + foreach (var arg in TestContext.CurrentContext.Test.Arguments) + { + if (arg is ReadCopyDestination dest) + { + if (dest == ReadCopyDestination.ReadCache) + readCacheSettings = new() { PageSizeBits = 12, MemorySizeBits = 22 }; + break; + } + } - ReadCacheSettings readCacheSettings = new() { PageSizeBits = 12, MemorySizeBits = 22 }; fht = new FasterKV(1L << 20, new LogSettings { LogDevice = log, ObjectLogDevice = null, PageSizeBits = 12, MemorySizeBits = 22, ReadCacheSettings = readCacheSettings, CopyReadsToTail = CopyReadsToTail.FromStorage }); session = fht.For(functions).NewSession(); } @@ -70,39 +80,40 @@ void Populate() } [Test] - [Category(LockableUnsafeContextTestCategory)] + [Category(FasterKVTestCategory)] [Category(SmokeTestCategory)] - public void SingleWriterReasonsTest() + public void SingleWriterReasonsTest([Values] ReadCopyDestination readCopyDestination) { - int expectedExecuted = 0; - - expectedExecuted |= 1 << (int)WriteReason.Upsert; + functions.actualReason = NoReason; Populate(); - Assert.AreEqual(expectedExecuted, functions.actualExecuted); + Assert.AreEqual(WriteReason.Upsert, functions.actualReason); fht.Log.FlushAndEvict(wait: true); + functions.actualReason = NoReason; int key = 42; - int input = (int)WriteReason.CopyToReadCache; - expectedExecuted |= 1 << input; + WriteReason expectedReason = readCopyDestination == ReadCopyDestination.ReadCache ? WriteReason.CopyToReadCache : WriteReason.CopyToTail; + int input = (int)expectedReason; var status = session.Read(key, input, out int output); Assert.AreEqual(Status.PENDING, status); session.CompletePending(wait: true); - Assert.AreEqual(expectedExecuted, functions.actualExecuted); + Assert.AreEqual(expectedReason, functions.actualReason); + functions.actualReason = NoReason; key = 64; - input = (int)WriteReason.CopyToTail; - expectedExecuted |= 1 << input; + expectedReason = WriteReason.CopyToTail; + input = (int)expectedReason; RecordMetadata recordMetadata = default; status = session.Read(ref key, ref input, ref output, ref recordMetadata, ReadFlags.CopyToTail); Assert.AreEqual(Status.PENDING, status); session.CompletePending(wait: true); - Assert.AreEqual(expectedExecuted, functions.actualExecuted); + Assert.AreEqual(expectedReason, functions.actualReason); - input = (int)WriteReason.Compaction; - expectedExecuted |= 1 << input; + functions.actualReason = NoReason; + expectedReason = WriteReason.Compaction; + input = (int)expectedReason; fht.Log.Compact(functions, ref input, ref output, fht.Log.SafeReadOnlyAddress, CompactionType.Scan); - Assert.AreEqual(expectedExecuted, functions.actualExecuted); + Assert.AreEqual(expectedReason, functions.actualReason); } } } diff --git a/cs/test/TestUtils.cs b/cs/test/TestUtils.cs index eee901d7f..74ae9b19f 100644 --- a/cs/test/TestUtils.cs +++ b/cs/test/TestUtils.cs @@ -8,7 +8,6 @@ using FASTER.devices; using System.Threading; using System.Runtime.InteropServices; -using System.Linq; namespace FASTER.test { @@ -176,6 +175,8 @@ internal enum AllocatorType internal enum SyncMode { Sync, Async }; + public enum ReadCopyDestination { Tail, ReadCache } + internal static (Status status, TOutput output) GetSinglePendingResult(CompletedOutputIterator completedOutputs) => GetSinglePendingResult(completedOutputs, out _);