From 3f7de5e2ce117316d6691e0c73ce7dc1c5ae4baa Mon Sep 17 00:00:00 2001 From: Ted Hart <15467143+TedHartMS@users.noreply.github.com> Date: Mon, 27 Mar 2023 18:23:17 -0700 Subject: [PATCH] [C#] Add ConditionalCopyToTail (#807) * Locking fixes - CompletePending* must also check to lock non-auxiliary records - Ephemeral locking must look for lock evictions as well - InternalLock must check for SpinWaitUntilRecordIsClosed - Replace oneMiss handling in InternalLock - EphemeralSUnlock must follow unlock failures - LockTable should have its own bufferPool - Ensure OnPagesClosed does eviction scans in address order * More locking fixes: - Fix missing unlock of a Tombstoned record in InternalContinuePendingRead - Add more Asserts where ManualLocking requires a locked record - Update and remove obsolete comments * - Remove key-present optimization for SpinWaitUntilRecordIsClosed - Better checking on following records transferred from LockTable in InternalLock(unlock) - Add some additional comments * Fix hash bucket overflow pointer during recovery Fix debug assert logic * WIP on locktable implemented via mainhash overflow buckets * unstaged files missed in previous push * Updates to EphemeralOnly locking for transfer-from-RO/RC * Change LockTableTests to OverflowBucketlockTableTests * Remove RecordInfo.Tentative, .IsIntermediate* * More locking WIP: session.NeedKeyLockCode, test cleanup * WIP locking: Tests build * WIP on mainhash locking: - Add KeyCode creation and sorting and methods public and internal - Remove unnecessary IsLocked (it does not make work with EphemeralOnly and SessionControlled may map to multiple keys) - More tests run now * Most UTs run * More UTs succeed * Still some UT failures * - use XLocks as Tentative indicator for insertions/splices at log tail and readcache insertions - improve readcache consistency checking - clear HasInMemoryLock when HeadAddress goes above srcRecord address following BlockAllocate - rename LockingMode.SessionControlled -> 
Mixed and other renaming for clarity - remove some unused methods - fix locking sequences in InternalContinuePendingR(ead|MW) * More fixes - Missing 'ref' on recordInfo var in FindAndUnlockTransferredRecord - Improve record allocation checking for below-HA in-memory source and RecordInfo lock clearing if so - Add LockCode to HashEntryInfo in DEBUG * Remove SpinWaitUntilRecordIsClosed; it was only used in ReadCache and in the relevant case we just need to wait for ReadCacheEvict to complete. * - MixedModeTentativeOrClosed is not needed - Add/Improve some ToString()s - Remove unused RecSrc.ClearSrc() - Rename to HasRecordInfoLock - Move ICPRead HandleImmediateRetryStatus outside the lock scope - Consolidate EphemeralSUnlock(AfterPendingIO) * Streamline locking and FindInMemory calls in InternalXxx * - Change names to pendingContext.Initial*Address and improve setting of these in InternalXxx and handling of these in CompletePending*() and TryFindRecordInMemory. - Also consistentize related param names for readcache and log search to minAddress. * Clean up pending RMW call to CreateNewRecordRMW, per prior changes that make the inputSrc unnecessary. * - Fix stale readcache assignment to recSrc.Log - make searches after pendingIO completion non-inclusive of minAddress - Some ToString() and naming changes for consistency - Remove RecordInfo.IsValidUpdateOrLockSource (superseded by direct call to .IsClosed) * Allow length specification on KeyCode array operations * - Fix check for lock having escaped to eviction zone following BlockAllocate - Add some comments for SafeHeadAddress et al. * - Make sure the readcache bit is set when needed on addresses put into stackCtx.newLogicalAddress. 
- Rename to HandleNewRecordOnException for clarity * Update remote project * - Make sure FindAndUnlockTransferredRecord can go to SafeHeadAddress - Add non-ref overload of GetLockCode * Tweaks for benchmark with Mixed locking * Add ILockableKey.CompareLockCodes * Change LockCode to just be hashcode, since we may grow the hash table. * - Make CPR consistency check work without conflict with the key latches in the HashBuckets. - Change some names for clarity. - Pending read completion should return values from any in-memory record, as they are the latest * Remove duplicate opCtx param and removed sessionCtx (aka currentCtx, aka ctx) from calls where FasterSession was also passed (in favor of (new) FasterSession.SessionCtx), where the called method has a type constraint of "where FasterSession : IFasterSession". Not yet done: converting "where FasterSession : IFasterSession" constraints to generic to remove the duplicate ctx parameter. * Fix DoLockOp to compare bucket indexes, not KeyCodes * Finish removal of duplicate sessionCtx param from calls where FasterSession was also passed (in favor of (new) FasterSession.SessionCtx), where the called method has a type constraint of "where FasterSession : IFasterSession" (non-generic). This cannot be done in Synchronization (the state machines) due to their use of NullFasterSession. 
* Fix key duplication in readcache and log: - VerifyInMemoryAddresses should not refresh hei if no splice point is present - FindRecordInMemory checks pendingContext.InitialHash - Move CompleteCopyToTail in ITCTT into the non-readcache section - TryFindRecordInMemory needs to check pc.InitialEntryIndex's readcache bit - Updaters need to scan down to hlog.ReadOnlyAddress to ensure the "key is not at or below LatestLogicalAddress" logic holds - ReadCacheCheckTailAfterSplice needs to check hei.*Address readcache bit * Replace use of SafeHeadAddress with SpinWaitUntilClosed in FindInReadCache * Remove LockingMode.EphemeralOnly add --hashpack arg to FASTER.benchmark * Tweaks for the Standard locking change * fix remote to use LockingMode.Standard * Add DisposeTest comments and more Asserts * Use calls on RecordSource to make it easier to verify correct matching of log and physicalAddress * Temporarily disable LocalMemoryDevice tests for investigation * test removal of DisposeTests * - Fix for CTT to address SameKeyInsertAndCTTTest failure - Add missing arg parsing for MemOnlyCache - Re-enable DeviceType.LocalMemory in tests * WIP on ConditionalInsert * [C#] MemOnlyCache sample: improved size tracking, including read cache (#800) * Improved cache size tracking, including read cache * Rename MemOnlyCache to ResizableCacheStore. * update doc * fixes * Updates * ClearBitsForDiskImages should unseal record as well (Q: why was it sealed to begin with?) 
* Correct clearing of index bits during recovery * Remove RecordInfo sealing * WIP Ephemeral locking at IFunctions-level only, part 1 * Remove unused SessionType enumeration; consolidate duplicate IFasterSession implementations * - Add EphemeralLocking readcache detach/reattach logic - Remove unused ReadFlags.ResetModifiedBit * Rename a couple files * re-add RecordInfo.Seal for Ephemeral locking; remove unused (RMW|Upsert)Action.NeedMoreSpace (this is the default 'false' behavior) * Remove single-key overloads of Lock() and Unlock() * - Update a too-aggressive assert. - CreateLogDevice errors on Windows if path > MAX_PATH minus room for segment#. * Improve previous MAX_PATH fix * Chnge Read of an expired to just return NOTFOUND | Expired rather than tombstoning * - Replace ReadFlags with ReadCopyOptions - More WIP on ConditionalInsert * RETRY_LATER rather than trying to re-find splice point directly in VerifyInMemoryAddresses * More WIP on ConditionalInsert * WIP on ConditionalCopyToTail * change filename * - Fix Compact/ConditionalCopyToTail - Make TryCopyToReadCache bool - Tighten up DetachAndReattachReadCacheChain - Other minor tweaks * fix merge --------- --- cs/samples/ReadAddress/VersionedReadApp.cs | 4 +- cs/samples/ResizableCacheStore/Program.cs | 2 +- cs/src/core/Async/ReadAsync.cs | 4 +- cs/src/core/ClientSession/ClientSession.cs | 9 +- .../core/ClientSession/FASTERClientSession.cs | 76 ++-- cs/src/core/Compaction/FASTERCompaction.cs | 105 ++---- cs/src/core/FasterLog/FasterLogSettings.cs | 2 +- cs/src/core/Index/Common/Contexts.cs | 106 ++---- cs/src/core/Index/Common/FasterKVSettings.cs | 9 +- cs/src/core/Index/Common/LogSettings.cs | 4 +- cs/src/core/Index/Common/ReadOptions.cs | 83 ++++- cs/src/core/Index/Common/RecordInfo.cs | 12 +- cs/src/core/Index/FASTER/FASTER.cs | 33 +- cs/src/core/Index/FASTER/FASTERIterator.cs | 52 ++- cs/src/core/Index/FASTER/FASTERThread.cs | 10 +- .../Implementation/ConditionalCopyToTail.cs | 108 ++++++ 
.../Implementation/ContainsKeyInMemory.cs | 15 +- .../FASTER/Implementation/ContinuePending.cs | 338 ++++-------------- .../Index/FASTER/Implementation/FindRecord.cs | 31 ++ .../FASTER/Implementation/HashEntryInfo.cs | 4 +- .../Index/FASTER/Implementation/Helpers.cs | 16 +- .../FASTER/Implementation/InternalDelete.cs | 2 +- .../FASTER/Implementation/InternalRMW.cs | 2 +- .../FASTER/Implementation/InternalRead.cs | 37 +- .../FASTER/Implementation/InternalUpsert.cs | 11 +- .../Locking/TransientLocking.cs | 3 +- .../Index/FASTER/Implementation/ReadCache.cs | 70 ++-- .../Implementation/TryCopyToReadCache.cs | 96 +++++ .../FASTER/Implementation/TryCopyToTail.cs | 103 ++++++ cs/src/core/Index/FASTER/ReadFlags.cs | 57 --- cs/src/core/Index/Interfaces/IFasterKV.cs | 12 +- cs/test/AdvancedLockTests.cs | 10 +- cs/test/BasicFASTERTests.cs | 11 +- cs/test/BlittableLogCompactionTests.cs | 197 +++++----- cs/test/DisposeTests.cs | 8 +- cs/test/EphemeralLockingTests.cs | 2 +- cs/test/LockableUnsafeContextTests.cs | 2 +- cs/test/ModifiedBitTests.cs | 2 +- cs/test/ReadAddressTests.cs | 213 +++++------ cs/test/ReadCacheChainTests.cs | 30 +- cs/test/SingleWriterTests.cs | 6 +- cs/test/TestTypes.cs | 8 +- cs/test/TestUtils.cs | 2 + cs/test/UnsafeContextTests.cs | 38 +- 44 files changed, 952 insertions(+), 993 deletions(-) create mode 100644 cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs create mode 100644 cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs create mode 100644 cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs delete mode 100644 cs/src/core/Index/FASTER/ReadFlags.cs diff --git a/cs/samples/ReadAddress/VersionedReadApp.cs b/cs/samples/ReadAddress/VersionedReadApp.cs index c151ec424..821c13bf3 100644 --- a/cs/samples/ReadAddress/VersionedReadApp.cs +++ b/cs/samples/ReadAddress/VersionedReadApp.cs @@ -151,7 +151,7 @@ private static void ScanStore(FasterKV store, int keyValue) var output = default(Value); var input = default(Value); var key 
= new Key(keyValue); - ReadOptions readOptions = new() { ReadFlags = ReadFlags.DisableReadCache}; + ReadOptions readOptions = new() { CopyOptions = ReadCopyOptions.None }; for (int lap = 9; /* tested in loop */; --lap) { var status = session.Read(ref key, ref input, ref output, ref readOptions, out var recordMetadata, serialNo: maxLap + 1); @@ -185,7 +185,7 @@ private static async Task ScanStoreAsync(FasterKV store, int keyValu var input = default(Value); var key = new Key(keyValue); RecordMetadata recordMetadata = default; - ReadOptions readOptions = new() { ReadFlags = ReadFlags.DisableReadCache }; + ReadOptions readOptions = new() { CopyOptions = ReadCopyOptions.None }; for (int lap = 9; /* tested in loop */; --lap) { var readAsyncResult = await session.ReadAsync(ref key, ref input, ref readOptions, default, serialNo: maxLap + 1, cancellationToken: cancellationToken); diff --git a/cs/samples/ResizableCacheStore/Program.cs b/cs/samples/ResizableCacheStore/Program.cs index 29622981e..87762c857 100644 --- a/cs/samples/ResizableCacheStore/Program.cs +++ b/cs/samples/ResizableCacheStore/Program.cs @@ -342,7 +342,7 @@ static void Main(string[] args) { LogDevice = log, ObjectLogDevice = objectLog, MutableFraction = 0.9, // 10% of memory log is "read-only region" - ReadFlags = UseReadCTT ? ReadFlags.CopyReadsToTail : ReadFlags.None, // whether reads in read-only region are copied to tail + ReadCopyOptions = UseReadCTT ? 
new(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog) : ReadCopyOptions.None, // whether reads in read-only region are copied to tail PageSizeBits = PageSizeBits, MemorySizeBits = MemorySizeBits }; diff --git a/cs/src/core/Async/ReadAsync.cs b/cs/src/core/Async/ReadAsync.cs index 656dcd4f5..4709a9768 100644 --- a/cs/src/core/Async/ReadAsync.cs +++ b/cs/src/core/Async/ReadAsync.cs @@ -105,9 +105,7 @@ internal ReadAsyncResult(FasterKV fasterKV, IFasterSession> ReadAsync(IFasterSession fasterSession, ref Key key, ref Input input, ref ReadOptions readOptions, Context context, long serialNo, CancellationToken token, bool noKey = false) { - var pcontext = new PendingContext { IsAsync = true }; - var operationFlags = PendingContext.GetOperationFlags(MergeReadFlags(fasterSession.Ctx.ReadFlags, readOptions.ReadFlags), noKey); - pcontext.SetOperationFlags(operationFlags, readOptions.StopAddress); + var pcontext = new PendingContext(fasterSession.Ctx.ReadCopyOptions, ref readOptions, isAsync: true, noKey: true); var diskRequest = default(AsyncIOContext); fasterSession.UnsafeResumeThread(); diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 39ad23159..83aad5410 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -663,10 +663,8 @@ internal bool UnsafeCompletePending(FasterSession fasterSession, var result = fht.InternalCompletePending(fasterSession, wait, requestedOutputs); if (spinWaitForCommit) { - if (wait != true) - { + if (!wait) throw new FasterException("Can spin-wait for commit (checkpoint completion) only if wait is true"); - } do { fht.InternalCompletePending(fasterSession, wait, requestedOutputs); @@ -861,14 +859,13 @@ public long Compact(ref Input input, ref Output output, lon /// /// /// Lower-bound address (addresses are searched from tail (high) to head (low); do not search for "future records" earlier than this) - /// Actual address of existing key record 
[MethodImpl(MethodImplOptions.AggressiveInlining)] - internal OperationStatus CompactionCopyToTail(ref Key key, ref Input input, ref Value desiredValue, ref Output output, long untilAddress, long actualAddress) + internal Status CompactionCopyToTail(ref Key key, ref Input input, ref Value desiredValue, ref Output output, long untilAddress) { UnsafeResumeThread(); try { - return fht.InternalCopyToTailForCompaction(ref key, ref input, ref desiredValue, ref output, untilAddress, actualAddress, FasterSession); + return fht.CompactionConditionalCopyToTail(FasterSession, ref key, ref input, ref desiredValue, ref output, untilAddress); } finally { diff --git a/cs/src/core/ClientSession/FASTERClientSession.cs b/cs/src/core/ClientSession/FASTERClientSession.cs index 69cbb7f93..c87a6fc64 100644 --- a/cs/src/core/ClientSession/FASTERClientSession.cs +++ b/cs/src/core/ClientSession/FASTERClientSession.cs @@ -34,13 +34,13 @@ internal ClientSessionBuilder(FasterKV fasterKV, IFunctionsCallback functions /// Name of session (optional) /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession NewSession(Functions functions, string sessionName = null, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { - return _fasterKV.NewSession(functions, sessionName, sessionVariableLengthStructSettings, readFlags); + return _fasterKV.NewSession(functions, sessionName, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -50,13 +50,13 @@ public ClientSession NewSessionName of 
previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession ResumeSession(Functions functions, string sessionName, out CommitPoint commitPoint, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { - return _fasterKV.ResumeSession(functions, sessionName, out commitPoint, sessionVariableLengthStructSettings, readFlags); + return _fasterKV.ResumeSession(functions, sessionName, out commitPoint, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -64,16 +64,16 @@ public ClientSession ResumeSessio /// /// Name of session (optional) /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession NewSession(string sessionName = null, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { if (_functions == null) throw new FasterException("Functions not provided for session"); - return _fasterKV.NewSession((Functions)_functions, sessionName, sessionVariableLengthStructSettings, readFlags); + 
return _fasterKV.NewSession((Functions)_functions, sessionName, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -83,16 +83,16 @@ public ClientSession NewSessionName of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession ResumeSession(string sessionName, out CommitPoint commitPoint, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { if (_functions == null) throw new FasterException("Functions not provided for session"); - return _fasterKV.ResumeSession((Functions)_functions, sessionName, out commitPoint, sessionVariableLengthStructSettings, readFlags); + return _fasterKV.ResumeSession((Functions)_functions, sessionName, out commitPoint, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -102,16 +102,16 @@ public ClientSession ResumeSessio /// ID of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession ResumeSession(int sessionID, out CommitPoint commitPoint, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + 
SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { if (_functions == null) throw new FasterException("Functions not provided for session"); - return _fasterKV.ResumeSession((Functions)_functions, sessionID, out commitPoint, sessionVariableLengthStructSettings, readFlags); + return _fasterKV.ResumeSession((Functions)_functions, sessionID, out commitPoint, sessionVariableLengthStructSettings, readCopyOptions); } } @@ -134,12 +134,12 @@ public ClientSessionBuilder For( /// Callback functions /// Name of session (auto-generated if not provided) /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession> NewSession(IFunctions functions, string sessionName = null, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) { - return NewSession>(functions, sessionName, sessionVariableLengthStructSettings, readFlags); + return NewSession>(functions, sessionName, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -148,16 +148,16 @@ public ClientSessionCallback functions /// Name of session (optional) /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance internal ClientSession NewSession(Functions functions, string sessionName = 
null, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions => InternalNewSession>(functions, sessionName, - ctx => new ClientSession(this, ctx, functions, sessionVariableLengthStructSettings), readFlags); + ctx => new ClientSession(this, ctx, functions, sessionVariableLengthStructSettings), readCopyOptions); private TSession InternalNewSession(Functions functions, string sessionName, - Func, TSession> sessionCreator, ReadFlags readFlags) + Func, TSession> sessionCreator, ReadCopyOptions readCopyOptions) where TSession : IClientSession { if (functions == null) @@ -172,11 +172,11 @@ private TSession InternalNewSession int sessionID = Interlocked.Increment(ref maxSessionID); var ctx = new FasterExecutionContext(); InitContext(ctx, sessionID, sessionName); - ctx.ReadFlags = MergeReadFlags(ReadFlags, readFlags); + ctx.MergeReadCopyOptions(this.ReadCopyOptions, readCopyOptions); var prevCtx = new FasterExecutionContext(); InitContext(prevCtx, sessionID, sessionName); prevCtx.version--; - prevCtx.ReadFlags = ctx.ReadFlags; + prevCtx.ReadCopyOptions = ctx.ReadCopyOptions; ctx.prevCtx = prevCtx; @@ -197,12 +197,12 @@ private TSession InternalNewSession /// Name of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession> ResumeSession(IFunctions functions, string sessionName, - out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags 
readFlags = ReadFlags.Default) + out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) { - return ResumeSession>(functions, sessionName, out commitPoint, sessionVariableLengthStructSettings, readFlags); + return ResumeSession>(functions, sessionName, out commitPoint, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -213,17 +213,17 @@ public ClientSessionName of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance internal ClientSession ResumeSession(Functions functions, string sessionName, out CommitPoint commitPoint, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { if (_recoveredSessionNameMap == null || !_recoveredSessionNameMap.TryRemove(sessionName, out int sessionID)) throw new FasterException($"Unable to find session named {sessionName} to recover"); return InternalResumeSession>(functions, sessionID, out commitPoint, - ctx => new ClientSession(this, ctx, functions, sessionVariableLengthStructSettings), readFlags); + ctx => new ClientSession(this, ctx, functions, sessionVariableLengthStructSettings), readCopyOptions); } /// @@ -234,12 +234,12 @@ internal ClientSession ResumeSess /// ID of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at 
FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession> ResumeSession(IFunctions functions, int sessionID, - out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) { - return ResumeSession>(functions, sessionID, out commitPoint, sessionVariableLengthStructSettings, readFlags); + return ResumeSession>(functions, sessionID, out commitPoint, sessionVariableLengthStructSettings, readCopyOptions); } /// @@ -250,18 +250,18 @@ public ClientSessionID of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance internal ClientSession ResumeSession(Functions functions, int sessionID, out CommitPoint commitPoint, - SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default) + SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default) where Functions : IFunctions { return InternalResumeSession>(functions, sessionID, out commitPoint, - ctx => new ClientSession(this, ctx, functions, sessionVariableLengthStructSettings), readFlags); + ctx => new ClientSession(this, ctx, functions, sessionVariableLengthStructSettings), readCopyOptions); } private TSession InternalResumeSession(Functions functions, int 
sessionID, out CommitPoint commitPoint, - Func, TSession> sessionCreator, ReadFlags readFlags) + Func, TSession> sessionCreator, ReadCopyOptions readCopyOptions) where TSession : IClientSession { if (functions == null) @@ -271,7 +271,7 @@ private TSession InternalResumeSession(sessionID, out var ctx); if (commitPoint.UntilSerialNo == -1) throw new Exception($"Unable to find session {sessionID} to recover"); - ctx.ReadFlags = MergeReadFlags(ReadFlags, readFlags); + ctx.MergeReadCopyOptions(this.ReadCopyOptions, readCopyOptions); var session = sessionCreator(ctx); diff --git a/cs/src/core/Compaction/FASTERCompaction.cs b/cs/src/core/Compaction/FASTERCompaction.cs index 0f3498ead..639006eb1 100644 --- a/cs/src/core/Compaction/FASTERCompaction.cs +++ b/cs/src/core/Compaction/FASTERCompaction.cs @@ -44,6 +44,7 @@ private long CompactLookup 256) { - long checkedAddress = hlog.SafeReadOnlyAddress; - - var status = fhtSession.Read(ref key, ref input, ref output, ref readOptions, out var recordMetadata); - - // For now, we perform each record compaction separately. 
In order to do this in parallel to maximize IOPS, - // we need a new Conditional Upsert API (CopyToTailIfNotExists) - if (status.IsPending) - { - fhtSession.CompletePendingWithOutputs(out var completedOutput, true); - try - { - if (completedOutput.Next()) - { - status = completedOutput.Current.Status; - recordMetadata = completedOutput.Current.RecordMetadata; - } - else - throw new FasterException("Pending Read did not complete during compaction"); - } - finally - { - completedOutput.Dispose(); - } - } - - // Either record was found in future, or we returned NOTFOUND because of a tombstone in future - if (status.Found || recordMetadata.Address >= iter1.NextAddress) - break; - - copyStatus = fhtSession.CompactionCopyToTail(ref key, ref input, ref value, ref output, checkedAddress, iter1.CurrentAddress); - recordMetadata.RecordInfo.PreviousAddress = checkedAddress; - } while (copyStatus == OperationStatus.RECORD_ON_DISK); + fhtSession.CompletePending(wait: true); + numPending = 0; + } } // Ensure address is at record boundary untilAddress = iter1.NextAddress; } + if (numPending > 0) + fhtSession.CompletePending(wait: true); } Log.ShiftBeginAddress(untilAddress, false); return untilAddress; @@ -130,13 +103,9 @@ private long CompactScan ref var value = ref iter1.GetValue(); if (recordInfo.Tombstone || cf.IsDeleted(ref key, ref value)) - { - tempKvSession.Delete(ref key, default, 0); - } + tempKvSession.Delete(ref key); else - { - tempKvSession.Upsert(ref key, ref value, default, 0); - } + tempKvSession.Upsert(ref key, ref value); } // Ensure address is at record boundary untilAddress = originalUntilAddress = iter1.NextAddress; @@ -145,53 +114,47 @@ private long CompactScan // Scan until SafeReadOnlyAddress var scanUntil = hlog.SafeReadOnlyAddress; if (untilAddress < scanUntil) - LogScanForValidity(ref untilAddress, scanUntil, tempKvSession); + ScanImmutableTailToRemoveFromTempKv(ref untilAddress, scanUntil, tempKvSession); + var numPending = 0; using var iter3 = 
tempKv.Log.Scan(tempKv.Log.BeginAddress, tempKv.Log.TailAddress); while (iter3.GetNext(out var recordInfo)) { - if (!recordInfo.Tombstone) + if (recordInfo.Tombstone) + continue; + + // Try to ensure we have checked all immutable records + scanUntil = hlog.SafeReadOnlyAddress; + if (untilAddress < scanUntil) + ScanImmutableTailToRemoveFromTempKv(ref untilAddress, scanUntil, tempKvSession); + + // If record is not the latest in tempKv's memory for this key, ignore it (will not be returned if deleted) + if (!tempKvSession.ContainsKeyInMemory(ref iter3.GetKey(), out long tempKeyAddress).Found || iter3.CurrentAddress != tempKeyAddress) + continue; + + // As long as there's no record of the same key whose address is >= untilAddress (scan boundary), we are safe to copy the old record + // to the tail. We don't know the actualAddress of the key in the main kv, but we it will not be below untilAddress. + var status = fhtSession.CompactionCopyToTail(ref iter3.GetKey(), ref input, ref iter3.GetValue(), ref output, untilAddress - 1); + if (status.IsPending && ++numPending > 256) { - OperationStatus copyStatus = default; - do - { - // Try to ensure we have checked all immutable records - scanUntil = hlog.SafeReadOnlyAddress; - if (untilAddress < scanUntil) - LogScanForValidity(ref untilAddress, scanUntil, tempKvSession); - - // If record is not the latest in tempKv's memory for this key, ignore it - if (tempKvSession.ContainsKeyInMemory(ref iter3.GetKey(), out long tempKeyAddress).Found) - { - if (iter3.CurrentAddress != tempKeyAddress) - continue; - } - else - { - // Possibly deleted key (once ContainsKeyInMemory is updated to check Tombstones) - continue; - } - // As long as there's no record of the same key whose address is >= untilAddress (scan boundary), - // we are safe to copy the old record to the tail. We don't know the actualAddress in the main kv. 
- copyStatus = fhtSession.CompactionCopyToTail(ref iter3.GetKey(), ref input, ref iter3.GetValue(), ref output, untilAddress - 1, actualAddress: Constants.kUnknownAddress); - } while (copyStatus == OperationStatus.RECORD_ON_DISK); + fhtSession.CompletePending(wait: true); + numPending = 0; } } + if (numPending > 0) + fhtSession.CompletePending(wait: true); } Log.ShiftBeginAddress(originalUntilAddress, false); return originalUntilAddress; } - private void LogScanForValidity(ref long untilAddress, long scanUntil, ClientSession tempKvSession) + private void ScanImmutableTailToRemoveFromTempKv(ref long untilAddress, long scanUntil, ClientSession tempKvSession) where Functions : IFunctions { using var iter = Log.Scan(untilAddress, scanUntil); while (iter.GetNext(out var _)) { - ref var k = ref iter.GetKey(); - ref var v = ref iter.GetValue(); - - tempKvSession.Delete(ref k, default, 0); + tempKvSession.Delete(ref iter.GetKey(), default, 0); untilAddress = iter.NextAddress; } } diff --git a/cs/src/core/FasterLog/FasterLogSettings.cs b/cs/src/core/FasterLog/FasterLogSettings.cs index e7780b620..3c21de2ca 100644 --- a/cs/src/core/FasterLog/FasterLogSettings.cs +++ b/cs/src/core/FasterLog/FasterLogSettings.cs @@ -205,7 +205,7 @@ internal LogSettings GetLogSettings() PageSizeBits = Utility.NumBitsPreviousPowerOf2(PageSize), SegmentSizeBits = Utility.NumBitsPreviousPowerOf2(SegmentSize), MemorySizeBits = ReadOnlyMode ? 
0 : Utility.NumBitsPreviousPowerOf2(MemorySize), - ReadFlags = ReadFlags.None, + ReadCopyOptions = ReadCopyOptions.None, MutableFraction = MutableFraction, ObjectLogDevice = null, ReadCacheSettings = null, diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index b6e536fec..180717503 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -19,7 +19,8 @@ internal enum OperationType READ, RMW, UPSERT, - DELETE + DELETE, + CONDITIONAL_INSERT, } [Flags] @@ -149,10 +150,17 @@ internal struct PendingContext internal long serialNum; internal HashBucketEntry entry; + // operationFlags values internal ushort operationFlags; + internal const ushort kNoOpFlags = 0; + internal const ushort kNoKey = 0x0001; + internal const ushort kIsAsync = 0x0002; + + internal ReadCopyOptions readCopyOptions; + internal RecordInfo recordInfo; internal long minAddress; - internal LockOperation lockOperation; + internal WriteReason writeReason; // for ConditionalCopyToTail // For flushing head pages on tail allocation. internal CompletionEvent flushEvent; @@ -160,22 +168,22 @@ internal struct PendingContext // For RMW if an allocation caused the source record for a copy to go from readonly to below HeadAddress, or for any operation with CAS failure. 
internal long retryNewLogicalAddress; - // BEGIN Must be kept in sync with corresponding ReadFlags enum values - internal const ushort kDisableReadCacheUpdates = 0x0001; - internal const ushort kDisableReadCacheReads = 0x0002; - internal const ushort kCopyReadsToTail = 0x0004; - internal const ushort kCopyFromDeviceOnly = 0x0008; - internal const ushort kResetModifiedBit = 0x0010; - // END Must be kept in sync with corresponding ReadFlags enum values - - internal const ushort kNoKey = 0x0100; - internal const ushort kIsAsync = 0x0200; + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal PendingContext(ReadCopyOptions sessionReadCopyOptions, ref ReadOptions readOptions, bool isAsync = false, bool noKey = false) + { + // The async flag is often set when the PendingContext is created, so preserve that. + this.operationFlags = (ushort)((noKey ? kNoKey : kNoOpFlags) | (isAsync ? kIsAsync : kNoOpFlags)); + this.readCopyOptions = ReadCopyOptions.Merge(sessionReadCopyOptions, readOptions.CopyOptions); + this.minAddress = readOptions.StopAddress; + } - // Flags for various operations passed at multiple levels, e.g. through RETRY_NOW. - internal const ushort kUnused1 = 0x1000; - internal const ushort kUnused2 = 0x2000; - internal const ushort kUnused3 = 0x4000; - internal const ushort kHasExpiration = 0x8000; + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal PendingContext(ReadCopyOptions readCopyOptions, bool isAsync = false, bool noKey = false) + { + // The async flag is often set when the PendingContext is created, so preserve that. + this.operationFlags = (ushort)((noKey ? kNoKey : kNoOpFlags) | (isAsync ? 
kIsAsync : kNoOpFlags)); + this.readCopyOptions = readCopyOptions; + } [MethodImpl(MethodImplOptions.AggressiveInlining)] internal IHeapContainer DetachKey() @@ -193,60 +201,12 @@ internal IHeapContainer DetachInput() return tempInputContainer; } - static PendingContext() - { - Debug.Assert((ushort)ReadFlags.DisableReadCacheUpdates >> 1 == kDisableReadCacheUpdates); - Debug.Assert((ushort)ReadFlags.DisableReadCacheReads >> 1 == kDisableReadCacheReads); - Debug.Assert((ushort)ReadFlags.CopyReadsToTail >> 1 == kCopyReadsToTail); - Debug.Assert((ushort)ReadFlags.CopyFromDeviceOnly >> 1 == kCopyFromDeviceOnly); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static ushort GetOperationFlags(ReadFlags readFlags) - => (ushort)((int)(readFlags & (ReadFlags.DisableReadCacheUpdates | ReadFlags.DisableReadCacheReads | ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly)) >> 1); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static ushort GetOperationFlags(ReadFlags readFlags, bool noKey) - { - ushort flags = GetOperationFlags(readFlags); - if (noKey) - flags |= kNoKey; - return flags; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void SetOperationFlags(ReadFlags readFlags, ref ReadOptions readOptions) => this.SetOperationFlags(GetOperationFlags(readFlags), readOptions.StopAddress); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void SetOperationFlags(ReadFlags readFlags, ref ReadOptions readOptions, bool noKey) => this.SetOperationFlags(GetOperationFlags(readFlags, noKey), readOptions.StopAddress); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void SetOperationFlags(ReadFlags readFlags) => this.operationFlags = GetOperationFlags(readFlags); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void SetOperationFlags(ushort flags, long stopAddress) - { - // The async flag is often set when the PendingContext is created, so preserve that. 
- this.operationFlags = (ushort)(flags | (this.operationFlags & kIsAsync)); - this.minAddress = stopAddress; - } - internal bool NoKey { get => (operationFlags & kNoKey) != 0; set => operationFlags = value ? (ushort)(operationFlags | kNoKey) : (ushort)(operationFlags & ~kNoKey); } - internal bool DisableReadCacheUpdates => (operationFlags & kDisableReadCacheUpdates) != 0; - - internal bool DisableReadCacheReads => (operationFlags & kDisableReadCacheReads) != 0; - - internal bool CopyReadsToTail => (operationFlags & kCopyReadsToTail) != 0; - - internal bool CopyReadsToTailFromReadOnly => (operationFlags & (kCopyReadsToTail | kCopyFromDeviceOnly)) == kCopyReadsToTail; - - internal bool CopyFromDeviceOnly => (operationFlags & kCopyFromDeviceOnly) != 0; - internal bool HasMinAddress => this.minAddress != Constants.kInvalidAddress; internal bool IsAsync @@ -283,8 +243,8 @@ internal sealed class FasterExecutionContext internal int sessionID; internal string sessionName; - // Control Read operations. These flags override flags specified at the FasterKV level, but may be overridden on the individual Read() operations - internal ReadFlags ReadFlags; + // Control automatic Read copy operations. 
These flags override flags specified at the FasterKV level, but may be overridden on the individual Read() operations + internal ReadCopyOptions ReadCopyOptions; internal long version; internal long serialNum; @@ -301,14 +261,10 @@ internal sealed class FasterExecutionContext public int SyncIoPendingCount => ioPendingRequests.Count - asyncPendingCount; - public bool HasNoPendingRequests - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get - { - return SyncIoPendingCount == 0; - } - } + internal void MergeReadCopyOptions(ReadCopyOptions storeCopyOptions, ReadCopyOptions copyOptions) + => this.ReadCopyOptions = ReadCopyOptions.Merge(storeCopyOptions, copyOptions); + + public bool HasNoPendingRequests => SyncIoPendingCount == 0; public void WaitPending(LightEpoch epoch) { diff --git a/cs/src/core/Index/Common/FasterKVSettings.cs b/cs/src/core/Index/Common/FasterKVSettings.cs index 960433128..86855ffb5 100644 --- a/cs/src/core/Index/Common/FasterKVSettings.cs +++ b/cs/src/core/Index/Common/FasterKVSettings.cs @@ -59,7 +59,7 @@ public sealed class FasterKVSettings : IDisposable /// /// Control Read operations. These flags may be overridden by flags specified on session.NewSession or on the individual Read() operations /// - public ReadFlags ReadFlags; + public ReadCopyOptions ReadCopyOptions; /// /// Whether to preallocate the entire log (pages) in memory @@ -196,6 +196,7 @@ public override string ToString() retStr += $"; obj log device: {(ObjectLogDevice == null ? "null" : ObjectLogDevice.GetType().Name)}"; retStr += $"; mutable fraction: {MutableFraction}; locking mode: {this.LockingMode}"; retStr += $"; read cache (rc): {(ReadCacheEnabled ? 
"yes" : "no")}"; + retStr += $"; read copy options: {ReadCopyOptions}"; if (ReadCacheEnabled) retStr += $"; rc memory: {Utility.PrettySize(ReadCacheMemorySize)}; rc page: {Utility.PrettySize(ReadCachePageSize)}"; return retStr; @@ -206,8 +207,8 @@ internal long GetIndexSizeCacheLines() long adjustedSize = Utility.PreviousPowerOf2(IndexSize); if (adjustedSize < 512) throw new FasterException($"{nameof(IndexSize)} should be at least of size 8 cache line (512 bytes)"); - if (IndexSize != adjustedSize) - logger?.LogInformation($"Warning: using lower value {adjustedSize} instead of specified {IndexSize} for {nameof(IndexSize)}"); + if (IndexSize != adjustedSize) // Don't use string interpolation when logging messages because it makes it impossible to group by the message template. + logger?.LogInformation("Warning: using lower value {0} instead of specified {1} for {2}", adjustedSize, IndexSize, nameof(IndexSize)); return adjustedSize / 64; } @@ -215,7 +216,7 @@ internal LogSettings GetLogSettings() { return new LogSettings { - ReadFlags = ReadFlags, + ReadCopyOptions = ReadCopyOptions, LogDevice = LogDevice, ObjectLogDevice = ObjectLogDevice, MemorySizeBits = Utility.NumBitsPreviousPowerOf2(MemorySize), diff --git a/cs/src/core/Index/Common/LogSettings.cs b/cs/src/core/Index/Common/LogSettings.cs index 0781482a3..86497d945 100644 --- a/cs/src/core/Index/Common/LogSettings.cs +++ b/cs/src/core/Index/Common/LogSettings.cs @@ -60,9 +60,9 @@ public class LogSettings public double MutableFraction = 0.9; /// - /// Control Read operations. These flags may be overridden by flags specified on session.NewSession or on the individual Read() operations + /// Control Read copy operations. 
These values may be overridden by flags specified on session.NewSession or on the individual Read() operations /// - public ReadFlags ReadFlags; + public ReadCopyOptions ReadCopyOptions; /// /// Settings for optional read cache diff --git a/cs/src/core/Index/Common/ReadOptions.cs b/cs/src/core/Index/Common/ReadOptions.cs index 0d8be5406..b66552d98 100644 --- a/cs/src/core/Index/Common/ReadOptions.cs +++ b/cs/src/core/Index/Common/ReadOptions.cs @@ -1,8 +1,83 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. +using System.Runtime.CompilerServices; + namespace FASTER.core { + /// + /// Identifies which log regions records will be copied from to . This specification is + /// evaluated in hierarchical order, from that on the FasterKV ctor, which may be overridden by those in + /// .NewSession(), which may be overridden + /// by those at the individual Read() level. + /// + public enum ReadCopyFrom : byte + { + /// 'default' value; inherit settings from the previous hierarchy level(s). + Inherit = 0, + + /// Do not copy. + None, + + /// From larger-than-memory device (e.g. disk storage). + Device, + + /// From or from the immutable region of the log. + AllImmutable + } + + /// + /// Identifies the destination of records copied from . + /// + public enum ReadCopyTo : byte + { + /// 'default' value; inherit settings from the previous hierarchy level(s). + Inherit = 0, + + /// Do not copy. + None, + + /// Copy to the tail of the main log (or splice into the readcache/mainlog boundary, if readcache records are present). + MainLog, + + /// Copy to the readcache. This requires that be supplied to the FasterKV ctor. + ReadCache + } + + /// + /// Options for automatically copying immutable records on Read(). + /// + public struct ReadCopyOptions + { + /// Which immutable regions to copy records from. + public ReadCopyFrom CopyFrom; + + /// The destination for copied records. 
+ public ReadCopyTo CopyTo; + + internal bool IsActive => CopyFrom != ReadCopyFrom.None && CopyTo != ReadCopyTo.None; + + /// Constructor. + public ReadCopyOptions(ReadCopyFrom from, ReadCopyTo to) + { + this.CopyFrom = from; + this.CopyTo = to; + } + + internal ReadCopyOptions Merge(ReadCopyOptions other) => this = Merge(this, other); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static ReadCopyOptions Merge(ReadCopyOptions upper, ReadCopyOptions lower) + => new(lower.CopyFrom == ReadCopyFrom.Inherit ? upper.CopyFrom : lower.CopyFrom, + lower.CopyTo == ReadCopyTo.Inherit ? upper.CopyTo : lower.CopyTo); + + /// A default instance that does no copying. + public static ReadCopyOptions None => new() { CopyFrom = ReadCopyFrom.None, CopyTo = ReadCopyTo.None }; + + /// + public override string ToString() => $"from: {CopyFrom}, to {CopyTo}"; + } + /// /// Options for the read operation /// @@ -22,9 +97,11 @@ public struct ReadOptions public long StopAddress; /// - /// Flags for controlling operations within the read, such as ReadCache interaction. When doing versioned reads, this should turn off - /// and turn on + /// Options for automatically copying immutable records on Read(). 
/// - public ReadFlags ReadFlags; + public ReadCopyOptions CopyOptions; + + /// + public override string ToString() => $"startAddr: {StartAddress}, stopAddr {StopAddress}, copy {{{CopyOptions}}}"; } } diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs index 46e2628a0..c03ed400a 100644 --- a/cs/src/core/Index/Common/RecordInfo.cs +++ b/cs/src/core/Index/Common/RecordInfo.cs @@ -12,7 +12,7 @@ namespace FASTER.core { // RecordInfo layout (64 bits total): - // [Unused3][Modified][InNewVersion][Filler][Dirty][Unused2][Unused1][Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] + // [Unused1][Modified][InNewVersion][Filler][Dirty][Unused2][Sealed][Valid][Tombstone][X][SSSSSS] [RAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] [AAAAAAAA] // where X = exclusive lock, S = shared lock, R = readcache, A = address [StructLayout(LayoutKind.Explicit, Size = 8)] public struct RecordInfo @@ -232,9 +232,6 @@ internal bool TryResetModifiedAtomic() } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void CloseAtomic() => SetInvalidAtomic(); - [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryUpdateAddress(long expectedPrevAddress, long newPrevAddress) { @@ -349,9 +346,6 @@ public long PreviousAddress } } - public bool PreviousAddressIsReadCache => (this.word & Constants.kReadCacheBitMask) != 0; - public long AbsolutePreviousAddress => AbsoluteAddress(this.PreviousAddress); - [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int GetLength() => kTotalSizeInBytes; @@ -369,10 +363,10 @@ internal bool Unused2 public override string ToString() { - var paRC = this.PreviousAddressIsReadCache ? "(rc)" : string.Empty; + var paRC = IsReadCache(this.PreviousAddress) ? "(rc)" : string.Empty; var locks = $"{(this.IsLockedExclusive ? "x" : string.Empty)}{this.NumLockedShared}"; static string bstr(bool value) => value ? 
"T" : "F"; - return $"prev {this.AbsolutePreviousAddress}{paRC}, locks {locks}, valid {bstr(Valid)}, tomb {bstr(Tombstone)}, seal {bstr(this.IsSealed)}," + return $"prev {AbsoluteAddress(this.PreviousAddress)}{paRC}, locks {locks}, valid {bstr(Valid)}, tomb {bstr(Tombstone)}, seal {bstr(this.IsSealed)}," + $" mod {bstr(Modified)}, dirty {bstr(Dirty)}, fill {bstr(Filler)}, Un1 {bstr(Unused1)}, Un2 {bstr(Unused2)}"; } } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index c01abbcd6..85b2f961f 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -26,7 +26,7 @@ public partial class FasterKV : FasterBase, internal readonly IFasterEqualityComparer comparer; internal readonly bool UseReadCache; - private readonly ReadFlags ReadFlags; + private readonly ReadCopyOptions ReadCopyOptions; internal readonly int sectorSize; private readonly bool WriteDefaultOnDelete; @@ -153,11 +153,18 @@ public FasterKV(long size, LogSettings logSettings, if (checkpointSettings.CheckpointManager is null) disposeCheckpointManager = true; - this.ReadFlags = logSettings.ReadFlags; UseReadCache = logSettings.ReadCacheSettings is not null; - UpdateVarLen(ref variableLengthStructSettings); + this.ReadCopyOptions = logSettings.ReadCopyOptions; + if (this.ReadCopyOptions.CopyTo == ReadCopyTo.Inherit) + this.ReadCopyOptions.CopyTo = UseReadCache ? 
ReadCopyTo.ReadCache : ReadCopyTo.None; + else if (this.ReadCopyOptions.CopyTo == ReadCopyTo.ReadCache && !UseReadCache) + this.ReadCopyOptions.CopyTo = ReadCopyTo.None; + + if (this.ReadCopyOptions.CopyFrom == ReadCopyFrom.Inherit) + this.ReadCopyOptions.CopyFrom = ReadCopyFrom.Device; + UpdateVarLen(ref variableLengthStructSettings); IVariableLengthStruct keyLen = null; if ((!Utility.IsBlittable() && variableLengthStructSettings?.keyLength is null) || @@ -212,7 +219,7 @@ public FasterKV(long size, LogSettings logSettings, { readcache = new BlittableAllocator( new LogSettings - { + { LogDevice = new NullDevice(), PageSizeBits = logSettings.ReadCacheSettings.PageSizeBits, MemorySizeBits = logSettings.ReadCacheSettings.MemorySizeBits, @@ -567,21 +574,11 @@ public async ValueTask CompleteCheckpointAsync(CancellationToken token = default } } - internal static ReadFlags MergeReadFlags(ReadFlags upper, ReadFlags lower) - { - // If lower is None, start with Default, else start with "upper without None" - ReadFlags flags = ((lower & ReadFlags.None) == 0) ? 
(upper & ~ReadFlags.None) : ReadFlags.Default; - // Add in "lower without None" - flags |= (lower & ~ReadFlags.None); - return flags; - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] internal Status ContextRead(ref Key key, ref Input input, ref Output output, Context context, FasterSession fasterSession, long serialNo) where FasterSession : IFasterSession { - var pcontext = default(PendingContext); - pcontext.SetOperationFlags(fasterSession.Ctx.ReadFlags); + var pcontext = new PendingContext(fasterSession.Ctx.ReadCopyOptions); OperationStatus internalStatus; do internalStatus = InternalRead(ref key, ref input, ref output, Constants.kInvalidAddress, ref context, ref pcontext, fasterSession, serialNo); @@ -599,8 +596,7 @@ internal Status ContextRead(ref Key key, FasterSession fasterSession, long serialNo) where FasterSession : IFasterSession { - var pcontext = default(PendingContext); - pcontext.SetOperationFlags(MergeReadFlags(fasterSession.Ctx.ReadFlags, readOptions.ReadFlags), ref readOptions); + var pcontext = new PendingContext(fasterSession.Ctx.ReadCopyOptions, ref readOptions); OperationStatus internalStatus; do internalStatus = InternalRead(ref key, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, serialNo); @@ -618,8 +614,7 @@ internal Status ContextRead(ref Key key, internal Status ContextReadAtAddress(ref Input input, ref Output output, ref ReadOptions readOptions, Context context, FasterSession fasterSession, long serialNo) where FasterSession : IFasterSession { - var pcontext = default(PendingContext); - pcontext.SetOperationFlags(MergeReadFlags(fasterSession.Ctx.ReadFlags, readOptions.ReadFlags), ref readOptions, noKey: true); + var pcontext = new PendingContext(fasterSession.Ctx.ReadCopyOptions, ref readOptions, noKey: true); Key key = default; OperationStatus internalStatus; do diff --git a/cs/src/core/Index/FASTER/FASTERIterator.cs b/cs/src/core/Index/FASTER/FASTERIterator.cs index 
f97ed8a38..fc80b0a8b 100644 --- a/cs/src/core/Index/FASTER/FASTERIterator.cs +++ b/cs/src/core/Index/FASTER/FASTERIterator.cs @@ -19,12 +19,9 @@ public IFasterScanIterator Iterate - (this, functions, untilAddress, loggerFactory: loggerFactory); + return new FasterKVIterator(this, functions, untilAddress, loggerFactory: loggerFactory); } - /// /// Iterator for all (distinct) live key-values stored in FASTER /// @@ -50,7 +47,6 @@ public IFasterScanIterator Iterate(CompactionFu } } - internal sealed class FasterKVIterator : IFasterScanIterator where Functions : IFunctions { @@ -60,6 +56,10 @@ internal sealed class FasterKVIterator iter1; private IFasterScanIterator iter2; + // Phases are: + // 0: Populate tempKv if the record is not the tailmost for the tag chain; if it is, then return it. + // 1: Return records from tempKv. + // 2: Done private int enumerationPhase; public FasterKVIterator(FasterKV fht, Functions functions, long untilAddress, ILoggerFactory loggerFactory = null) @@ -114,39 +114,34 @@ public unsafe bool GetNext(out RecordInfo recordInfo) if (iter1.GetNext(out recordInfo)) { ref var key = ref iter1.GetKey(); - ref var value = ref iter1.GetValue(); - HashEntryInfo hei = new(fht.Comparer.GetHashCode64(ref key)); if (fht.FindTag(ref hei) && hei.entry.Address == iter1.CurrentAddress) { + // The tag chain starts with this (won't be true if we have readcache) so we won't see it again; remove it from tempKv if we've seen it before. if (recordInfo.PreviousAddress >= fht.Log.BeginAddress) { + // Check if it's in-memory first so we don't spuriously create a tombstone record. if (tempKvSession.ContainsKeyInMemory(ref key, out _).Found) - { tempKvSession.Delete(ref key); - } } if (!recordInfo.Tombstone) return true; - continue; } + + // Not the tailmost record in the tag chain so handle whether to add it to or remove it from tempKV (we want to return only the latest version). 
+ if (recordInfo.Tombstone) + tempKvSession.Delete(ref key); else - { - if (recordInfo.Tombstone) - tempKvSession.Delete(ref key); - else - tempKvSession.Upsert(ref key, ref value); - continue; - } - } - else - { - iter1.Dispose(); - enumerationPhase = 1; - iter2 = tempKv.Log.Scan(tempKv.Log.BeginAddress, tempKv.Log.TailAddress); + tempKvSession.Upsert(ref key, ref iter1.GetValue()); + continue; } + + // Done with phase 0; dispose iter1 (the main-log iterator), initialize iter2 (over tempKv), and drop through to phase 1 handling. + iter1.Dispose(); + enumerationPhase = 1; + iter2 = tempKv.Log.Scan(tempKv.Log.BeginAddress, tempKv.Log.TailAddress); } if (enumerationPhase == 1) @@ -157,13 +152,13 @@ public unsafe bool GetNext(out RecordInfo recordInfo) return true; continue; } - else - { - iter2.Dispose(); - enumerationPhase = 2; - } + + // Done with phase 1, so we're done. Drop through to phase 2 handling. + iter2.Dispose(); + enumerationPhase = 2; } + // Phase 2: we're done. This handles both the call that exhausted iter2, and any subsequent calls on this outer iterator. 
recordInfo = default; return false; } @@ -188,7 +183,6 @@ public bool GetNext(out RecordInfo recordInfo, out Key key, out Value value) key = default; value = default; - return false; } diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index e24fbd52c..69d61c6c4 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -180,9 +180,13 @@ internal Status InternalCompletePendingRequestFromContext ContinuePendingRead(request, ref pendingContext, fasterSession), + OperationType.RMW => ContinuePendingRMW(request, ref pendingContext, fasterSession), + OperationType.CONDITIONAL_INSERT => ContinuePendingConditionalCopyToTail(request, ref pendingContext, fasterSession), + _ => throw new FasterException("Unexpected OperationType") + }; var status = HandleOperationStatus(fasterSession.Ctx, ref pendingContext, internalStatus, out newRequest); diff --git a/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs b/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs new file mode 100644 index 000000000..48ad0da00 --- /dev/null +++ b/cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. 
+ +using System.Diagnostics; +using System.Runtime.CompilerServices; + +namespace FASTER.core +{ + public unsafe partial class FasterKV : FasterBase, IFasterKV + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private OperationStatus ConditionalCopyToTail(FasterSession fasterSession, + ref PendingContext pendingContext, + ref Key key, ref Input input, ref Value value, ref Output output, ref Context userContext, long lsn, + ref OperationStackContext stackCtx, WriteReason writeReason) + where FasterSession : IFasterSession + { + // We are called by one of ReadFromImmutable, CompactionConditionalCopyToTail, or ContinueConditionalCopyToTail, and stackCtx is set up for the first try. + // minAddress is the stackCtx.recSrc.LatestLogicalAddress; by the time we get here, any IO below that has been done due to PrepareConditionalCopyToTailIO, + // which then went to ContinueConditionalCopyToTail, which evaluated whether the record was found at that level. + while (true) + { + // ConditionalCopyToTail is different in regard to locking from the usual procedures, in that if we find a source record we don't lock--we exit with success. + // So we only do LockTable-based locking and only when we are about to insert at the tail. + if (TryTransientSLock(fasterSession, ref key, ref stackCtx, out OperationStatus status)) + { + try + { + RecordInfo dummyRecordInfo = default; // TryCopyToTail only needs this for readcache record invalidation. + status = TryCopyToTail(ref pendingContext, ref key, ref input, ref value, ref output, ref stackCtx, ref dummyRecordInfo, fasterSession, writeReason); + } + finally + { + stackCtx.HandleNewRecordOnException(this); + TransientSUnlock(fasterSession, ref key, ref stackCtx); + } + } + if (!HandleImmediateRetryStatus(status, fasterSession, ref pendingContext)) + return status; + + // Failed TryCopyToTail, probably a failed CAS due to another record insertion. 
Re-traverse from the tail to the highest point we just searched + // (which may have gone below HeadAddress). +1 to LatestLogicalAddress because we have examined that already. + var minAddress = stackCtx.recSrc.LatestLogicalAddress + 1; + stackCtx = new(stackCtx.hei.hash); + if (TryFindRecordInMainLogForConditionalCopyToTail(ref key, ref stackCtx, minAddress, out bool needIO)) + return OperationStatus.SUCCESS; + + // Issue IO if necessary, else loop back up and retry the insert. + if (needIO) + return PrepareIOForConditionalCopyToTail(fasterSession, ref pendingContext, ref key, ref input, ref value, ref output, ref userContext, lsn, + ref stackCtx, minAddress, WriteReason.Compaction); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal Status CompactionConditionalCopyToTail(FasterSession fasterSession, ref Key key, ref Input input, ref Value value, + ref Output output, long minAddress) + where FasterSession : IFasterSession + { + Debug.Assert(epoch.ThisInstanceProtected(), "This is called only from Compaction so the epoch should be protected"); + PendingContext pendingContext = new(); + + OperationStackContext stackCtx = new(comparer.GetHashCode64(ref key)); + if (TryFindRecordInMainLogForConditionalCopyToTail(ref key, ref stackCtx, minAddress, out bool needIO)) + return Status.CreateFound(); + + Context userContext = default; + OperationStatus status; + if (needIO) + status = PrepareIOForConditionalCopyToTail(fasterSession, ref pendingContext, ref key, ref input, ref value, ref output, ref userContext, 0L, + ref stackCtx, minAddress, WriteReason.Compaction); + else + status = ConditionalCopyToTail(fasterSession, ref pendingContext, ref key, ref input, ref value, ref output, ref userContext, 0L, ref stackCtx, WriteReason.Compaction); + return HandleOperationStatus(fasterSession.Ctx, ref pendingContext, status, out _); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private OperationStatus 
PrepareIOForConditionalCopyToTail(FasterSession fasterSession, ref PendingContext pendingContext, + ref Key key, ref Input input, ref Value value, ref Output output, ref Context userContext, long lsn, + ref OperationStackContext stackCtx, long minAddress, WriteReason writeReason) + where FasterSession : IFasterSession + { + pendingContext.type = OperationType.CONDITIONAL_INSERT; + pendingContext.minAddress = minAddress; + pendingContext.writeReason = writeReason; + pendingContext.InitialEntryAddress = Constants.kInvalidAddress; + pendingContext.InitialLatestLogicalAddress = stackCtx.recSrc.LatestLogicalAddress; + + if (!pendingContext.NoKey && pendingContext.key == default) // If this is true, we don't have a valid key + pendingContext.key = hlog.GetKeyContainer(ref key); + if (pendingContext.input == default) + pendingContext.input = fasterSession.GetHeapContainer(ref input); + if (pendingContext.value == default) + pendingContext.value = hlog.GetValueContainer(ref value); + + pendingContext.output = output; + if (pendingContext.output is IHeapConvertible heapConvertible) + heapConvertible.ConvertToHeap(); + + pendingContext.userContext = userContext; + pendingContext.logicalAddress = stackCtx.recSrc.LogicalAddress; + pendingContext.version = fasterSession.Ctx.version; + pendingContext.serialNum = lsn; + + return OperationStatus.RECORD_ON_DISK; + } + } +} diff --git a/cs/src/core/Index/FASTER/Implementation/ContainsKeyInMemory.cs b/cs/src/core/Index/FASTER/Implementation/ContainsKeyInMemory.cs index bdb1a947c..682cce7a8 100644 --- a/cs/src/core/Index/FASTER/Implementation/ContainsKeyInMemory.cs +++ b/cs/src/core/Index/FASTER/Implementation/ContainsKeyInMemory.cs @@ -27,21 +27,8 @@ internal Status InternalContainsKeyInMemory= fromAddress) + if (TryFindRecordInMainLog(ref key, ref stackCtx, fromAddress) && !stackCtx.recSrc.GetInfo().Tombstone) { - var physicalAddress = stackCtx.recSrc.SetPhysicalAddress(); - ref RecordInfo recordInfo = ref 
hlog.GetInfo(physicalAddress); - if (recordInfo.Invalid || !comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) - { - logicalAddress = recordInfo.PreviousAddress; - TraceBackForKeyMatch(ref key, logicalAddress, fromAddress, out logicalAddress, out _); - } - - if (stackCtx.recSrc.LogicalAddress < fromAddress) - { - logicalAddress = 0; - return new(StatusCode.NotFound); - } logicalAddress = stackCtx.recSrc.LogicalAddress; return new(StatusCode.Found); } diff --git a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs index 2f4648ec3..e3e66c53d 100644 --- a/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs +++ b/cs/src/core/Index/FASTER/Implementation/ContinuePending.cs @@ -27,14 +27,14 @@ public unsafe partial class FasterKV : FasterBase, IFasterKV /// /// - internal OperationStatus InternalContinuePendingRead(AsyncIOContext request, + internal OperationStatus ContinuePendingRead(AsyncIOContext request, ref PendingContext pendingContext, FasterSession fasterSession) where FasterSession : IFasterSession { ref RecordInfo srcRecordInfo = ref hlog.GetInfoFromBytePointer(request.record.GetValidPointer()); srcRecordInfo.ClearBitsForDiskImages(); - if (request.logicalAddress >= hlog.BeginAddress) + if (request.logicalAddress >= hlog.BeginAddress && request.logicalAddress >= pendingContext.minAddress) { SpinWaitUntilClosed(request.logicalAddress); @@ -81,10 +81,10 @@ internal OperationStatus InternalContinuePendingRead= hlog.ReadOnlyAddress) { - // If this succeeds, we obviously don't need to copy to tail or readcache, so return success. - if (fasterSession.ConcurrentReader(ref key, ref pendingContext.input.Get(), ref stackCtx.recSrc.GetValue(), + // If this succeeds, we don't need to copy to tail or readcache, so return success. 
+ if (fasterSession.ConcurrentReader(ref key, ref pendingContext.input.Get(), ref value, ref pendingContext.output, ref srcRecordInfo, ref readInfo, out EphemeralLockResult lockResult)) return OperationStatus.SUCCESS; if (lockResult == EphemeralLockResult.Failed) @@ -93,8 +93,11 @@ internal OperationStatus InternalContinuePendingRead(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); + TransientSUnlock(fasterSession, ref key, ref stackCtx); } // Must do this *after* Unlocking. Status was set by InternalTryCopyToTail. if (!HandleImmediateRetryStatus(status, fasterSession, ref pendingContext)) - { - // If no copy to tail was done. - if (status == OperationStatus.NOTFOUND || status == OperationStatus.RECORD_ON_DISK) - return OperationStatus.SUCCESS; return status; - } - } // end while (true) } @@ -160,9 +158,13 @@ internal OperationStatus InternalContinuePendingReadSUCCESS /// The value has been successfully updated(or inserted). /// + /// + /// RECORD_ON_DISK + /// We need to issue an IO to continue. + /// /// /// - internal OperationStatus InternalContinuePendingRMW(AsyncIOContext request, + internal OperationStatus ContinuePendingRMW(AsyncIOContext request, ref PendingContext pendingContext, FasterSession fasterSession) where FasterSession : IFasterSession { @@ -228,273 +230,53 @@ internal OperationStatus InternalContinuePendingRMW - /// + /// Continue a pending CONDITIONAL_INSERT operation with the record retrieved from disk, checking whether a record for this key was + /// added since we went pending; in that case this operation must be adjusted to use current data. 
/// - /// - /// - /// - /// - /// - /// - /// - /// - /// Lower-bound address (addresses are searched from tail (high) to head (low); do not search for "future records" earlier than this) - /// Actual address of existing key record - /// - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal OperationStatus InternalCopyToTailForCompaction( - ref Key key, ref Input input, ref Value value, ref Output output, - long untilAddress, long actualAddress, FasterSession fasterSession) - where FasterSession : IFasterSession - { - Debug.Assert(epoch.ThisInstanceProtected(), "This is currently only called from Compaction so the epoch should be protected"); - OperationStatus status = default; - PendingContext pendingContext = default; - OperationStackContext stackCtx = new(comparer.GetHashCode64(ref key)); - - do - { - // A 'ref' variable must be initialized. If we find a record for the key, we reassign the reference. We don't copy from this source, but we do lock it. - RecordInfo dummyRecordInfo = default; - ref RecordInfo srcRecordInfo = ref dummyRecordInfo; - - // We must check both the readcache as well as transfer if the current record is in the immutable region (Compaction - // allows copying up to SafeReadOnlyAddress). hei must be set in all cases, because ITCTT relies on it. - if (!FindTag(ref stackCtx.hei)) - Debug.Fail("Expected to FindTag in InternalCopyToTailForCompaction"); - stackCtx.SetRecordSourceToHashEntry(hlog); - - if (this.LockTable.IsEnabled && !fasterSession.TryLockTransientShared(ref key, ref stackCtx)) - { - HandleImmediateRetryStatus(OperationStatus.RETRY_LATER, fasterSession, ref pendingContext); - continue; - } - - status = OperationStatus.SUCCESS; - if (actualAddress >= hlog.BeginAddress) - { - // Lookup-based compaction knows the record address. 
- if (actualAddress >= hlog.HeadAddress) - { - // Since this is for compaction, we don't need to TracebackForKeyMatch; ITCTT will catch the case where a future record was inserted for this key. - stackCtx.recSrc.LogicalAddress = actualAddress; - stackCtx.recSrc.SetPhysicalAddress(); - stackCtx.recSrc.HasMainLogSrc = true; - srcRecordInfo = ref stackCtx.recSrc.GetInfo(); - } - else - { - if (TryFindRecordInMemory(ref key, ref stackCtx, ref pendingContext)) - srcRecordInfo = ref stackCtx.recSrc.GetInfo(); - } - } - else - { - // Scan compaction does not know the address, so we must traverse. This is similar to what ITCTT does, but we update untilAddress so it's not done twice. - Debug.Assert(actualAddress == Constants.kUnknownAddress, "Unexpected address in compaction"); - - if (TryFindRecordInMemory(ref key, ref stackCtx, hlog.HeadAddress)) - { - if (stackCtx.recSrc.LogicalAddress > untilAddress) // Same check ITCTT does - status = OperationStatus.NOTFOUND; - else - { - untilAddress = stackCtx.recSrc.LatestLogicalAddress; - srcRecordInfo = ref stackCtx.recSrc.GetInfo(); - } - } - } - - if (status == OperationStatus.SUCCESS) - { - try - { - status = InternalTryCopyToTail(ref pendingContext, ref key, ref input, ref value, ref output, - ref stackCtx, ref srcRecordInfo, untilAddress, fasterSession, WriteReason.Compaction); - } - finally - { - stackCtx.HandleNewRecordOnException(this); - TransientSUnlock(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); - } - } - } while (HandleImmediateRetryStatus(status, fasterSession, ref pendingContext)); - return status; - } - - /// - /// Helper function for trying to copy existing immutable records (at foundLogicalAddress) to the tail. - /// - /// - /// - /// - /// The record value; may be superseded by a default value for expiration - /// - /// Contains the and structures for this operation, - /// and allows passing back the newLogicalAddress for invalidation in the case of exceptions. 
- /// if ., the recordInfo to transfer locks from. - /// The expected *main-log* address of the record being copied. This has different meanings depending on the operation: - /// - /// If this is Read() doing a copy of a record in the immutable region, this is the logical address of the source record - /// Otherwise, if this is Read(), it is the latestLogicalAddress from the Read() - /// If this is Compact(), this is a lower-bound address (addresses are searched from tail (high) to head (low); do not search for "future records" earlier than this) - /// - /// - /// - /// The reason for this operation. + /// record read from the disk. + /// internal context for the pending RMW operation + /// Callback functions. /// - /// - /// RETRY_NOW: failed CAS, so no copy done. This routine deals entirely with new records, so will not encounter Sealed records - /// RECORD_ON_DISK: unable to determine if record present beyond expectedLogicalAddress, so no copy done - /// NOTFOUND: record was found in memory beyond expectedLogicalAddress, so no copy done - /// SUCCESS: no record found beyond expectedLogicalAddress, so copy was done - /// + /// + /// + /// Value + /// Description + /// + /// + /// SUCCESS + /// The value has been successfully inserted, or was found above the specified address. + /// + /// + /// RECORD_ON_DISK + /// We need to issue an IO to continue. 
+ /// + /// /// - internal OperationStatus InternalTryCopyToTail(ref PendingContext pendingContext, - ref Key key, ref Input input, ref Value recordValue, ref Output output, ref OperationStackContext stackCtx, - ref RecordInfo srcRecordInfo, long untilLogicalAddress, FasterSession fasterSession, WriteReason reason) + internal OperationStatus ContinuePendingConditionalCopyToTail(AsyncIOContext request, + ref PendingContext pendingContext, FasterSession fasterSession) where FasterSession : IFasterSession { - #region Trace back for newly-inserted record in HybridLog - if (stackCtx.recSrc.LatestLogicalAddress > untilLogicalAddress) - { - // Entries exist in the log above our last-checked address; another session inserted them after our FindTag. See if there is a newer entry for this key. - var minAddress = untilLogicalAddress < hlog.HeadAddress ? hlog.HeadAddress : untilLogicalAddress; - TraceBackForKeyMatch(ref key, stackCtx.recSrc.LatestLogicalAddress, minAddress, out long foundLogicalAddress, out _); - if (foundLogicalAddress > untilLogicalAddress) - { - // Note: ReadAtAddress bails here by design; we assume anything in the readcache is the latest version. - // Any loop to retrieve prior versions should set ReadFlags.DisableReadCache*; see ReadAddressTests. - return foundLogicalAddress < hlog.HeadAddress ? OperationStatus.RECORD_ON_DISK : OperationStatus.NOTFOUND; - } - - // Update untilLogicalAddress to the latest address we've checked; recSrc.LatestLogicalAddress can be updated by VerifyReadCacheSplicePoint. - untilLogicalAddress = stackCtx.recSrc.LatestLogicalAddress; - } - #endregion - - #region Create new copy in mutable region - var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref recordValue); - - UpsertInfo upsertInfo = new() - { - Version = fasterSession.Ctx.version, - SessionID = fasterSession.Ctx.sessionID, - Address = stackCtx.recSrc.HasInMemorySrc ? 
stackCtx.recSrc.LogicalAddress : Constants.kInvalidAddress, - KeyHash = stackCtx.hei.hash - }; - - StatusCode advancedStatusCode = StatusCode.Found; - - // A 'ref' variable must be initialized; we'll assign it to the new record we allocate. - RecordInfo dummyRecordInfo = default; - ref RecordInfo newRecordInfo = ref dummyRecordInfo; - AllocatorBase localLog = hlog; - - #region Allocate new record and call SingleWriter - long newLogicalAddress, newPhysicalAddress; - bool copyToReadCache = UseReadCache && reason == WriteReason.CopyToReadCache; - long readcacheNewAddressBit = 0L; - if (copyToReadCache) - { - localLog = readcache; - readcacheNewAddressBit = Constants.kReadCacheBitMask; + // If the key was found at or above minAddress, do nothing. + if (request.logicalAddress >= pendingContext.minAddress) + return OperationStatus.SUCCESS; - if (!TryAllocateRecordReadCache(ref pendingContext, ref stackCtx, allocatedSize, out newLogicalAddress, out newPhysicalAddress, out OperationStatus status)) - return status; - - newRecordInfo = ref WriteNewRecordInfo(ref key, readcache, newPhysicalAddress, inNewVersion: false, tombstone: false, stackCtx.hei.Address); - - upsertInfo.Address = Constants.kInvalidAddress; // We do not expose readcache addresses - advancedStatusCode |= StatusCode.CopiedRecordToReadCache; - reason = WriteReason.CopyToReadCache; - } - else - { - if (!TryAllocateRecord(ref pendingContext, ref stackCtx, allocatedSize, recycle: true, out newLogicalAddress, out newPhysicalAddress, out OperationStatus status)) - return status; - - newRecordInfo = ref WriteNewRecordInfo(ref key, hlog, newPhysicalAddress, inNewVersion: fasterSession.Ctx.InNewVersion, tombstone: false, stackCtx.recSrc.LatestLogicalAddress); - - upsertInfo.Address = newLogicalAddress; - advancedStatusCode |= StatusCode.CopiedRecord; - if (reason == WriteReason.CopyToReadCache) - reason = WriteReason.CopyToTail; - } - - stackCtx.SetNewRecord(newLogicalAddress | readcacheNewAddressBit); - 
upsertInfo.RecordInfo = newRecordInfo; - - if (!fasterSession.SingleWriter(ref key, ref input, ref recordValue, ref localLog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), - ref output, ref newRecordInfo, ref upsertInfo, reason)) - { - // No SaveAlloc here, but TODO this record could be reused later. - stackCtx.SetNewRecordInvalid(ref newRecordInfo); - return (upsertInfo.Action == UpsertAction.CancelOperation) ? OperationStatus.CANCELED : OperationStatus.SUCCESS; - } - - #endregion Allocate new record and call SingleWriter - - // Insert the new record by CAS'ing either directly into the hash entry or splicing into the readcache/mainlog boundary. - // It is possible that we will successfully CAS but subsequently fail validation. - bool success = true, casSuccess = false; - OperationStatus failStatus = OperationStatus.RETRY_NOW; // Default to CAS-failed status, which does not require an epoch refresh - if (!copyToReadCache && DoEphemeralLocking) - newRecordInfo.InitializeLockShared(); // For PostSingleWriter - if (copyToReadCache || (stackCtx.recSrc.LowestReadCacheLogicalAddress == Constants.kInvalidAddress)) - { - Debug.Assert(!stackCtx.hei.IsReadCache || (readcacheNewAddressBit != 0), $"Inconsistent IsReadCache ({stackCtx.hei.IsReadCache}) vs. readcacheNewAddressBit ({readcacheNewAddressBit})"); - - // ReadCache entries, and main-log records when there are no readcache records, are CAS'd in as the first entry in the hash chain. - success = casSuccess = stackCtx.hei.TryCAS(newLogicalAddress | readcacheNewAddressBit); - - if (success && copyToReadCache && stackCtx.recSrc.LowestReadCacheLogicalAddress != Constants.kInvalidAddress) - { - // If someone added a main-log entry for this key from an update or CTT while we were inserting the new readcache record, then the new - // readcache record is obsolete and must be Invalidated. (If LowestReadCacheLogicalAddress == kInvalidAddress, then the CAS would have - // failed in this case.) 
If this was the first readcache record in the chain, then once we CAS'd it in someone could have spliced into - it, but then that splice will call ReadCacheCheckTailAfterSplice and invalidate it if it's the same key. - success = EnsureNoNewMainLogRecordWasSpliced(ref key, stackCtx.recSrc, untilLogicalAddress, ref failStatus); - } - } - else - { - Debug.Assert(readcacheNewAddressBit == 0, "Must not be inserting a readcache record here"); + // Prepare to copy to tail. Use data from pendingContext, not request; we only made it to this line if the key was not found, and thus the request was not populated. + ref Key key = ref pendingContext.key.Get(); + OperationStackContext stackCtx = new(comparer.GetHashCode64(ref key)); - // We are doing CopyToTail; we may have a source record from either main log (Compaction) or ReadCache, or have a LockTable lock. - Debug.Assert(reason == WriteReason.CopyToTail || reason == WriteReason.Compaction, "Expected WriteReason.CopyToTail or .Compaction"); - success = casSuccess = SpliceIntoHashChainAtReadCacheBoundary(ref key, ref stackCtx, newLogicalAddress); - } + // See if the record was added above the highest address we checked before issuing the IO. + var minAddress = pendingContext.InitialLatestLogicalAddress + 1; + if (TryFindRecordInMainLogForConditionalCopyToTail(ref key, ref stackCtx, minAddress, out bool needIO)) + return OperationStatus.SUCCESS; - if (success) - { - if (!copyToReadCache) - PostInsertAtTail(ref key, ref stackCtx, ref srcRecordInfo); - } - else - { - stackCtx.SetNewRecordInvalid(ref newRecordInfo); - - if (!casSuccess) - { - // Let user dispose similar to a deleted record, and save for retry, *only* if CAS failed; otherwise we must preserve it in the chain.
- fasterSession.DisposeSingleWriter(ref localLog.GetKey(newPhysicalAddress), ref input, ref recordValue, ref localLog.GetValue(newPhysicalAddress), ref output, ref newRecordInfo, ref upsertInfo, reason); - newRecordInfo.PreviousAddress = Constants.kTempInvalidAddress; // Necessary for ReadCacheEvict, but cannot be kInvalidAddress or we have recordInfo.IsNull - if (!copyToReadCache) - SaveAllocationForRetry(ref pendingContext, newLogicalAddress, newPhysicalAddress, allocatedSize); - } - return failStatus; - } + // HeadAddress may have risen above minAddress; if so, we need IO. + if (needIO) + return PrepareIOForConditionalCopyToTail(fasterSession, ref pendingContext, ref key, ref pendingContext.input.Get(), ref pendingContext.value.Get(), + ref pendingContext.output, ref pendingContext.userContext, pendingContext.serialNum, ref stackCtx, minAddress, WriteReason.Compaction); - // Success. - pendingContext.recordInfo = newRecordInfo; - pendingContext.logicalAddress = upsertInfo.Address; - fasterSession.PostSingleWriter(ref key, ref input, ref recordValue, ref localLog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), - ref output, ref newRecordInfo, ref upsertInfo, reason); - stackCtx.ClearNewRecord(); - return OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, advancedStatusCode); -#endregion + // No IO needed. 
+ return ConditionalCopyToTail(fasterSession, ref pendingContext, ref key, ref pendingContext.input.Get(), ref pendingContext.value.Get(), + ref pendingContext.output, ref pendingContext.userContext, pendingContext.serialNum, ref stackCtx, pendingContext.writeReason); } } } \ No newline at end of file diff --git a/cs/src/core/Index/FASTER/Implementation/FindRecord.cs b/cs/src/core/Index/FASTER/Implementation/FindRecord.cs index a6c62317e..e9ce559ea 100644 --- a/cs/src/core/Index/FASTER/Implementation/FindRecord.cs +++ b/cs/src/core/Index/FASTER/Implementation/FindRecord.cs @@ -34,6 +34,7 @@ private bool TryFindRecordInMemory(ref Key key, ref Oper return TryFindRecordInMainLog(ref key, ref stackCtx, minAddress: minLog); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal bool TryFindRecordInMainLog(ref Key key, ref OperationStackContext stackCtx, long minAddress) { Debug.Assert(!stackCtx.recSrc.HasInMemorySrc, "Should not have found record before this call"); @@ -45,6 +46,36 @@ internal bool TryFindRecordInMainLog(ref Key key, ref OperationStackContext stackCtx, long minAddress, out bool needIO) + { + // minAddress is inclusive + if (!FindTag(ref stackCtx.hei)) + return needIO = false; + stackCtx.SetRecordSourceToHashEntry(hlog); + + if (!stackCtx.hei.IsReadCache) + { + if (stackCtx.hei.Address < minAddress) + return needIO = false; + if (stackCtx.hei.Address < hlog.HeadAddress) + { + needIO = stackCtx.hei.Address >= hlog.BeginAddress; + return false; + } + } + + if (UseReadCache) + SkipReadCache(ref stackCtx, out _); // Where this is called, we have no dependency on source addresses so we don't care if it Refreshed + + needIO = false; + if (TryFindRecordInMainLog(ref key, ref stackCtx, minAddress < hlog.HeadAddress ? 
hlog.HeadAddress : minAddress)) + return true; + + needIO = stackCtx.recSrc.LogicalAddress >= minAddress && stackCtx.recSrc.LogicalAddress < hlog.HeadAddress && stackCtx.recSrc.LogicalAddress >= hlog.BeginAddress; + return false; + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private bool TraceBackForKeyMatch(ref Key key, ref RecordSource recSrc, long minAddress) { diff --git a/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs b/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs index 0b0aa9d8e..c349d7624 100644 --- a/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs +++ b/cs/src/core/Index/FASTER/Implementation/HashEntryInfo.cs @@ -58,7 +58,7 @@ internal HashEntryInfo(long hash) /// /// Return whether the has been updated /// - internal bool IsNotCurrent => this.CurrentAddress != this.Address; + internal bool IsCurrent => this.CurrentAddress == this.Address; /// /// Whether the original address for this hash entry (at the time of FindTag, etc.) is a readcache address. @@ -68,7 +68,7 @@ internal HashEntryInfo(long hash) /// /// Whether the current address for this hash entry (possibly modified after FindTag, etc.) is a readcache address. /// - internal bool IsCurrentReadCache => IsReadCache(this.bucket->bucket_entries[this.slot]); + internal bool IsCurrentReadCache => IsReadCache(this.CurrentAddress); /// /// Set members to the current entry (which may have been updated (via CAS) in the bucket after FindTag, etc.) 
diff --git a/cs/src/core/Index/FASTER/Implementation/Helpers.cs b/cs/src/core/Index/FASTER/Implementation/Helpers.cs index e0015a232..9f765fffb 100644 --- a/cs/src/core/Index/FASTER/Implementation/Helpers.cs +++ b/cs/src/core/Index/FASTER/Implementation/Helpers.cs @@ -101,16 +101,20 @@ private bool CASRecordIntoChain(ref Key key, ref OperationStackContext stackCtx, ref RecordInfo srcRecordInfo) + private void PostCopyToTail(ref Key key, ref OperationStackContext stackCtx, ref RecordInfo srcRecordInfo) + => PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo, stackCtx.hei.Address); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void PostCopyToTail(ref Key key, ref OperationStackContext stackCtx, ref RecordInfo srcRecordInfo, long highestReadCacheAddressChecked) { if (stackCtx.recSrc.HasReadCacheSrc) - srcRecordInfo.CloseAtomic(); + srcRecordInfo.SetInvalidAtomic(); - // If we are not using the LockTable, then we ElideAndReinsertReadCacheChain ensured no conflict between the readcache - // and the newly-inserted record. Otherwise we spliced it in directly, in which case a competing readcache record may - // have been inserted; if so, invalidate it. + // If we are not using the LockTable, then ElideAndReinsertReadCacheChain ensured no conflict between the readcache and the newly-inserted + // record. Otherwise we spliced it in directly, in which case a competing readcache record may have been inserted; if so, invalidate it. + // highestReadCacheAddressChecked is hei.Address unless we are from ConditionalCopyToTail, which may have skipped the readcache before this. if (UseReadCache && LockTable.IsEnabled) - ReadCacheCheckTailAfterSplice(ref key, ref stackCtx.hei); + ReadCacheCheckTailAfterSplice(ref key, ref stackCtx.hei, highestReadCacheAddressChecked); } // Called after BlockAllocate or anything else that could shift HeadAddress, to adjust addresses or return false for RETRY as needed. 
diff --git a/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs b/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs index fd849c916..e07ec8ffe 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalDelete.cs @@ -261,7 +261,7 @@ private OperationStatus CreateNewRecordDelete(ref if (UseReadCache) { // DisableReadCacheReads is used by readAtAddress, e.g. to backtrack to previous versions. - if (pendingContext.DisableReadCacheReads || pendingContext.NoKey) + if (pendingContext.NoKey) { SkipReadCache(ref stackCtx, out _); // This may refresh, but we haven't examined HeadAddress yet stackCtx.SetRecordSourceToHashEntry(hlog); @@ -101,7 +101,7 @@ internal OperationStatus InternalRead(ref } #endregion - // These track the latest main-log address in the tag chain; InternalContinuePendingRead uses them to check for new inserts. + // Track the latest searched-below addresses. They are the same if there are no readcache records. 
pendingContext.InitialEntryAddress = stackCtx.hei.Address; pendingContext.InitialLatestLogicalAddress = stackCtx.recSrc.LatestLogicalAddress; @@ -119,7 +119,7 @@ internal OperationStatus InternalRead(ref else if (stackCtx.recSrc.LogicalAddress >= hlog.HeadAddress) { // Immutable region - status = ReadFromImmutableRegion(ref key, ref input, ref output, useStartAddress, ref stackCtx, ref pendingContext, fasterSession); + status = ReadFromImmutableRegion(ref key, ref input, ref output, ref userContext, lsn, useStartAddress, ref stackCtx, ref pendingContext, fasterSession); if (status == OperationStatus.ALLOCATE_FAILED && pendingContext.IsAsync) // May happen due to CopyToTailFromReadOnly goto CreatePendingContext; return status; @@ -202,7 +202,7 @@ private bool ReadFromCache(ref Key key, r } finally { - TransientSUnlock(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); + TransientSUnlock(fasterSession, ref key, ref stackCtx); } } return false; @@ -248,12 +248,12 @@ private OperationStatus ReadFromMutableRegion(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); + TransientSUnlock(fasterSession, ref key, ref stackCtx); } } private OperationStatus ReadFromImmutableRegion(ref Key key, ref Input input, ref Output output, - bool useStartAddress, ref OperationStackContext stackCtx, + ref Context userContext, long lsn, bool useStartAddress, ref OperationStackContext stackCtx, ref PendingContext pendingContext, FasterSession fasterSession) where FasterSession : IFasterSession { @@ -283,19 +283,13 @@ private OperationStatus ReadFromImmutableRegion(fasterSession, ref key, ref stackCtx, ref srcRecordInfo); + TransientSUnlock(fasterSession, ref key, ref stackCtx); } } - } } diff --git a/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs b/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs index 92aaeb1af..d3f0d97a1 100644 --- a/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs +++ b/cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs @@ 
-173,9 +173,12 @@ internal OperationStatus InternalUpsert(r Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"Upsert CreatePendingContext encountered latchDest == {latchDestination}"); { pendingContext.type = OperationType.UPSERT; - if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key); - if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input); - if (pendingContext.value == default) pendingContext.value = hlog.GetValueContainer(ref value); + if (pendingContext.key == default) + pendingContext.key = hlog.GetKeyContainer(ref key); + if (pendingContext.input == default) + pendingContext.input = fasterSession.GetHeapContainer(ref input); + if (pendingContext.value == default) + pendingContext.value = hlog.GetValueContainer(ref value); pendingContext.output = output; if (pendingContext.output is IHeapConvertible heapConvertible) @@ -340,7 +343,7 @@ private OperationStatus CreateNewRecordUpsert(FasterSess } [MethodImpl(MethodImplOptions.AggressiveInlining)] - void TransientSUnlock(FasterSession fasterSession, ref Key key, - ref OperationStackContext stackCtx, ref RecordInfo srcRecordInfo) + private void TransientSUnlock(FasterSession fasterSession, ref Key key, ref OperationStackContext stackCtx) where FasterSession : IFasterSession { if (stackCtx.recSrc.HasTransientLock) diff --git a/cs/src/core/Index/FASTER/Implementation/ReadCache.cs b/cs/src/core/Index/FASTER/Implementation/ReadCache.cs index 46008b7a1..ffb7f0cc4 100644 --- a/cs/src/core/Index/FASTER/Implementation/ReadCache.cs +++ b/cs/src/core/Index/FASTER/Implementation/ReadCache.cs @@ -62,7 +62,7 @@ internal bool FindInReadCache(ref Key key, ref OperationStackContext // Update the leading LatestLogicalAddress to recordInfo.PreviousAddress, and if that is a main log record, break out. 
stackCtx.recSrc.LatestLogicalAddress = recordInfo.PreviousAddress & ~Constants.kReadCacheBitMask; - if (!recordInfo.PreviousAddressIsReadCache) + if (!IsReadCache(recordInfo.PreviousAddress)) goto InMainLog; } @@ -117,47 +117,47 @@ private bool DetachAndReattachReadCacheChain(ref Key key, ref OperationStackCont { // We are not doing LockTable-based locking, so the only place non-ReadCacheEvict codes updates the chain membership is at the HashBucketEntry; // no thread will try to splice at the readcache/log boundary. hei.Address is the highest readcache address. - HashBucketEntry entry = new() { word = stackCtx.hei.Address }; - long highestRcAddress = entry.Address, lowestRcAddress = highestRcAddress; + long highestRcAddress = stackCtx.hei.Address, lowestRcAddress = highestRcAddress; + Debug.Assert(IsReadCache(highestRcAddress), "Expected highestRcAddress to be .ReadCache"); // First detach the chain by CAS'ing in the new log record (whose .PreviousAddress = recSrc.LatestLogicalAddress). if (!stackCtx.hei.TryCAS(newLogicalAddress)) return false; - if (entry.AbsoluteAddress < readcache.HeadAddress) + if (AbsoluteAddress(highestRcAddress) < readcache.HeadAddress) goto Success; - // Traverse from the old address at hash entry to the lowestReadCacheLogicalAddress, invalidating any record matching the key. - for (bool found = false; entry.ReadCache && entry.AbsoluteAddress >= readcache.HeadAddress; /* incremented in loop */) + // Traverse from highestRcAddress to the splice point, invalidating any record matching the key. 
+ for (bool found = false; /* tested in loop */; /* incremented in loop */) { - lowestRcAddress = entry.Address; - var physicalAddress = readcache.GetPhysicalAddress(entry.AbsoluteAddress); + var physicalAddress = readcache.GetPhysicalAddress(AbsoluteAddress(lowestRcAddress)); ref RecordInfo recordInfo = ref readcache.GetInfo(physicalAddress); if (!found && !recordInfo.Invalid && comparer.Equals(ref key, ref readcache.GetKey(physicalAddress))) { found = true; recordInfo.SetInvalidAtomic(); // Atomic needed due to other threads (e.g. ReadCacheEvict) possibly being in this chain before we detached it. } - entry.word = recordInfo.PreviousAddress; - } - if (AbsoluteAddress(highestRcAddress) >= readcache.HeadAddress) - { - // Splice the new recordInfo into the local chain. Used atomic due to other threads (e.g. ReadCacheEvict) possibly being in this - // before we detached it, and setting the record to Invalid (no other thread will be updating anything else in the chain, though). - ref RecordInfo rcri = ref readcache.GetInfo(readcache.GetPhysicalAddress(AbsoluteAddress(lowestRcAddress))); - while (!rcri.TryUpdateAddress(rcri.PreviousAddress, newLogicalAddress)) - Thread.Yield(); - - // Now try to CAS the chain into the HashBucketEntry. If it fails, give up; we lose those readcache records. - // Trying to handle conflicts would require evaluating whether other threads had inserted keys in our chain, and it's too rare to worry about. - if (stackCtx.hei.TryCAS(highestRcAddress)) + // See if we're at the last entry in the readcache prefix. + if (!IsReadCache(recordInfo.PreviousAddress) || AbsoluteAddress(recordInfo.PreviousAddress) < readcache.HeadAddress) { - // If we go below readcache.HeadAddress ReadCacheEvict may race past us, so make sure the lowest address is still in range. - while (lowestRcAddress < readcache.HeadAddress) - lowestRcAddress = ReadCacheEvictChain(readcache.HeadAddress, ref stackCtx.hei); + // Splice the new recordInfo into the local chain. 
Use atomic due to other threads (e.g. ReadCacheEvict) possibly being in this + // before we detached it, and setting the record to Invalid (no other thread will be updating anything else in the chain, though). + while (!recordInfo.TryUpdateAddress(recordInfo.PreviousAddress, newLogicalAddress)) + Thread.Yield(); + + // Now try to CAS the chain into the HashBucketEntry. If it fails, give up; we lose those readcache records. + // Trying to handle conflicts would require evaluating whether other threads had inserted keys in our chain, and it's too rare to worry about. + if (stackCtx.hei.TryCAS(highestRcAddress)) + { + // If we go below readcache.HeadAddress ReadCacheEvict may race past us, so make sure the lowest address is still in range. + while (lowestRcAddress < readcache.HeadAddress) + lowestRcAddress = ReadCacheEvictChain(readcache.HeadAddress, ref stackCtx.hei); + } + goto Success; } + lowestRcAddress = recordInfo.PreviousAddress; } - + Success: stackCtx.UpdateRecordSourceToCurrentHashEntry(hlog); return true; @@ -194,7 +194,7 @@ internal void SkipReadCache(ref OperationStackContext stackCtx, out stackCtx.recSrc.LowestReadCachePhysicalAddress = readcache.GetPhysicalAddress(stackCtx.recSrc.LowestReadCacheLogicalAddress); RecordInfo recordInfo = readcache.GetInfo(stackCtx.recSrc.LowestReadCachePhysicalAddress); - if (!recordInfo.PreviousAddressIsReadCache) + if (!IsReadCache(recordInfo.PreviousAddress)) { stackCtx.recSrc.LatestLogicalAddress = recordInfo.PreviousAddress; stackCtx.recSrc.LogicalAddress = stackCtx.recSrc.LatestLogicalAddress; @@ -232,18 +232,18 @@ private void SkipReadCacheBucket(HashBucket* bucket) // Called after a readcache insert, to make sure there was no race with another session that added a main-log record at the same time. 
[MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool EnsureNoNewMainLogRecordWasSpliced(ref Key key, RecordSource recSrc, long untilLogicalAddress, ref OperationStatus failStatus) + private bool EnsureNoNewMainLogRecordWasSpliced(ref Key key, RecordSource recSrc, long highestSearchedAddress, ref OperationStatus failStatus) { bool success = true; ref RecordInfo lowest_rcri = ref readcache.GetInfo(recSrc.LowestReadCachePhysicalAddress); - Debug.Assert(!lowest_rcri.PreviousAddressIsReadCache, "lowest-rcri.PreviousAddress should be a main-log address"); - if (lowest_rcri.PreviousAddress > untilLogicalAddress) + Debug.Assert(!IsReadCache(lowest_rcri.PreviousAddress), "lowest-rcri.PreviousAddress should be a main-log address"); + if (lowest_rcri.PreviousAddress > highestSearchedAddress) { // Someone added a new record in the splice region. It won't be readcache; that would've been added at tail. See if it's our key. - var minAddress = untilLogicalAddress > hlog.HeadAddress ? untilLogicalAddress : hlog.HeadAddress; + var minAddress = highestSearchedAddress > hlog.HeadAddress ? highestSearchedAddress : hlog.HeadAddress; if (TraceBackForKeyMatch(ref key, lowest_rcri.PreviousAddress, minAddress + 1, out long prevAddress, out _)) success = false; - else if (prevAddress > untilLogicalAddress && prevAddress < hlog.HeadAddress) + else if (prevAddress > highestSearchedAddress && prevAddress < hlog.HeadAddress) { // One or more records were inserted and escaped to disk during the time of this Read/PENDING operation, untilLogicalAddress // is below hlog.HeadAddress, and there are one or more inserted records between them: @@ -269,13 +269,13 @@ private bool EnsureNoNewMainLogRecordWasSpliced(ref Key key, RecordSource untilEntry.Address || !untilEntry.ReadCache)) @@ -327,7 +327,7 @@ internal void ReadCacheEvict(long rcLogicalAddress, long rcToLogicalAddress) // 2. Call FindTag on that key in the main fkv to get the start of the hash chain. // 3. 
Walk the hash chain's readcache entries, removing records in the "to be removed" range. // Do not remove Invalid records outside this range; that leads to race conditions. - Debug.Assert(!rcRecordInfo.PreviousAddressIsReadCache || rcRecordInfo.AbsolutePreviousAddress < rcLogicalAddress, "Invalid record ordering in readcache"); + Debug.Assert(!IsReadCache(rcRecordInfo.PreviousAddress) || AbsoluteAddress(rcRecordInfo.PreviousAddress) < rcLogicalAddress, "Invalid record ordering in readcache"); // Find the hash index entry for the key in the FKV's hash table. ref Key key = ref readcache.GetKey(rcPhysicalAddress); diff --git a/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs b/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs new file mode 100644 index 000000000..d1de3a36e --- /dev/null +++ b/cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Threading; + +namespace FASTER.core +{ + public unsafe partial class FasterKV : FasterBase, IFasterKV + { + /// + /// Copy a record from the disk to the read cache. + /// + /// + /// + /// + /// + /// Contains the and structures for this operation, + /// and allows passing back the newLogicalAddress for invalidation in the case of exceptions. + /// + /// True if copied to readcache, else false; readcache is "best effort", and we don't fail the read process, or slow it down by retrying. 
+ /// + internal bool TryCopyToReadCache(FasterSession fasterSession, ref PendingContext pendingContext, + ref Key key, ref Input input, ref Value recordValue, ref OperationStackContext stackCtx) + where FasterSession : IFasterSession + { + #region Create new copy in mutable region + var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref recordValue); + + #region Allocate new record and call SingleWriter + + if (!TryAllocateRecordReadCache(ref pendingContext, ref stackCtx, allocatedSize, out long newLogicalAddress, out long newPhysicalAddress, out OperationStatus status)) + return false; + ref var newRecordInfo = ref WriteNewRecordInfo(ref key, readcache, newPhysicalAddress, inNewVersion: false, tombstone: false, stackCtx.hei.Address); + stackCtx.SetNewRecord(newLogicalAddress | Constants.kReadCacheBitMask); + + UpsertInfo upsertInfo = new() + { + Version = fasterSession.Ctx.version, + SessionID = fasterSession.Ctx.sessionID, + Address = Constants.kInvalidAddress, // We do not expose readcache addresses + KeyHash = stackCtx.hei.hash, + RecordInfo = newRecordInfo + }; + + Output output = default; + if (!fasterSession.SingleWriter(ref key, ref input, ref recordValue, ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), + ref output, ref newRecordInfo, ref upsertInfo, WriteReason.CopyToReadCache)) + { + stackCtx.SetNewRecordInvalid(ref newRecordInfo); + return false; + } + + #endregion Allocate new record and call SingleWriter + + // Insert the new record by CAS'ing either directly into the hash entry or splicing into the readcache/mainlog boundary. + // It is possible that we will successfully CAS but subsequently fail validation. + OperationStatus failStatus = OperationStatus.RETRY_NOW; // Default to CAS-failed status, which does not require an epoch refresh + + // ReadCache entries are CAS'd in as the first entry in the hash chain. 
+ var success = stackCtx.hei.TryCAS(newLogicalAddress | Constants.kReadCacheBitMask); + var casSuccess = success; + + if (success && stackCtx.recSrc.LowestReadCacheLogicalAddress != Constants.kInvalidAddress) + { + // If someone added a main-log entry for this key from an update or CTT while we were inserting the new readcache record, then the new + // readcache record is obsolete and must be Invalidated. (If LowestReadCacheLogicalAddress == kInvalidAddress, then the CAS would have + // failed in this case.) If this was the first readcache record in the chain, then once we CAS'd it in someone could have spliced into + // it, but then that splice will call ReadCacheCheckTailAfterSplice and invalidate it if it's the same key. + success = EnsureNoNewMainLogRecordWasSpliced(ref key, stackCtx.recSrc, pendingContext.InitialLatestLogicalAddress, ref failStatus); + } + + if (!success) + { + stackCtx.SetNewRecordInvalid(ref newRecordInfo); + + if (!casSuccess) + { + // Let user dispose similar to a deleted record, and save for retry, *only* if CAS failed; otherwise we must preserve it in the chain. + fasterSession.DisposeSingleWriter(ref readcache.GetKey(newPhysicalAddress), ref input, ref recordValue, ref readcache.GetValue(newPhysicalAddress), + ref output, ref newRecordInfo, ref upsertInfo, WriteReason.CopyToReadCache); + newRecordInfo.PreviousAddress = Constants.kTempInvalidAddress; // Necessary for ReadCacheEvict, but cannot be kInvalidAddress or we have recordInfo.IsNull + } + return false; + } + + // Success. 
+ pendingContext.recordInfo = newRecordInfo; + pendingContext.logicalAddress = upsertInfo.Address; + fasterSession.PostSingleWriter(ref key, ref input, ref recordValue, ref readcache.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output, + ref newRecordInfo, ref upsertInfo, WriteReason.CopyToReadCache); + stackCtx.ClearNewRecord(); + return true; + #endregion + } + } +} diff --git a/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs b/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs new file mode 100644 index 000000000..5f5801d76 --- /dev/null +++ b/cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Diagnostics; + +namespace FASTER.core +{ + public unsafe partial class FasterKV : FasterBase, IFasterKV + { + /// + /// Copy a record from the immutable region of the log, from the disk, or from ConditionalCopyToTail to the tail of the log (or splice into the log/readcache boundary). + /// + /// + /// + /// + /// + /// + /// Contains the and structures for this operation, + /// and allows passing back the newLogicalAddress for invalidation in the case of exceptions. + /// if ., the recordInfo to close, if transferring. + /// + /// The reason for this operation. + /// + /// + /// RETRY_NOW: failed CAS, so no copy done. 
This routine deals entirely with new records, so will not encounter Sealed records + /// SUCCESS: copy was done + /// + /// + internal OperationStatus TryCopyToTail(ref PendingContext pendingContext, + ref Key key, ref Input input, ref Value value, ref Output output, ref OperationStackContext stackCtx, + ref RecordInfo srcRecordInfo, FasterSession fasterSession, WriteReason reason) + where FasterSession : IFasterSession + { + #region Create new copy in mutable region + var (actualSize, allocatedSize) = hlog.GetRecordSize(ref key, ref value); + + #region Allocate new record and call SingleWriter + + if (!TryAllocateRecord(ref pendingContext, ref stackCtx, allocatedSize, recycle: true, out long newLogicalAddress, out long newPhysicalAddress, out OperationStatus status)) + return status; + ref var newRecordInfo = ref WriteNewRecordInfo(ref key, hlog, newPhysicalAddress, inNewVersion: fasterSession.Ctx.InNewVersion, tombstone: false, stackCtx.recSrc.LatestLogicalAddress); + stackCtx.SetNewRecord(newLogicalAddress); + + UpsertInfo upsertInfo = new() + { + Version = fasterSession.Ctx.version, + SessionID = fasterSession.Ctx.sessionID, + Address = newLogicalAddress, + KeyHash = stackCtx.hei.hash, + RecordInfo = newRecordInfo + }; + + if (!fasterSession.SingleWriter(ref key, ref input, ref value, ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), + ref output, ref newRecordInfo, ref upsertInfo, reason)) + { + // No SaveAlloc here as we won't retry, but TODO this record could be reused later. + stackCtx.SetNewRecordInvalid(ref newRecordInfo); + return (upsertInfo.Action == UpsertAction.CancelOperation) ? OperationStatus.CANCELED : OperationStatus.SUCCESS; + } + + #endregion Allocate new record and call SingleWriter + + // Insert the new record by CAS'ing either directly into the hash entry or splicing into the readcache/mainlog boundary. 
+ bool success; + OperationStatus failStatus = OperationStatus.RETRY_NOW; // Default to CAS-failed status, which does not require an epoch refresh + if (DoEphemeralLocking) + newRecordInfo.InitializeLockShared(); // For PostSingleWriter + if (stackCtx.recSrc.LowestReadCacheLogicalAddress == Constants.kInvalidAddress) + { + // ReadCache entries, and main-log records when there are no readcache records, are CAS'd in as the first entry in the hash chain. + success = stackCtx.hei.TryCAS(newLogicalAddress); + } + else + { + // We are doing CopyToTail; we may have a source record from either main log (Compaction) or ReadCache. + Debug.Assert(reason == WriteReason.CopyToTail || reason == WriteReason.Compaction, "Expected WriteReason.CopyToTail or .Compaction"); + success = SpliceIntoHashChainAtReadCacheBoundary(ref key, ref stackCtx, newLogicalAddress); + } + + if (success) + PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo, pendingContext.InitialEntryAddress); + else + { + stackCtx.SetNewRecordInvalid(ref newRecordInfo); + + // CAS failed, so let the user dispose similar to a deleted record, and save for retry. + fasterSession.DisposeSingleWriter(ref hlog.GetKey(newPhysicalAddress), ref input, ref value, ref hlog.GetValue(newPhysicalAddress), + ref output, ref newRecordInfo, ref upsertInfo, reason); + SaveAllocationForRetry(ref pendingContext, newLogicalAddress, newPhysicalAddress, allocatedSize); + return failStatus; + } + + // Success, and any read locks have been transferred. 
+ pendingContext.recordInfo = newRecordInfo; + pendingContext.logicalAddress = upsertInfo.Address; + fasterSession.PostSingleWriter(ref key, ref input, ref value, ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), ref output, + ref newRecordInfo, ref upsertInfo, reason); + stackCtx.ClearNewRecord(); + return OperationStatusUtils.AdvancedOpCode(OperationStatus.SUCCESS, StatusCode.Found | StatusCode.CopiedRecord); + #endregion + } + } +} diff --git a/cs/src/core/Index/FASTER/ReadFlags.cs b/cs/src/core/Index/FASTER/ReadFlags.cs deleted file mode 100644 index dd666a9e9..000000000 --- a/cs/src/core/Index/FASTER/ReadFlags.cs +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT license. - -using System; - -namespace FASTER.core -{ - /// - /// Flags for the Read-by-address methods - /// - /// Note: must be kept in sync with corresponding PendingContext k* values - [Flags] - public enum ReadFlags - { - /// - /// Use whatever was specified at the previous level, or None if at FasterKV level. - /// - /// - /// May be combined with other flags to add options. - /// - Default = 0x0, - - /// - /// Turn off all flags from higher levels. May be combined with other flags to specify an entirely new set of flags. - /// - None = 0x1, - - /// - /// Do not update read cache with new data - /// - DisableReadCacheUpdates = 0x2, - - /// - /// Skip read cache on reads - /// - DisableReadCacheReads = 0x4, - - /// - /// Skip read cache on reads and updates - /// - DisableReadCache = 0x6, - - /// - /// Copy reads to tail of main log from both read-only region and from . - /// - /// - /// This generally should not be used for reads by address, and especially not for versioned reads, - /// because those would promote obsolete values to the tail of the log. - /// - CopyReadsToTail = 0x8, - - /// - /// Copy reads from only (do not copy from read-only region), to read cache or tail. 
- /// - CopyFromDeviceOnly = 0x10, - } -} \ No newline at end of file diff --git a/cs/src/core/Index/Interfaces/IFasterKV.cs b/cs/src/core/Index/Interfaces/IFasterKV.cs index 2c92b1bcb..eecf7980e 100644 --- a/cs/src/core/Index/Interfaces/IFasterKV.cs +++ b/cs/src/core/Index/Interfaces/IFasterKV.cs @@ -21,10 +21,10 @@ public interface IFasterKV : IDisposable /// Callback functions. /// Name of session (optional) /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance ClientSession> NewSession(IFunctions functions, - string sessionName = null, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default); + string sessionName = null, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default); /// /// Resume (continue) prior client session with FASTER, used during recovery from failure. 
@@ -33,10 +33,10 @@ ClientSessionName of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance ClientSession> ResumeSession(IFunctions functions, - string sessionName, out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default); + string sessionName, out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default); /// /// Resume (continue) prior client session with FASTER; used during recovery from failure. @@ -45,10 +45,10 @@ ClientSessionID of previous session to resume /// Prior commit point of durability for session /// Session-specific variable-length struct settings - /// ReadFlags for this session; override those specified at FasterKV level, and may be overridden on individual Read operations + /// for this session; override those specified at FasterKV level, and may be overridden on individual Read operations /// Session instance public ClientSession> ResumeSession(IFunctions functions, int sessionID, - out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadFlags readFlags = ReadFlags.Default); + out CommitPoint commitPoint, SessionVariableLengthStructSettings sessionVariableLengthStructSettings = null, ReadCopyOptions readCopyOptions = default); #endregion #region Growth and Recovery diff --git a/cs/test/AdvancedLockTests.cs b/cs/test/AdvancedLockTests.cs index a2a61c882..c121f77cb 100644 --- a/cs/test/AdvancedLockTests.cs +++ b/cs/test/AdvancedLockTests.cs @@ -5,7 +5,6 @@ using 
NUnit.Framework; using System; using System.Threading; -using FASTER.test.LockTable; using static FASTER.test.TestUtils; using System.Threading.Tasks; using System.Diagnostics; @@ -17,7 +16,7 @@ namespace FASTER.test.LockTests [TestFixture] internal class AdvancedLockTests { - const int numKeys = 1000; + const int numKeys = 500; const int valueAdd = 1000000; const int mod = 100; @@ -145,7 +144,8 @@ void Populate(bool evict = false) [Category(FasterKVTestCategory)] [Category(LockTestCategory)] //[Repeat(100)] - public async ValueTask SameKeyInsertAndCTTTest([Values(LockingMode.None, LockingMode.Ephemeral /* Standard will hang */)] LockingMode lockingMode) + public async ValueTask SameKeyInsertAndCTTTest([Values(LockingMode.None, LockingMode.Ephemeral /* Standard will hang */)] LockingMode lockingMode, + [Values(ReadCopyTo.ReadCache, ReadCopyTo.MainLog)] ReadCopyTo readCopyTo) { if (TestContext.CurrentContext.CurrentRepeatCount > 0) Debug.WriteLine($"*** Current test iteration: {TestContext.CurrentContext.CurrentRepeatCount + 1} ***"); @@ -171,9 +171,9 @@ await DoTwoThreadRandomKeyTest(numKeys, var sleepFlag = (iter % 5 == 0) ? LockFunctionFlags.None : LockFunctionFlags.SleepAfterEventOperation; functions.readCacheInput = new() { flags = LockFunctionFlags.SetEvent | sleepFlag, sleepRangeMs = 10 }; int output = 0; - ReadOptions readOptions = default; + ReadOptions readOptions = new() { CopyOptions = new(ReadCopyFrom.Device, readCopyTo) }; - // This will copy to ReadCache, and the test is trying to cause a race with the above Upsert. + // This will copy to ReadCache or MainLog tail, and the test is trying to cause a race with the above Upsert. var status = readSession.Read(ref key, ref functions.readCacheInput, ref output, ref readOptions, out _); // If the Upsert completed before the Read started, we may Read() the Upserted value. 
diff --git a/cs/test/BasicFASTERTests.cs b/cs/test/BasicFASTERTests.cs index 331fde5cb..467661ad5 100644 --- a/cs/test/BasicFASTERTests.cs +++ b/cs/test/BasicFASTERTests.cs @@ -551,11 +551,10 @@ public void ReadBareMinParams([Values] TestUtils.DeviceType deviceType) Assert.AreEqual(key1.kfield2, 14); } - // Test the ReadAtAddress where ReadFlags = ReadFlags.none [Test] [Category("FasterKV")] [Category("Smoke")] - public void ReadAtAddressReadFlagsNone() + public void ReadAtAddressDefaultOptions() { // Just functional test of ReadFlag so one device is enough deviceType = TestUtils.DeviceType.MLSD; @@ -579,8 +578,6 @@ public void ReadAtAddressReadFlagsNone() Assert.AreEqual(key1.kfield2, 14); } - // Test the ReadAtAddress where ReadFlags = ReadFlags.SkipReadCache - class SkipReadCacheFunctions : Functions { internal long expectedReadAddress; @@ -612,7 +609,7 @@ public override void ReadCompletionCallback(ref KeyStruct key, ref InputStruct i [Test] [Category("FasterKV")] [Category("Smoke")] - public void ReadAtAddressReadFlagsSkipReadCache() + public void ReadAtAddressIgnoreReadCache() { // Another ReadFlag functional test so one device is enough deviceType = TestUtils.DeviceType.MLSD; @@ -667,7 +664,7 @@ void VerifyResult() // Do not put it into the read cache. functions.expectedReadAddress = readAtAddress; - ReadOptions readOptions = new() { StartAddress = readAtAddress, ReadFlags = ReadFlags.DisableReadCacheUpdates }; + ReadOptions readOptions = new() { StartAddress = readAtAddress, CopyOptions = ReadCopyOptions.None }; status = skipReadCacheSession.Read(ref key1, ref input, ref output, ref readOptions, out _); VerifyResult(); @@ -675,7 +672,7 @@ void VerifyResult() // Put it into the read cache. 
functions.expectedReadAddress = readAtAddress; - readOptions.ReadFlags = ReadFlags.None; + readOptions.CopyOptions = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache); status = skipReadCacheSession.Read(ref key1, ref input, ref output, ref readOptions, out _); Assert.IsTrue(status.IsPending); VerifyResult(); diff --git a/cs/test/BlittableLogCompactionTests.cs b/cs/test/BlittableLogCompactionTests.cs index ebf2c5fdd..76bd5287e 100644 --- a/cs/test/BlittableLogCompactionTests.cs +++ b/cs/test/BlittableLogCompactionTests.cs @@ -3,7 +3,9 @@ using FASTER.core; using NUnit.Framework; +using System; using System.Diagnostics; +using static FASTER.test.TestUtils; #pragma warning disable IDE0060 // Remove unused parameter == Some parameters are just to let [Setup] know what to do @@ -15,6 +17,22 @@ internal class BlittableLogCompactionTests private FasterKV fht; private IDevice log; + struct HashModuloComparer : IFasterEqualityComparer + { + readonly HashModulo modRange; + + internal HashModuloComparer(HashModulo mod) => this.modRange = mod; + + public bool Equals(ref KeyStruct k1, ref KeyStruct k2) => k1.kfield1 == k2.kfield1; + + // Force collisions to create a chain + public long GetHashCode64(ref KeyStruct k) + { + var value = Utility.GetHashCode(k.kfield1); + return this.modRange != HashModulo.NoMod ? 
value % (long)modRange : value; + } + } + [SetUp] public void Setup() { @@ -22,17 +40,23 @@ public void Setup() log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/BlittableLogCompactionTests.log", deleteOnClose: true); var lockingMode = LockingMode.Standard; + var hashMod = HashModulo.NoMod; foreach (var arg in TestContext.CurrentContext.Test.Arguments) { if (arg is LockingMode locking_mode) { lockingMode = locking_mode; - break; + continue; + } + if (arg is HashModulo mod) + { + hashMod = mod; + continue; } } fht = new FasterKV - (1L << 20, new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 9 }, lockingMode: lockingMode); + (1L << 20, new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 9 }, comparer: new HashModuloComparer(hashMod), lockingMode: lockingMode);; } [TearDown] @@ -45,25 +69,73 @@ public void TearDown() TestUtils.DeleteDirectory(TestUtils.MethodTestDir); } + void VerifyRead(ClientSession session, int totalRecords, Func isDeleted) + { + InputStruct input = default; + int numPending = 0; + + void drainPending() + { + Assert.IsTrue(session.CompletePendingWithOutputs(out var outputs, wait: true)); + using (outputs) + { + for ( ; outputs.Next(); --numPending) + { + if (isDeleted((int)outputs.Current.Key.kfield1)) + { + Assert.IsFalse(outputs.Current.Status.Found); + continue; + } + Assert.IsTrue(outputs.Current.Status.Found); + Assert.AreEqual(outputs.Current.Key.kfield1, outputs.Current.Output.value.vfield1); + Assert.AreEqual(outputs.Current.Key.kfield2, outputs.Current.Output.value.vfield2); + } + } + Assert.AreEqual(numPending, 0); + } + + for (int i = 0; i < totalRecords; i++) + { + OutputStruct output = default; + var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; + var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; + + var status = session.Read(ref key1, ref input, ref output, isDeleted(i) ? 
1 : 0, 0); + if (!status.IsPending) + { + if (isDeleted(i)) + { + Assert.IsFalse(status.Found); + continue; + } + Assert.IsTrue(status.Found); + Assert.AreEqual(value.vfield1, output.value.vfield1); + Assert.AreEqual(value.vfield2, output.value.vfield2); + } + else if (++numPending == 256) + drainPending(); + } + + if (numPending > 0) + drainPending(); + } + [Test] [Category("FasterKV")] [Category("Compaction")] [Category("Smoke")] - public void BlittableLogCompactionTest1([Values] CompactionType compactionType, - [Values(LockingMode.Standard)] LockingMode lockingMode) + public void BlittableLogCompactionTest1([Values] CompactionType compactionType, [Values(LockingMode.Standard)] LockingMode lockingMode) { using var session = fht.For(new FunctionsCompaction()).NewSession(); - InputStruct input = default; - - const int totalRecords = 2000; + const int totalRecords = 2_000; var start = fht.Log.TailAddress; long compactUntil = 0; for (int i = 0; i < totalRecords; i++) { - if (i == 1000) + if (i == totalRecords - 1000) compactUntil = fht.Log.TailAddress; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; @@ -71,49 +143,31 @@ public void BlittableLogCompactionTest1([Values] CompactionType compactionType, session.Upsert(ref key1, ref value, 0, 0); } + fht.Log.FlushAndEvict(wait: true); compactUntil = session.Compact(compactUntil, compactionType); fht.Log.Truncate(); Assert.AreEqual(compactUntil, fht.Log.BeginAddress); - // Read 2000 keys - all should be present - for (int i = 0; i < totalRecords; i++) - { - OutputStruct output = default; - var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; - var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; - - var status = session.Read(ref key1, ref input, ref output, 0, 0); - if (status.IsPending) - { - Assert.IsTrue(session.CompletePendingWithOutputs(out var outputs, wait: true)); - (status, output) = TestUtils.GetSinglePendingResult(outputs); - } - - Assert.IsTrue(status.Found); - 
Assert.AreEqual(value.vfield1, output.value.vfield1); - Assert.AreEqual(value.vfield2, output.value.vfield2); - } + // Read all keys - all should be present + VerifyRead(session, totalRecords, key => false); } - [Test] [Category("FasterKV")] [Category("Compaction")] - public void BlittableLogCompactionTest2([Values] CompactionType compactionType, - [Values(LockingMode.Standard)] LockingMode lockingMode) + public void BlittableLogCompactionTest2([Values] CompactionType compactionType, [Values(LockingMode.Standard)] LockingMode lockingMode, + [Values(HashModulo.NoMod, HashModulo.Hundred)] HashModulo hashMod) { using var session = fht.For(new FunctionsCompaction()).NewSession(); - InputStruct input = default; - - const int totalRecords = 2000; + const int totalRecords = 2_000; var start = fht.Log.TailAddress; long compactUntil = 0; for (int i = 0; i < totalRecords; i++) { - if (i == 1000) + if (i == totalRecords - 1000) compactUntil = fht.Log.TailAddress; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; @@ -121,54 +175,40 @@ public void BlittableLogCompactionTest2([Values] CompactionType compactionType, session.Upsert(ref key1, ref value, 0, 0); } - // Put fresh entries for 1000 records - for (int i = 0; i < 1000; i++) + fht.Log.FlushAndEvict(true); + + // Flush, then put fresh entries for half the records to force IO. We want this to have multiple levels before Compact: + // HeadAddress + // 1. Addresses of these fresh records + // (HeadAddress after Flush) + // 2. Addresses of original records + // BeginAddress + // Without this, the Compaction logic will not caused I/O, because without Level 1, the FindTag would return an entry + // whose address pointed to the record in Level 2 (which would be Level 1 then), which means it will be caught by the + // test that the address is < minAddress, so no IO is needed. 
+ for (int i = 0; i < totalRecords / 2; i++) { var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; session.Upsert(ref key1, ref value, 0, 0); } - fht.Log.Flush(true); - - var tail = fht.Log.TailAddress; compactUntil = session.Compact(compactUntil, compactionType); fht.Log.Truncate(); Assert.AreEqual(compactUntil, fht.Log.BeginAddress); - Assert.AreEqual(tail, fht.Log.TailAddress); - - // Read 2000 keys - all should be present - for (int i = 0; i < totalRecords; i++) - { - OutputStruct output = default; - var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; - var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; - - var status = session.Read(ref key1, ref input, ref output, 0, 0); - if (status.IsPending) - { - Assert.IsTrue(session.CompletePendingWithOutputs(out var outputs, wait: true)); - (status, output) = TestUtils.GetSinglePendingResult(outputs); - } - Assert.IsTrue(status.Found); - Assert.AreEqual(value.vfield1, output.value.vfield1); - Assert.AreEqual(value.vfield2, output.value.vfield2); - } + // Read all keys - all should be present + VerifyRead(session, totalRecords, key => false); } - [Test] [Category("FasterKV")] [Category("Compaction")] - public void BlittableLogCompactionTest3([Values] CompactionType compactionType, - [Values(LockingMode.Standard)] LockingMode lockingMode) + public void BlittableLogCompactionTest3([Values] CompactionType compactionType, [Values(LockingMode.Standard)] LockingMode lockingMode) { using var session = fht.For(new FunctionsCompaction()).NewSession(); - InputStruct input = default; - - const int totalRecords = 2000; + const int totalRecords = 2_000; var start = fht.Log.TailAddress; long compactUntil = 0; @@ -195,32 +235,7 @@ public void BlittableLogCompactionTest3([Values] CompactionType compactionType, Assert.AreEqual(compactUntil, fht.Log.BeginAddress); // Read all keys - all should be present except those we deleted - for (int i = 0; i < 
totalRecords; i++) - { - OutputStruct output = default; - var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; - var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; - - int ctx = ((i < 500) && (i % 2 == 0)) ? 1 : 0; - - var status = session.Read(ref key1, ref input, ref output, ctx, 0); - if (status.IsPending) - { - Assert.IsTrue(session.CompletePendingWithOutputs(out var outputs, wait: true)); - (status, output) = TestUtils.GetSinglePendingResult(outputs); - } - - if (ctx == 0) - { - Assert.IsTrue(status.Found); - Assert.AreEqual(value.vfield1, output.value.vfield1); - Assert.AreEqual(value.vfield2, output.value.vfield2); - } - else - { - Assert.IsFalse(status.Found); - } - } + VerifyRead(session, totalRecords, key => (key < totalRecords / 4) && (key % 2 == 0)); } [Test] @@ -228,9 +243,7 @@ public void BlittableLogCompactionTest3([Values] CompactionType compactionType, [Category("Compaction")] [Category("Smoke")] - public void BlittableLogCompactionCustomFunctionsTest1([Values] CompactionType compactionType, - [Values(LockingMode.Standard)] - LockingMode lockingMode) + public void BlittableLogCompactionCustomFunctionsTest1([Values] CompactionType compactionType, [Values(LockingMode.Standard)] LockingMode lockingMode) { using var session = fht.For(new FunctionsCompaction()).NewSession(); diff --git a/cs/test/DisposeTests.cs b/cs/test/DisposeTests.cs index 8abc6d8e6..6067192a1 100644 --- a/cs/test/DisposeTests.cs +++ b/cs/test/DisposeTests.cs @@ -595,9 +595,9 @@ void DoPendingReadInsertTest(ReadCopyDestination copyDest, bool initialReadCache session.CompletePending(wait: true); } - ReadOptions readOptions = default; + ReadOptions readOptions = new() { CopyFrom = ReadCopyFrom.Device }; if (copyDest == ReadCopyDestination.Tail) - readOptions.ReadFlags = ReadFlags.CopyReadsToTail; + readOptions.CopyOptions.CopyTo = ReadCopyTo.MainLog; var status = session.Read(ref key, ref input, ref output, ref readOptions, out _); Assert.IsTrue(status.IsPending, 
status.ToString()); session.CompletePendingWithOutputs(out var completedOutputs, wait: true); @@ -661,9 +661,9 @@ void DoRead(DisposeFunctions functions) { MyOutput output = new(); MyInput input = new(); - ReadOptions readOptions = default; + ReadOptions readOptions = new() { CopyFrom = ReadCopyFrom.Device }; if (copyDest == ReadCopyDestination.Tail) - readOptions.ReadFlags = ReadFlags.CopyReadsToTail; + readOptions.CopyOptions.CopyTo = ReadCopyTo.MainLog; using var session = fht.NewSession(functions); if (functions.isSUT) diff --git a/cs/test/EphemeralLockingTests.cs b/cs/test/EphemeralLockingTests.cs index fa4f36625..e1e286041 100644 --- a/cs/test/EphemeralLockingTests.cs +++ b/cs/test/EphemeralLockingTests.cs @@ -347,7 +347,7 @@ public void CopyToCTTTest() using var session = fht.NewSession(new SimpleFunctions()); long input = 0, output = 0, key = useExistingKey; - ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail }; + ReadOptions readOptions = new() { CopyOptions = new(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog) }; var status = session.Read(ref key, ref input, ref output, ref readOptions, out _); Assert.IsTrue(status.IsPending, status.ToString()); diff --git a/cs/test/LockableUnsafeContextTests.cs b/cs/test/LockableUnsafeContextTests.cs index 7a470a329..54470f54f 100644 --- a/cs/test/LockableUnsafeContextTests.cs +++ b/cs/test/LockableUnsafeContextTests.cs @@ -922,7 +922,7 @@ public void VerifyLocksAfterReadAndCTTTest() using var session = fht.NewSession(new SimpleFunctions()); var luContext = session.LockableUnsafeContext; long input = 0, output = 0, key = 24; - ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail}; + ReadOptions readOptions = new() { CopyOptions = new(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog) }; BucketLockTracker blt = new(); luContext.BeginUnsafe(); diff --git a/cs/test/ModifiedBitTests.cs b/cs/test/ModifiedBitTests.cs index 4a6d9b646..77237e369 100644 --- a/cs/test/ModifiedBitTests.cs +++ 
b/cs/test/ModifiedBitTests.cs @@ -376,7 +376,7 @@ public void CopyToTailTest() var luContext = session.LockableUnsafeContext; int input = 0, output = 0, key = 200; - ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail }; + ReadOptions readOptions = new() { CopyOptions = new(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog) }; luContext.BeginUnsafe(); luContext.BeginLockable(); diff --git a/cs/test/ReadAddressTests.cs b/cs/test/ReadAddressTests.cs index 6cfc75395..9e5f58c3a 100644 --- a/cs/test/ReadAddressTests.cs +++ b/cs/test/ReadAddressTests.cs @@ -61,7 +61,7 @@ internal class Functions : FunctionsBase { internal long lastWriteAddress = Constants.kInvalidAddress; readonly bool useReadCache; - internal ReadFlags readFlags = ReadFlags.None | ReadFlags.DisableReadCache; + internal ReadCopyOptions readCopyOptions = ReadCopyOptions.None; internal Functions() { @@ -123,7 +123,7 @@ public override void ReadCompletionCallback(ref Key key, ref Value input, ref Ou { if (status.Found) { - if (this.useReadCache && !this.readFlags.HasFlag(ReadFlags.DisableReadCacheReads)) + if (this.useReadCache && this.readCopyOptions.CopyTo == ReadCopyTo.ReadCache) Assert.AreEqual(Constants.kInvalidAddress, recordMetadata.Address, $"key {key}"); else Assert.AreEqual(output.address, recordMetadata.Address, $"key {key}"); // Should agree with what SingleWriter set @@ -146,7 +146,7 @@ private class TestStore : IDisposable internal long[] InsertAddresses = new long[numKeys]; - internal TestStore(bool useReadCache, ReadFlags readFlags, bool flush, LockingMode lockingMode) + internal TestStore(bool useReadCache, ReadCopyOptions readCopyOptions, bool flush, LockingMode lockingMode) { this.testDir = TestUtils.MethodTestDir; TestUtils.DeleteDirectory(this.testDir, wait:true); @@ -158,7 +158,7 @@ internal TestStore(bool useReadCache, ReadFlags readFlags, bool flush, LockingMo LogDevice = logDevice, ObjectLogDevice = new NullDevice(), ReadCacheSettings = useReadCache ? 
new ReadCacheSettings() : null, - ReadFlags = readFlags, + ReadCopyOptions = readCopyOptions, // Use small-footprint values PageSizeBits = 12, // (4K pages) MemorySizeBits = 20 // (1M memory for main log) @@ -261,14 +261,15 @@ public void Dispose() } // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. - [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public void VersionedReadSyncTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) + public void VersionedReadSyncTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, [Values] LockingMode lockingMode) { var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); testStore.Populate(useRMW, useAsync:false).GetAwaiter().GetResult(); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -279,7 +280,7 @@ public void VersionedReadSyncTests(UseReadCache urc, ReadFlags readFlags, bool u var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - ReadOptions readOptions 
= new() { ReadFlags = session.functions.readFlags }; + ReadOptions readOptions = new() { CopyOptions = session.functions.readCopyOptions }; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -299,14 +300,15 @@ public void VersionedReadSyncTests(UseReadCache urc, ReadFlags readFlags, bool u } // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. - [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public async Task VersionedReadAsyncTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) + public async Task VersionedReadAsyncTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, [Values] LockingMode lockingMode) { var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); await testStore.Populate(useRMW, useAsync: true); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -316,7 +318,7 @@ public async Task VersionedReadAsyncTests(UseReadCache urc, ReadFlags readFlags, var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata 
recordMetadata = default; - ReadOptions readOptions = new() { ReadFlags = session.functions.readFlags }; + ReadOptions readOptions = new() { CopyOptions = session.functions.readCopyOptions }; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -331,14 +333,15 @@ public async Task VersionedReadAsyncTests(UseReadCache urc, ReadFlags readFlags, } // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. - [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public void ReadAtAddressSyncTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) + public void ReadAtAddressSyncTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, [Values] LockingMode lockingMode) { var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); testStore.Populate(useRMW, useAsync: false).GetAwaiter().GetResult(); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -349,7 +352,7 @@ public void ReadAtAddressSyncTests(UseReadCache urc, ReadFlags readFlags, bool u var input = default(Value); 
var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - ReadOptions readOptions = new() { ReadFlags = session.functions.readFlags }; + ReadOptions readOptions = new() { CopyOptions = session.functions.readCopyOptions }; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -386,14 +389,15 @@ public void ReadAtAddressSyncTests(UseReadCache urc, ReadFlags readFlags, bool u } // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. - [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public async Task ReadAtAddressAsyncTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) + public async Task ReadAtAddressAsyncTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, [Values] LockingMode lockingMode) { var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); await testStore.Populate(useRMW, useAsync: true); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -403,7 +407,7 @@ public async Task ReadAtAddressAsyncTests(UseReadCache urc, 
ReadFlags readFlags, var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - ReadOptions readOptions = new() { ReadFlags = session.functions.readFlags }; + ReadOptions readOptions = new() { CopyOptions = session.functions.readCopyOptions }; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -431,14 +435,15 @@ public async Task ReadAtAddressAsyncTests(UseReadCache urc, ReadFlags readFlags, } // Test is similar to others but tests the Overload where RadFlag.none is set -- probably don't need all combinations of test but doesn't hurt - [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public async Task ReadAtAddressAsyncReadFlagsNoneTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) + public async Task ReadAtAddressAsyncCopyOptionsNoReadCacheTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, [Values] LockingMode lockingMode) { var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); await testStore.Populate(useRMW, useAsync: true); using var session = 
testStore.fkv.For(new Functions()).NewSession(); @@ -448,7 +453,7 @@ public async Task ReadAtAddressAsyncReadFlagsNoneTests(UseReadCache urc, ReadFla var input = default(Value); var key = new Key(defaultKeyToScan); RecordMetadata recordMetadata = default; - ReadOptions readOptions = new() { ReadFlags = session.functions.readFlags }; + ReadOptions readOptions = new() { CopyOptions = session.functions.readCopyOptions }; for (int lap = maxLap - 1; /* tested in loop */; --lap) { @@ -475,61 +480,16 @@ public async Task ReadAtAddressAsyncReadFlagsNoneTests(UseReadCache urc, ReadFla } } - // Test is similar to others but tests the Overload where ReadFlag.SkipReadCache is set - [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] - [Category("FasterKV"), Category("Read")] - public async Task ReadAtAddressAsyncReadFlagsSkipCacheTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) - { - var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); - await testStore.Populate(useRMW, useAsync: true); - using var session = testStore.fkv.For(new Functions()).NewSession(); - - // Two iterations to ensure no issues due to read-caching or copying to tail. 
- for (int iteration = 0; iteration < 2; ++iteration) - { - var input = default(Value); - var key = new Key(defaultKeyToScan); - RecordMetadata recordMetadata = default; - ReadOptions readOptions = default; - - for (int lap = maxLap - 1; /* tested in loop */; --lap) - { - readOptions.ReadFlags = session.functions.readFlags; - var readAsyncResult = await session.ReadAsync(ref key, ref input, ref readOptions, default, serialNo: maxLap + 1); - var (status, output) = readAsyncResult.Complete(out recordMetadata); - - if (!testStore.ProcessChainRecord(status, recordMetadata, lap, ref output)) - break; - - if (readOptions.StartAddress >= testStore.fkv.Log.BeginAddress) - { - var saveOutput = output; - var saveRecordMetadata = recordMetadata; - - readOptions.ReadFlags = ReadFlags.None | ReadFlags.DisableReadCacheReads | ReadFlags.DisableReadCacheUpdates; - readAsyncResult = await session.ReadAtAddressAsync(ref input, ref readOptions, default, maxLap + 1); - (status, output) = readAsyncResult.Complete(out recordMetadata); - - Assert.AreEqual(saveOutput, output); - Assert.AreEqual(saveRecordMetadata.RecordInfo, recordMetadata.RecordInfo); - } - readOptions.StartAddress = recordMetadata.RecordInfo.PreviousAddress; - } - } - } - // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. 
- [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public void ReadNoKeySyncTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, [Values] LockingMode lockingMode) // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. + public void ReadNoKeySyncTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, [Values] LockingMode lockingMode) // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. 
{ var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); testStore.Populate(useRMW, useAsync: false).GetAwaiter().GetResult(); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -547,7 +507,7 @@ public void ReadNoKeySyncTests(UseReadCache urc, ReadFlags readFlags, bool useRM ReadOptions readOptions = new() { StartAddress = testStore.InsertAddresses[keyOrdinal], - ReadFlags = session.functions.readFlags + CopyOptions = session.functions.readCopyOptions }; var status = session.ReadAtAddress(ref input, ref output, ref readOptions, serialNo: maxLap + 1); if (status.IsPending) @@ -565,14 +525,15 @@ public void ReadNoKeySyncTests(UseReadCache urc, ReadFlags readFlags, bool useRM } // readCache and copyReadsToTail are mutually exclusive and orthogonal to populating by RMW vs. Upsert. 
- [TestCase(UseReadCache.NoReadCache, ReadFlags.None, false, false, LockingMode.None)] - [TestCase(UseReadCache.NoReadCache, ReadFlags.CopyReadsToTail | ReadFlags.CopyFromDeviceOnly, true, true, LockingMode.Standard)] - [TestCase(UseReadCache.ReadCache, ReadFlags.None, false, true, LockingMode.Standard)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, false, LockingMode.None)] + [TestCase(UseReadCache.NoReadCache, ReadCopyFrom.Device, ReadCopyTo.MainLog, true, true, LockingMode.Standard)] + [TestCase(UseReadCache.ReadCache, ReadCopyFrom.None, ReadCopyTo.None, false, true, LockingMode.Ephemeral)] [Category("FasterKV"), Category("Read")] - public async Task ReadNoKeyAsyncTests(UseReadCache urc, ReadFlags readFlags, bool useRMW, bool flush, LockingMode lockingMode) + public async Task ReadNoKeyAsyncTests(UseReadCache urc, ReadCopyFrom readCopyFrom, ReadCopyTo readCopyTo, bool useRMW, bool flush, LockingMode lockingMode) { var useReadCache = urc == UseReadCache.ReadCache; - using var testStore = new TestStore(useReadCache, readFlags, flush, lockingMode); + var readCopyOptions = new ReadCopyOptions(readCopyFrom, readCopyTo); + using var testStore = new TestStore(useReadCache, readCopyOptions, flush, lockingMode); await testStore.Populate(useRMW, useAsync: true); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -590,7 +551,7 @@ public async Task ReadNoKeyAsyncTests(UseReadCache urc, ReadFlags readFlags, boo ReadOptions readOptions = new() { StartAddress = testStore.InsertAddresses[keyOrdinal], - ReadFlags = session.functions.readFlags + CopyOptions = session.functions.readCopyOptions }; var readAsyncResult = await session.ReadAtAddressAsync(ref input, ref readOptions, default, serialNo: maxLap + 1); @@ -603,73 +564,73 @@ public async Task ReadNoKeyAsyncTests(UseReadCache urc, ReadFlags readFlags, boo await testStore.Flush(); } - internal struct ReadFlagsMerge + internal struct ReadCopyOptionsMerge { - internal 
ReadFlags Fkv, Session, Read, Expected; + internal ReadCopyOptions Fkv, Session, Read, Expected; } [Test] [Category("FasterKV"), Category("Read")] - public void ReadFlagsMergeTest() + public void ReadCopyOptionssMergeTest() { - ReadFlagsMerge[] merges = new ReadFlagsMerge[] + ReadCopyOptionsMerge[] merges = new ReadCopyOptionsMerge[] { new() { - Fkv = ReadFlags.None, - Session = ReadFlags.Default, - Read = ReadFlags.Default, - Expected = ReadFlags.Default + Fkv = ReadCopyOptions.None, + Session = default, + Read = default, + Expected = ReadCopyOptions.None }, new() { - Fkv = ReadFlags.CopyReadsToTail, - Session = ReadFlags.None, - Read = ReadFlags.Default, - Expected = ReadFlags.Default + Fkv = new(ReadCopyFrom.Device, ReadCopyTo.MainLog), + Session = default, + Read = default, + Expected = new(ReadCopyFrom.Device, ReadCopyTo.MainLog) }, new() { - Fkv = ReadFlags.CopyReadsToTail, - Session = ReadFlags.Default, - Read = ReadFlags.None, - Expected = ReadFlags.Default + Fkv = new(ReadCopyFrom.Device, ReadCopyTo.MainLog), + Session = ReadCopyOptions.None, + Read = default, + Expected = ReadCopyOptions.None }, new() { - Fkv = ReadFlags.CopyReadsToTail, - Session = ReadFlags.Default | ReadFlags.DisableReadCache, - Read = ReadFlags.Default, - Expected = ReadFlags.CopyReadsToTail | ReadFlags.DisableReadCacheReads | ReadFlags.DisableReadCacheUpdates + Fkv = new(ReadCopyFrom.Device, ReadCopyTo.MainLog), + Session = default, + Read = ReadCopyOptions.None, + Expected = ReadCopyOptions.None }, new() { - Fkv = ReadFlags.CopyReadsToTail, - Session = ReadFlags.Default, - Read = ReadFlags.Default | ReadFlags.DisableReadCacheReads, - Expected = ReadFlags.CopyReadsToTail | ReadFlags.DisableReadCacheReads + Fkv = new(ReadCopyFrom.Device, ReadCopyTo.MainLog), + Session = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache), + Read = default, + Expected = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache) }, new() { - Fkv = ReadFlags.CopyReadsToTail, - Session = ReadFlags.None | 
ReadFlags.DisableReadCacheUpdates, - Read = ReadFlags.Default, - Expected = ReadFlags.DisableReadCacheUpdates + Fkv = new(ReadCopyFrom.Device, ReadCopyTo.MainLog), + Session = default, + Read = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache), + Expected = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache) }, new() { - Fkv = ReadFlags.CopyReadsToTail, - Session = ReadFlags.Default, - Read = ReadFlags.None | ReadFlags.DisableReadCache, - Expected = ReadFlags.DisableReadCacheReads | ReadFlags.DisableReadCacheUpdates + Fkv = ReadCopyOptions.None, + Session = new(ReadCopyFrom.Device, ReadCopyTo.MainLog), + Read = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache), + Expected = new(ReadCopyFrom.AllImmutable, ReadCopyTo.ReadCache) }, }; for (var ii = 0; ii < merges.Length; ++ii) { var merge = merges[ii]; - var flags = FasterKV.MergeReadFlags(FasterKV.MergeReadFlags(merge.Fkv, merge.Session), merge.Read); - Assert.AreEqual(merge.Expected, flags, $"iter {ii}"); + var options = ReadCopyOptions.Merge(ReadCopyOptions.Merge(merge.Fkv, merge.Session), merge.Read); + Assert.AreEqual(merge.Expected, options, $"iter {ii}"); } } } diff --git a/cs/test/ReadCacheChainTests.cs b/cs/test/ReadCacheChainTests.cs index eec111ea2..da7b61dd4 100644 --- a/cs/test/ReadCacheChainTests.cs +++ b/cs/test/ReadCacheChainTests.cs @@ -436,7 +436,7 @@ public void SpliceInFromCTTTest([Values] LockingMode lockingMode) using var session = fht.NewSession(new SimpleFunctions()); long input = 0, output = 0, key = lowChainKey - mod; // key must be in evicted region for this test - ReadOptions readOptions = new() { ReadFlags = ReadFlags.CopyReadsToTail }; + ReadOptions readOptions = new() { CopyOptions = new(ReadCopyFrom.AllImmutable, ReadCopyTo.MainLog) }; var status = session.Read(ref key, ref input, ref output, ref readOptions, out _); Assert.IsTrue(status.IsPending, status.ToString()); @@ -632,9 +632,9 @@ class LongStressChainTests struct LongComparerModulo : IFasterEqualityComparer { - readonly 
ModuloRange modRange; + readonly HashModulo modRange; - internal LongComparerModulo(ModuloRange mod) => this.modRange = mod; + internal LongComparerModulo(HashModulo mod) => this.modRange = mod; public bool Equals(ref long k1, ref long k2) => k1 == k2; @@ -642,7 +642,7 @@ struct LongComparerModulo : IFasterEqualityComparer public long GetHashCode64(ref long k) { long value = Utility.GetHashCode(k); - return this.modRange != ModuloRange.None ? value % (long)modRange : value; + return this.modRange != HashModulo.NoMod ? value % (long)modRange : value; } } @@ -666,10 +666,10 @@ public void Setup() ReadCacheSettings readCacheSettings = new() { MemorySizeBits = 15, PageSizeBits = 9 }; var logSettings = new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10, ReadCacheSettings = readCacheSettings }; - ModuloRange modRange = ModuloRange.None; + HashModulo modRange = HashModulo.NoMod; foreach (var arg in TestContext.CurrentContext.Test.Arguments) { - if (arg is ModuloRange cr) + if (arg is HashModulo cr) { modRange = cr; continue; @@ -728,8 +728,6 @@ public override bool InitialUpdater(ref long key, ref long input, ref long value } } - public enum ModuloRange { Hundred = 100, Thousand = 1000, None = int.MaxValue } - unsafe void PopulateAndEvict() { using var session = fht.NewSession(new SimpleFunctions()); @@ -751,7 +749,7 @@ unsafe void PopulateAndEvict() [Category(StressTestCategory)] //[Repeat(300)] #pragma warning disable IDE0060 // Remove unused parameter (modRange is used by Setup()) - public void LongRcMultiThreadTest([Values] ModuloRange modRange, [Values(0, 1, 2, 8)] int numReadThreads, [Values(0, 1, 2, 8)] int numWriteThreads, + public void LongRcMultiThreadTest([Values] HashModulo modRange, [Values(0, 1, 2, 8)] int numReadThreads, [Values(0, 1, 2, 8)] int numWriteThreads, [Values(UpdateOp.Upsert, UpdateOp.RMW)] UpdateOp updateOp, #if WINDOWS [Values(DeviceType.LSD @@ -848,9 +846,9 @@ class SpanByteStressChainTests struct SpanByteComparerModulo 
: IFasterEqualityComparer { - readonly ModuloRange modRange; + readonly HashModulo modRange; - internal SpanByteComparerModulo(ModuloRange mod) => this.modRange = mod; + internal SpanByteComparerModulo(HashModulo mod) => this.modRange = mod; public bool Equals(ref SpanByte k1, ref SpanByte k2) => SpanByteComparer.StaticEquals(ref k1, ref k2); @@ -858,7 +856,7 @@ struct SpanByteComparerModulo : IFasterEqualityComparer public long GetHashCode64(ref SpanByte k) { var value = SpanByteComparer.StaticGetHashCode64(ref k); - return this.modRange != ModuloRange.None ? value % (long)modRange : value; + return this.modRange != HashModulo.NoMod ? value % (long)modRange : value; } } @@ -882,10 +880,10 @@ public void Setup() var readCacheSettings = new ReadCacheSettings { MemorySizeBits = 15, PageSizeBits = 9 }; var logSettings = new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10, ReadCacheSettings = readCacheSettings }; - ModuloRange modRange = ModuloRange.None; + HashModulo modRange = HashModulo.NoMod; foreach (var arg in TestContext.CurrentContext.Test.Arguments) { - if (arg is ModuloRange cr) + if (arg is HashModulo cr) { modRange = cr; continue; @@ -949,8 +947,6 @@ public override bool InitialUpdater(ref SpanByte key, ref SpanByte input, ref Sp } } - public enum ModuloRange { Hundred = 100, Thousand = 1000, None = int.MaxValue } - unsafe void PopulateAndEvict() { using var session = fht.NewSession(new SpanByteFunctions()); @@ -980,7 +976,7 @@ static void ClearCountsOnError(ClientSession $"kfield1 {kfield1}, kfield2 {kfield2}"; } public struct ValueStruct { public long vfield1; public long vfield2; + public override string ToString() => $"vfield1 {vfield1}, vfield2 {vfield2}"; } public struct InputStruct { public long ifield1; public long ifield2; + public override string ToString() => $"ifield1 {ifield1}, ifield2 {ifield2}"; } public struct OutputStruct @@ -49,6 +48,7 @@ public struct ContextStruct { public long cfield1; public long cfield2; + public 
override string ToString() => $"cfield1 {cfield1}, cfield2 {cfield2}"; } public class Functions : FunctionsWithContext diff --git a/cs/test/TestUtils.cs b/cs/test/TestUtils.cs index 36190a57c..ee06b7930 100644 --- a/cs/test/TestUtils.cs +++ b/cs/test/TestUtils.cs @@ -204,6 +204,8 @@ public enum BatchMode { Batch, NoBatch }; public enum UpdateOp { Upsert, RMW, Delete } + public enum HashModulo { NoMod = 0, Hundred = 100, Thousand = 1000 } + internal static (Status status, TOutput output) GetSinglePendingResult(CompletedOutputIterator completedOutputs) => GetSinglePendingResult(completedOutputs, out _); diff --git a/cs/test/UnsafeContextTests.cs b/cs/test/UnsafeContextTests.cs index 920492cee..b07e2c14d 100644 --- a/cs/test/UnsafeContextTests.cs +++ b/cs/test/UnsafeContextTests.cs @@ -672,41 +672,5 @@ public void ReadBareMinParams([Values] DeviceType deviceType) uContext.EndUnsafe(); } } - - // Test the ReadAtAddress where ReadFlags = ReadFlags.none - [Test] - [Category("FasterKV")] - [Category("Smoke")] - public void ReadAtAddressReadFlagsNone() - { - // Just functional test of ReadFlag so one device is enough - deviceType = DeviceType.MLSD; - - Setup(128, new LogSettings { MemorySizeBits = 29 }, deviceType); - uContext.BeginUnsafe(); - - try - { - InputStruct input = default; - OutputStruct output = default; - - var key1 = new KeyStruct { kfield1 = 13, kfield2 = 14 }; - var value = new ValueStruct { vfield1 = 23, vfield2 = 24 }; - ReadOptions readOptions = new() { StartAddress = fht.Log.BeginAddress }; - - uContext.Upsert(ref key1, ref value, Empty.Default, 0); - var status = uContext.ReadAtAddress(ref input, ref output, ref readOptions, Empty.Default, 0); - AssertCompleted(new(StatusCode.Found), status); - - Assert.AreEqual(value.vfield1, output.value.vfield1); - Assert.AreEqual(value.vfield2, output.value.vfield2); - Assert.AreEqual(key1.kfield1, 13); - Assert.AreEqual(key1.kfield2, 14); - } - finally - { - uContext.EndUnsafe(); - } - } - } + } } \ No newline at 
end of file