From 54b92c990bc864b0c210c66ace78ac39c1bb0fc5 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 8 Apr 2020 20:52:19 -0700 Subject: [PATCH] Fix latching logic to work uniformly for normal and relaxed CPR. (#260) Fix latching logic to work uniformly for normal and relaxed CPR. Eliminated code redundancy for continue RMW. --- cs/src/core/ClientSession/FASTERAsync.cs | 10 +- cs/src/core/Index/Common/Contexts.cs | 3 +- cs/src/core/Index/FASTER/FASTERImpl.cs | 1180 +++++++++------------- cs/src/core/Index/FASTER/FASTERThread.cs | 172 +--- 4 files changed, 524 insertions(+), 841 deletions(-) diff --git a/cs/src/core/ClientSession/FASTERAsync.cs b/cs/src/core/ClientSession/FASTERAsync.cs index ee7a168fc..2f5c5a02f 100644 --- a/cs/src/core/ClientSession/FASTERAsync.cs +++ b/cs/src/core/ClientSession/FASTERAsync.cs @@ -47,12 +47,12 @@ internal async ValueTask CompletePendingAsync(ClientSession 0) { - CompleteRetryRequests(clientSession.ctx.prevCtx, clientSession.ctx, clientSession); + InternalCompleteRetryRequests(clientSession.ctx.prevCtx, clientSession.ctx, clientSession); } done &= (clientSession.ctx.prevCtx.HasNoPendingRequests); @@ -60,8 +60,8 @@ internal async ValueTask CompletePendingAsync(ClientSession key; internal IHeapContainer value; internal Input input; internal Output output; internal Context userContext; + // Some additional information about the previous attempt internal long id; internal int version; internal long logicalAddress; internal long serialNum; internal HashBucketEntry entry; + internal LatchOperation heldLatch; public void Dispose() { diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 2bd7f6858..97b8e90e4 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -20,11 +20,11 @@ public unsafe partial class FasterKV { - enum LatchOperation : byte + internal enum LatchOperation : byte { None, - ReleaseShared, - ReleaseExclusive + Shared, + Exclusive } #region Read Operation @@ -73,6 +73,7 @@ internal OperationStatus InternalRead( var slot = default(int); var physicalAddress = default(long); var latestRecordVersion = -1; + var heldOperation = LatchOperation.None; var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); @@ -173,10 +174,22 @@ internal OperationStatus InternalRead( if (sessionCtx.phase == Phase.PREPARE) { - if (!HashBucket.TryAcquireSharedLatch(bucket)) + Debug.Assert(heldOperation != LatchOperation.Exclusive); + if (heldOperation == LatchOperation.Shared || HashBucket.TryAcquireSharedLatch(bucket)) + { + heldOperation = LatchOperation.Shared; + } + else { status = OperationStatus.CPR_SHIFT_DETECTED; } + + if (RelaxedCPR) // don't hold on to shared latched during IO + { + if (heldOperation == LatchOperation.Shared) + HashBucket.ReleaseSharedLatch(bucket); + heldOperation = LatchOperation.None; + } } goto CreatePendingContext; @@ -193,7 +206,6 @@ internal OperationStatus InternalRead( #region Create pending context CreatePendingContext: { - pendingContext.type = OperationType.READ; pendingContext.key = hlog.GetKeyContainer(ref key); pendingContext.input = input; @@ -203,165 +215,12 @@ internal OperationStatus InternalRead( pendingContext.logicalAddress = logicalAddress; pendingContext.version = sessionCtx.version; pendingContext.serialNum = lsn; + pendingContext.heldLatch = heldOperation; } #endregion return status; } - - /// - /// Continue a pending read operation. Computes 'output' from 'input' and value corresponding to 'key' - /// obtained from disk. Optionally, it copies the value to tail to serve future read/write requests quickly. - /// - /// The thread (or session) context to execute operation in. - /// Async response from disk. - /// Pending context corresponding to operation. - /// - /// - /// - /// - /// Value - /// Description - /// - /// - /// SUCCESS - /// The output has been computed and stored in 'output'. - /// - /// - /// - internal OperationStatus InternalContinuePendingRead( - FasterExecutionContext ctx, - AsyncIOContext request, - ref PendingContext pendingContext, FasterExecutionContext currentCtx) - { - Debug.Assert(RelaxedCPR || pendingContext.version == ctx.version); - - if (request.logicalAddress >= hlog.BeginAddress) - { - Debug.Assert(hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Version <= ctx.version); - - if (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone) - return OperationStatus.NOTFOUND; - - functions.SingleReader(ref pendingContext.key.Get(), ref pendingContext.input, - ref hlog.GetContextRecordValue(ref request), ref pendingContext.output); - - if (CopyReadsToTail || UseReadCache) - { - InternalContinuePendingReadCopyToTail(ctx, request, ref pendingContext, currentCtx); - } - } - else - return OperationStatus.NOTFOUND; - - return OperationStatus.SUCCESS; - } - - /// - /// Copies the record read from disk to tail of the HybridLog. - /// - /// The thread(or session) context to execute operation in. - /// Async response from disk. - /// Pending context corresponding to operation. - /// - internal void InternalContinuePendingReadCopyToTail( - FasterExecutionContext opCtx, - AsyncIOContext request, - ref PendingContext pendingContext, FasterExecutionContext currentCtx) - { - Debug.Assert(RelaxedCPR || pendingContext.version == opCtx.version); - - var recordSize = default(int); - var bucket = default(HashBucket*); - var slot = default(int); - var logicalAddress = Constants.kInvalidAddress; - var physicalAddress = default(long); - var latestRecordVersion = default(int); - - var hash = comparer.GetHashCode64(ref pendingContext.key.Get()); - var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - - #region Trace back record in in-memory HybridLog - var entry = default(HashBucketEntry); - FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); - logicalAddress = entry.word & Constants.kAddressMask; - - if (UseReadCache) - SkipReadCache(ref logicalAddress, ref latestRecordVersion); - var latestLogicalAddress = logicalAddress; - - if (logicalAddress >= hlog.HeadAddress) - { - physicalAddress = hlog.GetPhysicalAddress(logicalAddress); - if (!comparer.Equals(ref pendingContext.key.Get(), ref hlog.GetKey(physicalAddress))) - { - logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; - TraceBackForKeyMatch(ref pendingContext.key.Get(), - logicalAddress, - hlog.HeadAddress, - out logicalAddress, - out physicalAddress); - } - } - #endregion - - if (logicalAddress > pendingContext.entry.Address) - { - // Give up early - return; - } - - #region Create new copy in mutable region - physicalAddress = (long)request.record.GetValidPointer(); - recordSize = hlog.GetRecordSize(physicalAddress); - - long newLogicalAddress, newPhysicalAddress; - if (UseReadCache) - { - BlockAllocateReadCache(recordSize, out newLogicalAddress, currentCtx); - newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress); - RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version, - true, false, false, - entry.Address); - readcache.ShallowCopy(ref pendingContext.key.Get(), ref readcache.GetKey(newPhysicalAddress)); - functions.SingleWriter(ref pendingContext.key.Get(), - ref hlog.GetContextRecordValue(ref request), - ref readcache.GetValue(newPhysicalAddress)); - } - else - { - BlockAllocate(recordSize, out newLogicalAddress, currentCtx); - newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); - RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version, - true, false, false, - latestLogicalAddress); - hlog.ShallowCopy(ref pendingContext.key.Get(), ref hlog.GetKey(newPhysicalAddress)); - functions.SingleWriter(ref pendingContext.key.Get(), - ref hlog.GetContextRecordValue(ref request), - ref hlog.GetValue(newPhysicalAddress)); - } - - - var updatedEntry = default(HashBucketEntry); - updatedEntry.Tag = tag; - updatedEntry.Address = newLogicalAddress & Constants.kAddressMask; - updatedEntry.Pending = entry.Pending; - updatedEntry.Tentative = false; - updatedEntry.ReadCache = UseReadCache; - - var foundEntry = default(HashBucketEntry); - foundEntry.word = Interlocked.CompareExchange( - ref bucket->bucket_entries[slot], - updatedEntry.word, - entry.word); - if (foundEntry.word != entry.word) - { - if (!UseReadCache) hlog.GetInfo(newPhysicalAddress).Invalid = true; - // We don't retry, just give up - } - #endregion - } - #endregion #region Upsert Operation @@ -408,7 +267,6 @@ internal OperationStatus InternalUpsert( var logicalAddress = Constants.kInvalidAddress; var physicalAddress = default(long); var latchOperation = default(LatchOperation); - var version = default(int); var latestRecordVersion = -1; var hash = comparer.GetHashCode64(ref key); @@ -459,12 +317,11 @@ internal OperationStatus InternalUpsert( { case Phase.PREPARE: { - version = sessionCtx.version; if (HashBucket.TryAcquireSharedLatch(bucket)) { // Set to release shared latch (default) - latchOperation = LatchOperation.ReleaseShared; - if (latestRecordVersion != -1 && latestRecordVersion > version) + latchOperation = LatchOperation.Shared; + if (latestRecordVersion != -1 && latestRecordVersion > sessionCtx.version) { status = OperationStatus.CPR_SHIFT_DETECTED; goto CreatePendingContext; // Pivot Thread @@ -479,13 +336,12 @@ internal OperationStatus InternalUpsert( } case Phase.IN_PROGRESS: { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) + if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version) { if (HashBucket.TryAcquireExclusiveLatch(bucket)) { // Set to release exclusive latch (default) - latchOperation = LatchOperation.ReleaseExclusive; + latchOperation = LatchOperation.Exclusive; goto CreateNewRecord; // Create a (v+1) record } else @@ -498,8 +354,7 @@ internal OperationStatus InternalUpsert( } case Phase.WAIT_PENDING: { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) + if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version) { if (HashBucket.NoSharedLatches(bucket)) { @@ -515,8 +370,7 @@ internal OperationStatus InternalUpsert( } case Phase.WAIT_FLUSH: { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) + if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version) { goto CreateNewRecord; // Create a (v+1) record } @@ -604,10 +458,10 @@ internal OperationStatus InternalUpsert( { switch (latchOperation) { - case LatchOperation.ReleaseShared: + case LatchOperation.Shared: HashBucket.ReleaseSharedLatch(bucket); break; - case LatchOperation.ReleaseExclusive: + case LatchOperation.Exclusive: HashBucket.ReleaseExclusiveLatch(bucket); break; default: @@ -676,10 +530,10 @@ internal OperationStatus InternalRMW( var slot = default(int); var logicalAddress = Constants.kInvalidAddress; var physicalAddress = default(long); - var version = default(int); var latestRecordVersion = -1; var status = default(OperationStatus); var latchOperation = LatchOperation.None; + var heldOperation = LatchOperation.None; var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); @@ -729,12 +583,12 @@ internal OperationStatus InternalRMW( { case Phase.PREPARE: { - version = sessionCtx.version; - if (HashBucket.TryAcquireSharedLatch(bucket)) + Debug.Assert(pendingContext.heldLatch != LatchOperation.Exclusive); + if (pendingContext.heldLatch == LatchOperation.Shared || HashBucket.TryAcquireSharedLatch(bucket)) { // Set to release shared latch (default) - latchOperation = LatchOperation.ReleaseShared; - if (latestRecordVersion != -1 && latestRecordVersion > version) + latchOperation = LatchOperation.Shared; + if (latestRecordVersion != -1 && latestRecordVersion > sessionCtx.version) { status = OperationStatus.CPR_SHIFT_DETECTED; goto CreateFailureContext; // Pivot Thread @@ -749,13 +603,13 @@ internal OperationStatus InternalRMW( } case Phase.IN_PROGRESS: { - version = (sessionCtx.version - 1); - if (latestRecordVersion <= version) + if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version) { - if (HashBucket.TryAcquireExclusiveLatch(bucket)) + Debug.Assert(pendingContext.heldLatch != LatchOperation.Shared); + if (pendingContext.heldLatch == LatchOperation.Exclusive || HashBucket.TryAcquireExclusiveLatch(bucket)) { // Set to release exclusive latch (default) - latchOperation = LatchOperation.ReleaseExclusive; + latchOperation = LatchOperation.Exclusive; goto CreateNewRecord; // Create a (v+1) record } else @@ -768,8 +622,7 @@ internal OperationStatus InternalRMW( } case Phase.WAIT_PENDING: { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) + if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version) { if (HashBucket.NoSharedLatches(bucket)) { @@ -785,8 +638,7 @@ internal OperationStatus InternalRMW( } case Phase.WAIT_FLUSH: { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) + if (latestRecordVersion != -1 && latestRecordVersion < sessionCtx.version) { goto CreateNewRecord; // Create a (v+1) record } @@ -821,10 +673,15 @@ internal OperationStatus InternalRMW( else if (logicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone) { status = OperationStatus.RETRY_LATER; - // Retain the shared latch (if acquired) - if (latchOperation == LatchOperation.ReleaseShared) + // Do not retain latch for pendings ops in relaxed CPR + if (!RelaxedCPR) { - latchOperation = LatchOperation.None; + // Retain the shared latch (if acquired) + if (latchOperation == LatchOperation.Shared) + { + heldOperation = latchOperation; + latchOperation = LatchOperation.None; + } } goto CreateFailureContext; // Go pending } @@ -839,10 +696,15 @@ internal OperationStatus InternalRMW( else if (logicalAddress >= hlog.BeginAddress) { status = OperationStatus.RECORD_ON_DISK; - // Retain the shared latch (if acquired) - if (latchOperation == LatchOperation.ReleaseShared) + // Do not retain latch for pendings ops in relaxed CPR + if (!RelaxedCPR) { - latchOperation = LatchOperation.None; + // Retain the shared latch (if acquired) + if (latchOperation == LatchOperation.Shared) + { + heldOperation = latchOperation; + latchOperation = LatchOperation.None; + } } goto CreateFailureContext; // Go pending } @@ -912,7 +774,7 @@ ref hlog.GetValue(physicalAddress), } else { - // ah, CAS failed + // CAS failed hlog.GetInfo(newPhysicalAddress).Invalid = true; status = OperationStatus.RETRY_NOW; goto LatchRelease; @@ -931,6 +793,7 @@ ref hlog.GetValue(physicalAddress), pendingContext.logicalAddress = logicalAddress; pendingContext.version = sessionCtx.version; pendingContext.serialNum = lsn; + pendingContext.heldLatch = heldOperation; } #endregion @@ -939,10 +802,10 @@ ref hlog.GetValue(physicalAddress), { switch (latchOperation) { - case LatchOperation.ReleaseShared: + case LatchOperation.Shared: HashBucket.ReleaseSharedLatch(bucket); break; - case LatchOperation.ReleaseExclusive: + case LatchOperation.Exclusive: HashBucket.ReleaseExclusiveLatch(bucket); break; default: @@ -961,12 +824,19 @@ ref hlog.GetValue(physicalAddress), } } + #endregion + + #region Delete Operation + /// - /// Retries a pending RMW operation. + /// Delete operation. Replaces the value corresponding to 'key' with tombstone. + /// If at head, tries to remove item from hash chain /// - /// Thread (or session) context under which operation must be executed. - /// Internal context of the RMW operation. + /// Key of the record to be deleted. + /// User context for the operation, in case it goes pending. + /// Pending context used internally to store the context of the operation. /// Session context + /// Operation serial number /// /// /// @@ -975,32 +845,32 @@ ref hlog.GetValue(physicalAddress), /// /// /// SUCCESS - /// The value has been successfully updated(or inserted). + /// The value has been successfully deleted /// /// - /// RECORD_ON_DISK - /// The record corresponding to 'key' is on disk. Issue async IO to retrieve record and retry later. + /// RETRY_LATER + /// Cannot be processed immediately due to system state. Add to pending list and retry later /// /// - /// RETRY_LATER - /// Cannot be processed immediately due to system state. Add to pending list and retry later. + /// CPR_SHIFT_DETECTED + /// A shift in version has been detected. Synchronize immediately to avoid violating CPR consistency. /// /// /// - internal OperationStatus InternalRetryPendingRMW( - FasterExecutionContext opCtx, - ref PendingContext pendingContext, FasterExecutionContext sessionCtx) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal OperationStatus InternalDelete( + ref Key key, + ref Context userContext, + ref PendingContext pendingContext, FasterExecutionContext sessionCtx, long lsn) { - var recordSize = default(int); + var status = default(OperationStatus); var bucket = default(HashBucket*); var slot = default(int); var logicalAddress = Constants.kInvalidAddress; var physicalAddress = default(long); + var latchOperation = default(LatchOperation); var version = default(int); var latestRecordVersion = -1; - var status = default(OperationStatus); - var latchOperation = LatchOperation.None; - ref Key key = ref pendingContext.key.Get(); var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); @@ -1010,15 +880,17 @@ internal OperationStatus InternalRetryPendingRMW( #region Trace back for record in in-memory HybridLog var entry = default(HashBucketEntry); - FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); + var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); + if (!tagExists) + return OperationStatus.NOTFOUND; + logicalAddress = entry.Address; - // For simplicity, we don't let RMW operations use read cache if (UseReadCache) - SkipReadCache(ref logicalAddress, ref latestRecordVersion); + SkipAndInvalidateReadCache(ref logicalAddress, ref latestRecordVersion, ref key); var latestLogicalAddress = logicalAddress; - if (logicalAddress >= hlog.HeadAddress) + if (logicalAddress >= hlog.ReadOnlyAddress) { physicalAddress = hlog.GetPhysicalAddress(logicalAddress); if (latestRecordVersion == -1) @@ -1026,153 +898,170 @@ internal OperationStatus InternalRetryPendingRMW( if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) { logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; - TraceBackForKeyMatch(ref key, logicalAddress, - hlog.HeadAddress, - out logicalAddress, - out physicalAddress); + TraceBackForKeyMatch(ref key, + logicalAddress, + hlog.ReadOnlyAddress, + out logicalAddress, + out physicalAddress); } } #endregion + // NO optimization for most common case + //if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress) + //{ + // hlog.GetInfo(physicalAddress).Tombstone = true; + // return OperationStatus.SUCCESS; + //} + #region Entry latch operation if (sessionCtx.phase != Phase.REST) { - if (!((opCtx.version < sessionCtx.version) - || - (sessionCtx.phase == Phase.PREPARE))) + switch (sessionCtx.phase) { - // Processing a pending (v+1) request - version = (sessionCtx.version - 1); - switch (sessionCtx.phase) - { - case Phase.IN_PROGRESS: + case Phase.PREPARE: + { + version = sessionCtx.version; + if (HashBucket.TryAcquireSharedLatch(bucket)) { - if (latestRecordVersion != -1 && latestRecordVersion <= version) + // Set to release shared latch (default) + latchOperation = LatchOperation.Shared; + if (latestRecordVersion != -1 && latestRecordVersion > version) { - if (HashBucket.TryAcquireExclusiveLatch(bucket)) - { - // Set to release exclusive latch (default) - latchOperation = LatchOperation.ReleaseExclusive; - goto CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - goto UpdateFailureContext; // Go Pending - } + status = OperationStatus.CPR_SHIFT_DETECTED; + goto CreatePendingContext; // Pivot Thread } break; // Normal Processing } - case Phase.WAIT_PENDING: + else { - if (latestRecordVersion != -1 && latestRecordVersion <= version) + status = OperationStatus.CPR_SHIFT_DETECTED; + goto CreatePendingContext; // Pivot Thread + } + } + case Phase.IN_PROGRESS: + { + version = (sessionCtx.version - 1); + if (latestRecordVersion != -1 && latestRecordVersion <= version) + { + if (HashBucket.TryAcquireExclusiveLatch(bucket)) { - if (HashBucket.NoSharedLatches(bucket)) - { - goto CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - goto UpdateFailureContext; // Go Pending - } + // Set to release exclusive latch (default) + latchOperation = LatchOperation.Exclusive; + goto CreateNewRecord; // Create a (v+1) record + } + else + { + status = OperationStatus.RETRY_LATER; + goto CreatePendingContext; // Go Pending } - break; // Normal Processing } - case Phase.WAIT_FLUSH: + break; // Normal Processing + } + case Phase.WAIT_PENDING: + { + version = (sessionCtx.version - 1); + if (latestRecordVersion != -1 && latestRecordVersion <= version) { - if (latestRecordVersion != -1 && latestRecordVersion <= version) + if (HashBucket.NoSharedLatches(bucket)) { goto CreateNewRecord; // Create a (v+1) record } - break; // Normal Processing + else + { + status = OperationStatus.RETRY_LATER; + goto CreatePendingContext; // Go Pending + } } - default: - break; - } + break; // Normal Processing + } + case Phase.WAIT_FLUSH: + { + version = (sessionCtx.version - 1); + if (latestRecordVersion != -1 && latestRecordVersion <= version) + { + goto CreateNewRecord; // Create a (v+1) record + } + break; // Normal Processing + } + default: + break; } } #endregion + Debug.Assert(latestRecordVersion <= sessionCtx.version); + #region Normal processing - // Mutable Region: Update the record in-place + // Record is in memory, try to update hash chain and completely elide record + // only if previous address points to invalid address if (logicalAddress >= hlog.ReadOnlyAddress) { - if (FoldOverSnapshot) + if (entry.Address == logicalAddress && hlog.GetInfo(physicalAddress).PreviousAddress < hlog.BeginAddress) { - Debug.Assert(hlog.GetInfo(physicalAddress).Version == sessionCtx.version); - } + var updatedEntry = default(HashBucketEntry); + updatedEntry.Tag = 0; + if (hlog.GetInfo(physicalAddress).PreviousAddress == Constants.kTempInvalidAddress) + updatedEntry.Address = Constants.kInvalidAddress; + else + updatedEntry.Address = hlog.GetInfo(physicalAddress).PreviousAddress; + updatedEntry.Pending = entry.Pending; + updatedEntry.Tentative = false; - if (functions.InPlaceUpdater(ref key, ref pendingContext.input, ref hlog.GetValue(physicalAddress))) - { - status = OperationStatus.SUCCESS; - goto LatchRelease; - } - } + if (entry.word == Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word)) + { + // Apply tombstone bit to the record + hlog.GetInfo(physicalAddress).Tombstone = true; - // Fuzzy Region: Must go pending due to lost-update anomaly - else if (logicalAddress >= hlog.SafeReadOnlyAddress) - { - status = OperationStatus.RETRY_LATER; - goto UpdateFailureContext; // Go pending - } + if (WriteDefaultOnDelete) + { + // Write default value + // Ignore return value, the record is already marked + Value v = default; + functions.ConcurrentWriter(ref hlog.GetKey(physicalAddress), ref v, ref hlog.GetValue(physicalAddress)); + } - // Safe Read-Only Region: Create a record in the mutable region - else if (logicalAddress >= hlog.HeadAddress) - { - goto CreateNewRecord; + status = OperationStatus.SUCCESS; + goto LatchRelease; // Release shared latch (if acquired) + } + } } - // Disk Region: Need to issue async io requests - else if (logicalAddress >= hlog.BeginAddress) + // Mutable Region: Update the record in-place + if (logicalAddress >= hlog.ReadOnlyAddress) { - status = OperationStatus.RECORD_ON_DISK; - goto UpdateFailureContext; // Go pending - } + hlog.GetInfo(physicalAddress).Tombstone = true; - // No record exists - create new - else - { - goto CreateNewRecord; + if (WriteDefaultOnDelete) + { + // Write default value + // Ignore return value, the record is already marked + Value v = default; + functions.ConcurrentWriter(ref hlog.GetKey(physicalAddress), ref v, ref hlog.GetValue(physicalAddress)); + } + + status = OperationStatus.SUCCESS; + goto LatchRelease; // Release shared latch (if acquired) } + // All other regions: Create a record in the mutable region #endregion - #region Create new record in mutable region + #region Create new record in the mutable region CreateNewRecord: { - recordSize = (logicalAddress < hlog.BeginAddress) ? - hlog.GetInitialRecordSize(ref key, ref pendingContext.input) : - hlog.GetRecordSize(physicalAddress); + var value = default(Value); + // Immutable region or new record + // Allocate default record size for tombstone + var recordSize = hlog.GetRecordSize(ref key, ref value); BlockAllocate(recordSize, out long newLogicalAddress, sessionCtx); var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); - RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), pendingContext.version, - true, false, false, + RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), + sessionCtx.version, + true, true, false, latestLogicalAddress); hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); - if (logicalAddress < hlog.BeginAddress) - { - functions.InitialUpdater(ref key, - ref pendingContext.input, - ref hlog.GetValue(newPhysicalAddress)); - status = OperationStatus.NOTFOUND; - } - else if (logicalAddress >= hlog.HeadAddress) - { - functions.CopyUpdater(ref key, - ref pendingContext.input, - ref hlog.GetValue(physicalAddress), - ref hlog.GetValue(newPhysicalAddress)); - status = OperationStatus.SUCCESS; - } - else - { - // record slipped onto disk - hlog.GetInfo(newPhysicalAddress).Invalid = true; - status = OperationStatus.RETRY_NOW; - goto LatchRelease; - } var updatedEntry = default(HashBucketEntry); updatedEntry.Tag = tag; @@ -1187,11 +1076,11 @@ ref hlog.GetValue(physicalAddress), if (foundEntry.word == entry.word) { + status = OperationStatus.SUCCESS; goto LatchRelease; } else { - // ah, CAS failed hlog.GetInfo(newPhysicalAddress).Invalid = true; status = OperationStatus.RETRY_NOW; goto LatchRelease; @@ -1199,11 +1088,16 @@ ref hlog.GetValue(physicalAddress), } #endregion - #region Update failure context - UpdateFailureContext: + #region Create pending context + CreatePendingContext: { + pendingContext.type = OperationType.DELETE; + pendingContext.key = hlog.GetKeyContainer(ref key); + pendingContext.userContext = userContext; pendingContext.entry.word = entry.word; pendingContext.logicalAddress = logicalAddress; + pendingContext.version = sessionCtx.version; + pendingContext.serialNum = lsn; } #endregion @@ -1212,11 +1106,12 @@ ref hlog.GetValue(physicalAddress), { switch (latchOperation) { - case LatchOperation.ReleaseExclusive: + case LatchOperation.Shared: + HashBucket.ReleaseSharedLatch(bucket); + break; + case LatchOperation.Exclusive: HashBucket.ReleaseExclusiveLatch(bucket); break; - case LatchOperation.ReleaseShared: - throw new FasterException("Should not release shared latch here!"); default: break; } @@ -1225,7 +1120,7 @@ ref hlog.GetValue(physicalAddress), if (status == OperationStatus.RETRY_NOW) { - return InternalRetryPendingRMW(opCtx, ref pendingContext, sessionCtx); + return InternalDelete(ref key, ref userContext, ref pendingContext, sessionCtx, lsn); } else { @@ -1233,152 +1128,230 @@ ref hlog.GetValue(physicalAddress), } } - /// - /// Continue a pending RMW operation with the record retrieved from disk. - /// - /// thread (or session) context under which operation must be executed. - /// record read from the disk. - /// internal context for the pending RMW operation - /// Session context - /// - /// - /// - /// Value - /// Description - /// - /// - /// SUCCESS - /// The value has been successfully updated(or inserted). - /// - /// - /// RECORD_ON_DISK - /// The record corresponding to 'key' is on disk. Issue async IO to retrieve record and retry later. - /// - /// - /// RETRY_LATER - /// Cannot be processed immediately due to system state. Add to pending list and retry later. - /// - /// - /// - internal OperationStatus InternalContinuePendingRMW( - FasterExecutionContext opCtx, - AsyncIOContext request, - ref PendingContext pendingContext, FasterExecutionContext sessionCtx) + #endregion + + #region ContainsKeyInMemory + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal Status InternalContainsKeyInMemory(ref Key key, FasterExecutionContext sessionCtx, long fromAddress = -1) { - var recordSize = default(int); + if (fromAddress == -1) + fromAddress = hlog.HeadAddress; + else + Debug.Assert(fromAddress >= hlog.HeadAddress); + var bucket = default(HashBucket*); var slot = default(int); - var logicalAddress = Constants.kInvalidAddress; - var physicalAddress = default(long); - var status = default(OperationStatus); - var latestRecordVersion = default(int); - ref Key key = ref pendingContext.key.Get(); + long physicalAddress; + var latestRecordVersion = -1; var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - #region Trace Back for Record on In-Memory HybridLog - var entry = default(HashBucketEntry); - FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); - logicalAddress = entry.Address; + if (sessionCtx.phase != Phase.REST) + HeavyEnter(hash, sessionCtx); - // For simplicity, we don't let RMW operations use read cache - if (UseReadCache) - SkipReadCache(ref logicalAddress, ref latestRecordVersion); - var latestLogicalAddress = logicalAddress; + HashBucketEntry entry = default; + var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); - if (logicalAddress >= hlog.HeadAddress) + if (tagExists) { - physicalAddress = hlog.GetPhysicalAddress(logicalAddress); - if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) + long logicalAddress = entry.Address; + + if (UseReadCache) + SkipReadCache(ref logicalAddress, ref latestRecordVersion); + + if (logicalAddress >= fromAddress) { - logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; - TraceBackForKeyMatch(ref key, - logicalAddress, - hlog.HeadAddress, - out logicalAddress, - out physicalAddress); + physicalAddress = hlog.GetPhysicalAddress(logicalAddress); + + if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) + { + logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; + TraceBackForKeyMatch(ref key, + logicalAddress, + fromAddress, + out logicalAddress, + out _); + } + + if (logicalAddress < fromAddress) + return Status.NOTFOUND; + else + return Status.OK; } + else + return Status.NOTFOUND; } - #endregion - - var previousFirstRecordAddress = pendingContext.entry.Address; - if (logicalAddress > previousFirstRecordAddress) + else { - goto Retry; - } + // no tag found + return Status.NOTFOUND; + } + } + #endregion - #region Create record in mutable region - if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone)) + #region Continue operations + /// + /// Continue a pending read operation. Computes 'output' from 'input' and value corresponding to 'key' + /// obtained from disk. Optionally, it copies the value to tail to serve future read/write requests quickly. + /// + /// The thread (or session) context to execute operation in. + /// Async response from disk. + /// Pending context corresponding to operation. + /// + /// + /// + /// + /// Value + /// Description + /// + /// + /// SUCCESS + /// The output has been computed and stored in 'output'. + /// + /// + /// + internal OperationStatus InternalContinuePendingRead( + FasterExecutionContext ctx, + AsyncIOContext request, + ref PendingContext pendingContext, FasterExecutionContext currentCtx) + { + Debug.Assert(RelaxedCPR || pendingContext.version == ctx.version); + + if (request.logicalAddress >= hlog.BeginAddress) { - recordSize = hlog.GetInitialRecordSize(ref key, ref pendingContext.input); + Debug.Assert(hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Version <= ctx.version); + + if (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone) + return OperationStatus.NOTFOUND; + + functions.SingleReader(ref pendingContext.key.Get(), ref pendingContext.input, + ref hlog.GetContextRecordValue(ref request), ref pendingContext.output); + + if (CopyReadsToTail || UseReadCache) + { + InternalContinuePendingReadCopyToTail(ctx, request, ref pendingContext, currentCtx); + } } else + return OperationStatus.NOTFOUND; + + return OperationStatus.SUCCESS; + } + + /// + /// Copies the record read from disk to tail of the HybridLog. + /// + /// The thread(or session) context to execute operation in. + /// Async response from disk. + /// Pending context corresponding to operation. + /// + internal void InternalContinuePendingReadCopyToTail( + FasterExecutionContext opCtx, + AsyncIOContext request, + ref PendingContext pendingContext, FasterExecutionContext currentCtx) + { + Debug.Assert(RelaxedCPR || pendingContext.version == opCtx.version); + + var recordSize = default(int); + var bucket = default(HashBucket*); + var slot = default(int); + var logicalAddress = Constants.kInvalidAddress; + var physicalAddress = default(long); + var latestRecordVersion = default(int); + + var hash = comparer.GetHashCode64(ref pendingContext.key.Get()); + var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); + + #region Trace back record in in-memory HybridLog + var entry = default(HashBucketEntry); + FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); + logicalAddress = entry.word & Constants.kAddressMask; + + if (UseReadCache) + SkipReadCache(ref logicalAddress, ref latestRecordVersion); + var latestLogicalAddress = logicalAddress; + + if (logicalAddress >= hlog.HeadAddress) { - physicalAddress = (long)request.record.GetValidPointer(); - recordSize = hlog.GetRecordSize(physicalAddress); + physicalAddress = hlog.GetPhysicalAddress(logicalAddress); + if (!comparer.Equals(ref pendingContext.key.Get(), ref hlog.GetKey(physicalAddress))) + { + logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; + TraceBackForKeyMatch(ref pendingContext.key.Get(), + logicalAddress, + hlog.HeadAddress, + out logicalAddress, + out physicalAddress); + } } - BlockAllocate(recordSize, out long newLogicalAddress, sessionCtx); - var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); - RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version, - true, false, false, - latestLogicalAddress); - hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); - if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone)) + #endregion + + if (logicalAddress > pendingContext.entry.Address) { - functions.InitialUpdater(ref key, - ref pendingContext.input, - ref hlog.GetValue(newPhysicalAddress)); - status = OperationStatus.NOTFOUND; + // Give up early + return; + } + + #region Create new copy in mutable region + physicalAddress = (long)request.record.GetValidPointer(); + recordSize = hlog.GetRecordSize(physicalAddress); + + long newLogicalAddress, newPhysicalAddress; + if (UseReadCache) + { + BlockAllocateReadCache(recordSize, out newLogicalAddress, currentCtx); + newPhysicalAddress = readcache.GetPhysicalAddress(newLogicalAddress); + RecordInfo.WriteInfo(ref readcache.GetInfo(newPhysicalAddress), opCtx.version, + true, false, false, + entry.Address); + readcache.ShallowCopy(ref pendingContext.key.Get(), ref readcache.GetKey(newPhysicalAddress)); + functions.SingleWriter(ref pendingContext.key.Get(), + ref hlog.GetContextRecordValue(ref request), + ref readcache.GetValue(newPhysicalAddress)); } else { - functions.CopyUpdater(ref key, - ref pendingContext.input, - ref hlog.GetContextRecordValue(ref request), - ref hlog.GetValue(newPhysicalAddress)); - status = OperationStatus.SUCCESS; + BlockAllocate(recordSize, out newLogicalAddress, currentCtx); + newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); + RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version, + true, false, false, + latestLogicalAddress); + hlog.ShallowCopy(ref pendingContext.key.Get(), ref hlog.GetKey(newPhysicalAddress)); + functions.SingleWriter(ref pendingContext.key.Get(), + ref hlog.GetContextRecordValue(ref request), + ref hlog.GetValue(newPhysicalAddress)); } + var updatedEntry = default(HashBucketEntry); updatedEntry.Tag = tag; updatedEntry.Address = newLogicalAddress & Constants.kAddressMask; updatedEntry.Pending = entry.Pending; updatedEntry.Tentative = false; + updatedEntry.ReadCache = UseReadCache; var foundEntry = default(HashBucketEntry); foundEntry.word = Interlocked.CompareExchange( - ref bucket->bucket_entries[slot], - updatedEntry.word, entry.word); - - if (foundEntry.word == entry.word) - { - return status; - } - else + ref bucket->bucket_entries[slot], + updatedEntry.word, + entry.word); + if (foundEntry.word != entry.word) { - hlog.GetInfo(newPhysicalAddress).Invalid = true; - goto Retry; + if (!UseReadCache) hlog.GetInfo(newPhysicalAddress).Invalid = true; + // We don't retry, just give up } #endregion - - Retry: - return InternalRetryPendingRMW(opCtx, ref pendingContext, sessionCtx); } - #endregion - - #region Delete Operation - /// - /// Delete operation. Replaces the value corresponding to 'key' with tombstone. - /// If at head, tries to remove item from hash chain + /// Continue a pending RMW operation with the record retrieved from disk. /// - /// Key of the record to be deleted. - /// User context for the operation, in case it goes pending. - /// Pending context used internally to store the context of the operation. + /// thread (or session) context under which operation must be executed. + /// record read from the disk. + /// internal context for the pending RMW operation /// Session context - /// Operation serial number /// /// /// @@ -1387,350 +1360,124 @@ ref hlog.GetContextRecordValue(ref request), /// /// /// SUCCESS - /// The value has been successfully deleted + /// The value has been successfully updated(or inserted). /// /// - /// RETRY_LATER - /// Cannot be processed immediately due to system state. Add to pending list and retry later + /// RECORD_ON_DISK + /// The record corresponding to 'key' is on disk. Issue async IO to retrieve record and retry later. /// /// - /// CPR_SHIFT_DETECTED - /// A shift in version has been detected. Synchronize immediately to avoid violating CPR consistency. + /// RETRY_LATER + /// Cannot be processed immediately due to system state. Add to pending list and retry later. /// /// /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal OperationStatus InternalDelete( - ref Key key, - ref Context userContext, - ref PendingContext pendingContext, FasterExecutionContext sessionCtx, long lsn) + internal OperationStatus InternalContinuePendingRMW( + FasterExecutionContext opCtx, + AsyncIOContext request, + ref PendingContext pendingContext, FasterExecutionContext sessionCtx) { - var status = default(OperationStatus); + var recordSize = default(int); var bucket = default(HashBucket*); var slot = default(int); var logicalAddress = Constants.kInvalidAddress; var physicalAddress = default(long); - var latchOperation = default(LatchOperation); - var version = default(int); - var latestRecordVersion = -1; + var status = default(OperationStatus); + var latestRecordVersion = default(int); + ref Key key = ref pendingContext.key.Get(); var hash = comparer.GetHashCode64(ref key); var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - if (sessionCtx.phase != Phase.REST) - HeavyEnter(hash, sessionCtx); - - #region Trace back for record in in-memory HybridLog + #region Trace Back for Record on In-Memory HybridLog var entry = default(HashBucketEntry); - var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); - if (!tagExists) - return OperationStatus.NOTFOUND; - + FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress); logicalAddress = entry.Address; + // For simplicity, we don't let RMW operations use read cache if (UseReadCache) - SkipAndInvalidateReadCache(ref logicalAddress, ref latestRecordVersion, ref key); + SkipReadCache(ref logicalAddress, ref latestRecordVersion); var latestLogicalAddress = logicalAddress; - if (logicalAddress >= hlog.ReadOnlyAddress) + if (logicalAddress >= hlog.HeadAddress) { physicalAddress = hlog.GetPhysicalAddress(logicalAddress); - if (latestRecordVersion == -1) - latestRecordVersion = hlog.GetInfo(physicalAddress).Version; if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) { logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; TraceBackForKeyMatch(ref key, - logicalAddress, - hlog.ReadOnlyAddress, - out logicalAddress, - out physicalAddress); + logicalAddress, + hlog.HeadAddress, + out logicalAddress, + out physicalAddress); } } #endregion - // NO optimization for most common case - //if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress) - //{ - // hlog.GetInfo(physicalAddress).Tombstone = true; - // return OperationStatus.SUCCESS; - //} + var previousFirstRecordAddress = pendingContext.entry.Address; + if (logicalAddress > previousFirstRecordAddress) + { + goto Retry; + } - #region Entry latch operation - if (sessionCtx.phase != Phase.REST) + #region Create record in mutable region + if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone)) { - switch (sessionCtx.phase) - { - case Phase.PREPARE: - { - version = sessionCtx.version; - if (HashBucket.TryAcquireSharedLatch(bucket)) - { - // Set to release shared latch (default) - latchOperation = LatchOperation.ReleaseShared; - if (latestRecordVersion != -1 && latestRecordVersion > version) - { - status = OperationStatus.CPR_SHIFT_DETECTED; - goto CreatePendingContext; // Pivot Thread - } - break; // Normal Processing - } - else - { - status = OperationStatus.CPR_SHIFT_DETECTED; - goto CreatePendingContext; // Pivot Thread - } - } - case Phase.IN_PROGRESS: - { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) - { - if (HashBucket.TryAcquireExclusiveLatch(bucket)) - { - // Set to release exclusive latch (default) - latchOperation = LatchOperation.ReleaseExclusive; - goto CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - goto CreatePendingContext; // Go Pending - } - } - break; // Normal Processing - } - case Phase.WAIT_PENDING: - { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) - { - if (HashBucket.NoSharedLatches(bucket)) - { - goto CreateNewRecord; // Create a (v+1) record - } - else - { - status = OperationStatus.RETRY_LATER; - goto CreatePendingContext; // Go Pending - } - } - break; // Normal Processing - } - case Phase.WAIT_FLUSH: - { - version = (sessionCtx.version - 1); - if (latestRecordVersion != -1 && latestRecordVersion <= version) - { - goto CreateNewRecord; // Create a (v+1) record - } - break; // Normal Processing - } - default: - break; - } + recordSize = hlog.GetInitialRecordSize(ref key, ref pendingContext.input); } - #endregion - - Debug.Assert(latestRecordVersion <= sessionCtx.version); - - #region Normal processing - - // Record is in memory, try to update hash chain and completely elide record - // only if previous address points to invalid address - if (logicalAddress >= hlog.ReadOnlyAddress) + else { - if (entry.Address == logicalAddress && hlog.GetInfo(physicalAddress).PreviousAddress < hlog.BeginAddress) - { - var updatedEntry = default(HashBucketEntry); - updatedEntry.Tag = 0; - if (hlog.GetInfo(physicalAddress).PreviousAddress == Constants.kTempInvalidAddress) - updatedEntry.Address = Constants.kInvalidAddress; - else - updatedEntry.Address = hlog.GetInfo(physicalAddress).PreviousAddress; - updatedEntry.Pending = entry.Pending; - updatedEntry.Tentative = false; - - if (entry.word == Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word)) - { - // Apply tombstone bit to the record - hlog.GetInfo(physicalAddress).Tombstone = true; - - if (WriteDefaultOnDelete) - { - // Write default value - // Ignore return value, the record is already marked - Value v = default; - functions.ConcurrentWriter(ref hlog.GetKey(physicalAddress), ref v, ref hlog.GetValue(physicalAddress)); - } - - status = OperationStatus.SUCCESS; - goto LatchRelease; // Release shared latch (if acquired) - } - } + physicalAddress = (long)request.record.GetValidPointer(); + recordSize = hlog.GetRecordSize(physicalAddress); } - - // Mutable Region: Update the record in-place - if (logicalAddress >= hlog.ReadOnlyAddress) + BlockAllocate(recordSize, out long newLogicalAddress, sessionCtx); + var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); + RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version, + true, false, false, + latestLogicalAddress); + hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); + if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone)) { - hlog.GetInfo(physicalAddress).Tombstone = true; - - if (WriteDefaultOnDelete) - { - // Write default value - // Ignore return value, the record is already marked - Value v = default; - functions.ConcurrentWriter(ref hlog.GetKey(physicalAddress), ref v, ref hlog.GetValue(physicalAddress)); - } - - status = OperationStatus.SUCCESS; - goto LatchRelease; // Release shared latch (if acquired) + functions.InitialUpdater(ref key, + ref pendingContext.input, + ref hlog.GetValue(newPhysicalAddress)); + status = OperationStatus.NOTFOUND; } - - // All other regions: Create a record in the mutable region - #endregion - - #region Create new record in the mutable region - CreateNewRecord: + else { - var value = default(Value); - // Immutable region or new record - // Allocate default record size for tombstone - var recordSize = hlog.GetRecordSize(ref key, ref value); - BlockAllocate(recordSize, out long newLogicalAddress, sessionCtx); - var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress); - RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), - sessionCtx.version, - true, true, false, - latestLogicalAddress); - hlog.ShallowCopy(ref key, ref hlog.GetKey(newPhysicalAddress)); + functions.CopyUpdater(ref key, + ref pendingContext.input, + ref hlog.GetContextRecordValue(ref request), + ref hlog.GetValue(newPhysicalAddress)); + status = OperationStatus.SUCCESS; + } - var updatedEntry = default(HashBucketEntry); - updatedEntry.Tag = tag; - updatedEntry.Address = newLogicalAddress & Constants.kAddressMask; - updatedEntry.Pending = entry.Pending; - updatedEntry.Tentative = false; + var updatedEntry = default(HashBucketEntry); + updatedEntry.Tag = tag; + updatedEntry.Address = newLogicalAddress & Constants.kAddressMask; + updatedEntry.Pending = entry.Pending; + updatedEntry.Tentative = false; - var foundEntry = default(HashBucketEntry); - foundEntry.word = Interlocked.CompareExchange( + var foundEntry = default(HashBucketEntry); + foundEntry.word = Interlocked.CompareExchange( ref bucket->bucket_entries[slot], updatedEntry.word, entry.word); - if (foundEntry.word == entry.word) - { - status = OperationStatus.SUCCESS; - goto LatchRelease; - } - else - { - hlog.GetInfo(newPhysicalAddress).Invalid = true; - status = OperationStatus.RETRY_NOW; - goto LatchRelease; - } - } - #endregion - - #region Create pending context - CreatePendingContext: - { - pendingContext.type = OperationType.DELETE; - pendingContext.key = hlog.GetKeyContainer(ref key); - pendingContext.userContext = userContext; - pendingContext.entry.word = entry.word; - pendingContext.logicalAddress = logicalAddress; - pendingContext.version = sessionCtx.version; - pendingContext.serialNum = lsn; - } - #endregion - - #region Latch release - LatchRelease: - { - switch (latchOperation) - { - case LatchOperation.ReleaseShared: - HashBucket.ReleaseSharedLatch(bucket); - break; - case LatchOperation.ReleaseExclusive: - HashBucket.ReleaseExclusiveLatch(bucket); - break; - default: - break; - } - } - #endregion - - if (status == OperationStatus.RETRY_NOW) + if (foundEntry.word == entry.word) { - return InternalDelete(ref key, ref userContext, ref pendingContext, sessionCtx, lsn); + return status; } else { - return status; + hlog.GetInfo(newPhysicalAddress).Invalid = true; + goto Retry; } - } - #endregion - #region ContainsKeyInMemory - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal Status InternalContainsKeyInMemory(ref Key key, FasterExecutionContext sessionCtx, long fromAddress = -1) - { - if (fromAddress == -1) - fromAddress = hlog.HeadAddress; - else - Debug.Assert(fromAddress >= hlog.HeadAddress); - - var bucket = default(HashBucket*); - var slot = default(int); - long physicalAddress; - var latestRecordVersion = -1; - - var hash = comparer.GetHashCode64(ref key); - var tag = (ushort)((ulong)hash >> Constants.kHashTagShift); - - if (sessionCtx.phase != Phase.REST) - HeavyEnter(hash, sessionCtx); - - HashBucketEntry entry = default; - var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry); - - if (tagExists) - { - long logicalAddress = entry.Address; - - if (UseReadCache) - SkipReadCache(ref logicalAddress, ref latestRecordVersion); - - if (logicalAddress >= fromAddress) - { - physicalAddress = hlog.GetPhysicalAddress(logicalAddress); - - if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress))) - { - logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress; - TraceBackForKeyMatch(ref key, - logicalAddress, - fromAddress, - out logicalAddress, - out _); - } - - if (logicalAddress < fromAddress) - return Status.NOTFOUND; - else - return Status.OK; - } - else - return Status.NOTFOUND; - } - else - { - // no tag found - return Status.NOTFOUND; - } + Retry: + return InternalRMW(ref pendingContext.key.Get(), ref pendingContext.input, ref pendingContext.userContext, ref pendingContext, sessionCtx, pendingContext.serialNum); } + #endregion #region Helper Functions @@ -1793,7 +1540,10 @@ ref pendingContext.value.Get(), ref pendingContext, currentCtx, pendingContext.serialNum); break; case OperationType.RMW: - internalStatus = InternalRetryPendingRMW(currentCtx, ref pendingContext, currentCtx); + internalStatus = InternalRMW(ref pendingContext.key.Get(), + ref pendingContext.input, + ref pendingContext.userContext, + ref pendingContext, currentCtx, pendingContext.serialNum); break; } diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 649596894..9f32d6c5a 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -156,8 +156,8 @@ internal bool InternalCompletePending(FasterExecutionContext ctx, bool wait = fa { if (ctx.phase == Phase.IN_PROGRESS || ctx.phase == Phase.WAIT_PENDING) { - CompleteIOPendingRequests(ctx.prevCtx, ctx); - CompleteRetryRequests(ctx.prevCtx, ctx); + InternalCompletePendingRequests(ctx.prevCtx, ctx); + InternalCompleteRetryRequests(ctx.prevCtx, ctx); InternalRefresh(ctx); done &= (ctx.prevCtx.HasNoPendingRequests); @@ -165,8 +165,8 @@ internal bool InternalCompletePending(FasterExecutionContext ctx, bool wait = fa } #endregion - CompleteIOPendingRequests(ctx, ctx); - CompleteRetryRequests(ctx, ctx); + InternalCompletePendingRequests(ctx, ctx); + InternalCompleteRetryRequests(ctx, ctx); InternalRefresh(ctx); done &= (ctx.HasNoPendingRequests); @@ -188,111 +188,39 @@ internal bool InternalCompletePending(FasterExecutionContext ctx, bool wait = fa internal bool InRestPhase() => _systemState.phase == Phase.REST; - internal void CompleteRetryRequests(FasterExecutionContext opCtx, FasterExecutionContext currentCtx) + #region Complete Retry Requests + internal void InternalCompleteRetryRequests(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession clientSession = null) { int count = opCtx.retryRequests.Count; if (count == 0) return; + clientSession?.UnsafeResumeThread(); for (int i = 0; i < count; i++) { var pendingContext = opCtx.retryRequests.Dequeue(); - InternalRetryRequestAndCallback(opCtx, currentCtx, pendingContext); + InternalCompleteRetryRequest(opCtx, currentCtx, pendingContext); } + clientSession?.UnsafeSuspendThread(); } - internal void CompleteRetryRequests(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession clientSession) - { - int count = opCtx.retryRequests.Count; - - if (count == 0) return; - - clientSession.UnsafeResumeThread(); - for (int i = 0; i < count; i++) - { - var pendingContext = opCtx.retryRequests.Dequeue(); - InternalRetryRequestAndCallback(opCtx, currentCtx, pendingContext); - } - clientSession.UnsafeSuspendThread(); - } - - internal void CompleteIOPendingRequests(FasterExecutionContext opCtx, FasterExecutionContext currentCtx) - { - if (opCtx.readyResponses.Count == 0) return; - - while (opCtx.readyResponses.TryDequeue(out AsyncIOContext request)) - { - InternalContinuePendingRequestAndCallback(opCtx, currentCtx, request); - } - } - - internal async ValueTask CompleteIOPendingRequestsAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession clientSession, CancellationToken token = default) - { - while (opCtx.ioPendingRequests.Count > 0) - { - AsyncIOContext request; - - if (opCtx.readyResponses.Count > 0) - { - clientSession.UnsafeResumeThread(); - while (opCtx.readyResponses.Count > 0) - { - opCtx.readyResponses.TryDequeue(out request); - InternalContinuePendingRequestAndCallback(opCtx, currentCtx, request); - } - clientSession.UnsafeSuspendThread(); - } - else - { - request = await opCtx.readyResponses.DequeueAsync(token); - - clientSession.UnsafeResumeThread(); - InternalContinuePendingRequestAndCallback(opCtx, currentCtx, request); - clientSession.UnsafeSuspendThread(); - } - } - } - - internal void InternalRetryRequestAndCallback( - FasterExecutionContext opCtx, - FasterExecutionContext currentCtx, - PendingContext pendingContext) + internal void InternalCompleteRetryRequest(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, PendingContext pendingContext) { var internalStatus = default(OperationStatus); ref Key key = ref pendingContext.key.Get(); ref Value value = ref pendingContext.value.Get(); - bool handleLatches = false; - - if (!RelaxedCPR) - { - #region Entry latch operation - - if ((opCtx.version < currentCtx.version) // Thread has already shifted to (v+1) - || - (currentCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch - { - handleLatches = true; - } - #endregion - } - // Issue retry command switch (pendingContext.type) { case OperationType.RMW: - internalStatus = InternalRetryPendingRMW(opCtx, ref pendingContext, currentCtx); + internalStatus = InternalRMW(ref key, ref pendingContext.input, ref pendingContext.userContext, ref pendingContext, currentCtx, pendingContext.serialNum); break; case OperationType.UPSERT: - internalStatus = InternalUpsert(ref key, - ref value, - ref pendingContext.userContext, - ref pendingContext, currentCtx, pendingContext.serialNum); + internalStatus = InternalUpsert(ref key, ref value, ref pendingContext.userContext, ref pendingContext, currentCtx, pendingContext.serialNum); break; case OperationType.DELETE: - internalStatus = InternalDelete(ref key, - ref pendingContext.userContext, - ref pendingContext, currentCtx, pendingContext.serialNum); + internalStatus = InternalDelete(ref key, ref pendingContext.userContext, ref pendingContext, currentCtx, pendingContext.serialNum); break; case OperationType.READ: throw new FasterException("Cannot happen!"); @@ -313,7 +241,7 @@ internal void InternalRetryRequestAndCallback( // If done, callback user code. if (status == Status.OK || status == Status.NOTFOUND) { - if (handleLatches) + if (pendingContext.heldLatch == LatchOperation.Shared) ReleaseSharedLatch(key); switch (pendingContext.type) @@ -338,24 +266,48 @@ internal void InternalRetryRequestAndCallback( } } + #endregion - internal void InternalContinuePendingRequestAndCallback( - FasterExecutionContext opCtx, - FasterExecutionContext currentCtx, - AsyncIOContext request) + #region Complete Pending Requests + internal void InternalCompletePendingRequests(FasterExecutionContext opCtx, FasterExecutionContext currentCtx) { - bool handleLatches = false; + if (opCtx.readyResponses.Count == 0) return; - if (!RelaxedCPR) + while (opCtx.readyResponses.TryDequeue(out AsyncIOContext request)) { - if ((opCtx.version < currentCtx.version) // Thread has already shifted to (v+1) - || - (currentCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch + InternalCompletePendingRequest(opCtx, currentCtx, request); + } + } + + internal async ValueTask InternalCompletePendingRequestsAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, ClientSession clientSession, CancellationToken token = default) + { + while (opCtx.ioPendingRequests.Count > 0) + { + AsyncIOContext request; + + if (opCtx.readyResponses.Count > 0) + { + clientSession.UnsafeResumeThread(); + while (opCtx.readyResponses.Count > 0) + { + opCtx.readyResponses.TryDequeue(out request); + InternalCompletePendingRequest(opCtx, currentCtx, request); + } + clientSession.UnsafeSuspendThread(); + } + else { - handleLatches = true; + request = await opCtx.readyResponses.DequeueAsync(token); + + clientSession.UnsafeResumeThread(); + InternalCompletePendingRequest(opCtx, currentCtx, request); + clientSession.UnsafeSuspendThread(); } } + } + internal void InternalCompletePendingRequest(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, AsyncIOContext request) + { if (opCtx.ioPendingRequests.TryGetValue(request.id, out PendingContext pendingContext)) { ref Key key = ref pendingContext.key.Get(); @@ -390,7 +342,7 @@ internal void InternalContinuePendingRequestAndCallback( // If done, callback user code if (status == Status.OK || status == Status.NOTFOUND) { - if (handleLatches) + if (pendingContext.heldLatch == LatchOperation.Shared) ReleaseSharedLatch(key); if (pendingContext.type == OperationType.READ) @@ -413,26 +365,10 @@ internal void InternalContinuePendingRequestAndCallback( } } - - internal (Status, Output) InternalCompleteIOPendingReadRequestsAsync( - FasterExecutionContext opCtx, - FasterExecutionContext currentCtx, - AsyncIOContext request, PendingContext pendingContext) + internal (Status, Output) InternalCompletePendingReadRequestAsync(FasterExecutionContext opCtx, FasterExecutionContext currentCtx, AsyncIOContext request, PendingContext pendingContext) { - bool handleLatches = false; (Status, Output) s = default; - if (!RelaxedCPR) - { - if ((opCtx.version < currentCtx.version) // Thread has already shifted to (v+1) - || - (currentCtx.phase == Phase.PREPARE)) // Thread still in version v, but acquired shared-latch - { - handleLatches = true; - } - } - - ref Key key = ref pendingContext.key.Get(); OperationStatus internalStatus = InternalContinuePendingRead(opCtx, request, ref pendingContext, currentCtx); @@ -450,11 +386,9 @@ internal void InternalContinuePendingRequestAndCallback( throw new Exception($"Unexpected {nameof(OperationStatus)} while reading => {internalStatus}"); } - - if (handleLatches) + if (pendingContext.heldLatch == LatchOperation.Shared) ReleaseSharedLatch(key); - functions.ReadCompletionCallback(ref key, ref pendingContext.input, ref pendingContext.output, @@ -463,12 +397,10 @@ internal void InternalContinuePendingRequestAndCallback( s.Item1 = status; s.Item2 = pendingContext.output; - pendingContext.Dispose(); - return s; } - + #endregion } }