diff --git a/cs/benchmark/Functions.cs b/cs/benchmark/Functions.cs index 3095d78e9..043295258 100644 --- a/cs/benchmark/Functions.cs +++ b/cs/benchmark/Functions.cs @@ -85,6 +85,8 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va newValue.value = input.value + oldValue.value; } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true; + public bool SupportsLocking => locking; public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) diff --git a/cs/remote/samples/FixedLenServer/Types.cs b/cs/remote/samples/FixedLenServer/Types.cs index a73fe64b0..fe890f964 100644 --- a/cs/remote/samples/FixedLenServer/Types.cs +++ b/cs/remote/samples/FixedLenServer/Types.cs @@ -119,6 +119,9 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va output.value = newValue; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true; + public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { } public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => true; diff --git a/cs/remote/src/FASTER.client/ClientSession.cs b/cs/remote/src/FASTER.client/ClientSession.cs index c6519f4b8..9e4d4ca63 100644 --- a/cs/remote/src/FASTER.client/ClientSession.cs +++ b/cs/remote/src/FASTER.client/ClientSession.cs @@ -255,7 +255,6 @@ public Status Publish(Key key, Value desiredValue, Context userContext = default /// SubscribeKV operation /// /// Key - /// Input /// User context /// Serial number /// Status of operation @@ -266,7 +265,6 @@ public void Subscribe(Key key, Context userContext = default, long serialNo = 0) /// PSubscribe operation /// /// Key - /// Input /// User context /// Serial number /// Status of operation diff --git a/cs/remote/src/FASTER.common/HeaderReaderWriter.cs b/cs/remote/src/FASTER.common/HeaderReaderWriter.cs index 0833fff4d..b0f05a12a 100644 --- a/cs/remote/src/FASTER.common/HeaderReaderWriter.cs +++ b/cs/remote/src/FASTER.common/HeaderReaderWriter.cs @@ -38,6 +38,13 @@ public unsafe bool Write(MessageType s, ref byte* dst, int length) return true; } + /// + /// Write serial number to memory + /// + /// Message type + /// Destination memory + /// Length of destination + /// Whether write succeeded [MethodImpl(MethodImplOptions.AggressiveInlining)] public unsafe bool Write(long seqNum, ref byte* dst, int length) { @@ -58,6 +65,11 @@ public unsafe MessageType ReadMessageType(ref byte* dst) return (MessageType)(*dst++); } + /// + /// Read serial number + /// + /// Source memory + /// Message type [MethodImpl(MethodImplOptions.AggressiveInlining)] public unsafe long ReadSerialNum(ref byte* dst) { diff --git a/cs/remote/src/FASTER.server/ServerKVFunctions.cs b/cs/remote/src/FASTER.server/ServerKVFunctions.cs index 27387a061..a6067e8de 100644 --- a/cs/remote/src/FASTER.server/ServerKVFunctions.cs +++ b/cs/remote/src/FASTER.server/ServerKVFunctions.cs @@ -40,6 +40,9 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address); + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) + => functions.PostCopyUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); + public void DeleteCompletionCallback(ref Key key, long ctx) => functions.DeleteCompletionCallback(ref key, ctx); diff --git a/cs/src/core/ClientSession/AdvancedClientSession.cs b/cs/src/core/ClientSession/AdvancedClientSession.cs index 254772fd3..f20f7de51 100644 --- a/cs/src/core/ClientSession/AdvancedClientSession.cs +++ b/cs/src/core/ClientSession/AdvancedClientSession.cs @@ -1034,6 +1034,33 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) + => !this.SupportsLocking + ? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address) + : PostCopyUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) + { + recordInfo.Version = _clientSession.ctx.version; + return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); + } + + private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) + { + long context = 0; + this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); + try + { + // KeyIndexes do not need notification of in-place updates because the key does not change. + return !recordInfo.Tombstone && PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address); + } + finally + { + this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context); + } + } public void DeleteCompletionCallback(ref Key key, Context ctx) => _clientSession.functions.DeleteCompletionCallback(ref key, ctx); diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index de351ae9d..38e1bdcad 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -1112,11 +1112,41 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) => _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) + => !this.SupportsLocking + ? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address) + : PostCopyUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) + { + recordInfo.Version = _clientSession.ctx.version; + return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref value, ref output); + } + + private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) + { + long context = 0; + this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); + try + { + // KeyIndexes do not need notification of in-place updates because the key does not change. + return !recordInfo.Tombstone && PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address); + } + finally + { + this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context); + } + } + public void DeleteCompletionCallback(ref Key key, Context ctx) => _clientSession.functions.DeleteCompletionCallback(ref key, ctx); - public int GetInitialLength(ref Input input) - => _clientSession.variableLengthStruct.GetInitialLength(ref input); + public int GetInitialLength(ref Input input) + { + return _clientSession.variableLengthStruct.GetInitialLength(ref input); + } public int GetLength(ref Value t, ref Input input) => _clientSession.variableLengthStruct.GetLength(ref t, ref input); diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index ee1a86538..c78091f49 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -973,15 +973,22 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), foundEntry.word = Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word); if (foundEntry.word == entry.word) { - pendingContext.logicalAddress = newLogicalAddress; + // If IU, return notfound. Else (CU), call PCU. If PCU is true, return success. Else retry op. + if (status != OperationStatus.SUCCESS || + fasterSession.PostCopyUpdater(ref key, ref input, + ref hlog.GetValue(newPhysicalAddress), + ref output, ref hlog.GetInfo(physicalAddress), newLogicalAddress)) + { + pendingContext.logicalAddress = newLogicalAddress; + return status; + } } else { // CAS failed hlog.GetInfo(newPhysicalAddress).Invalid = true; - status = OperationStatus.RETRY_NOW; } - + status = OperationStatus.RETRY_NOW; return status; } @@ -1552,16 +1559,26 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), updatedEntry.Tentative = false; var foundEntry = default(HashBucketEntry); - foundEntry.word = Interlocked.CompareExchange( - ref bucket->bucket_entries[slot], - updatedEntry.word, entry.word); - + foundEntry.word = Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word); if (foundEntry.word == entry.word) - return status; - - // CAS failed. Fall through to call InternalRMW again to restart the sequence and return that status. - hlog.GetInfo(newPhysicalAddress).Invalid = true; -#endregion + { + // If IU, return notfound. Else (CU), call PCU. If PCU is true, return success. Else retry op. + if (status != OperationStatus.SUCCESS || + fasterSession.PostCopyUpdater(ref key, + ref pendingContext.input.Get(), + ref hlog.GetValue(newPhysicalAddress), + ref pendingContext.output, ref hlog.GetInfo(newPhysicalAddress), newLogicalAddress)) + { + pendingContext.logicalAddress = newLogicalAddress; + return status; + } + } + else + { + // CAS failed. Fall through to call InternalRMW again to restart the sequence and return that status. + hlog.GetInfo(newPhysicalAddress).Invalid = true; + } + #endregion } OperationStatus internalStatus; diff --git a/cs/src/core/Index/FASTER/FASTERLegacy.cs b/cs/src/core/Index/FASTER/FASTERLegacy.cs index e51ae0c80..073cbcb73 100644 --- a/cs/src/core/Index/FASTER/FASTERLegacy.cs +++ b/cs/src/core/Index/FASTER/FASTERLegacy.cs @@ -345,6 +345,11 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va _fasterKV._functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output); } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) + { + return true; + } + public void DeleteCompletionCallback(ref Key key, Context ctx) { _fasterKV._functions.DeleteCompletionCallback(ref key, ctx); diff --git a/cs/src/core/Index/FASTER/LogCompactionFunctions.cs b/cs/src/core/Index/FASTER/LogCompactionFunctions.cs index 30652f8ef..ff1a0494f 100644 --- a/cs/src/core/Index/FASTER/LogCompactionFunctions.cs +++ b/cs/src/core/Index/FASTER/LogCompactionFunctions.cs @@ -28,6 +28,7 @@ public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoi public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) => true; public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { } + public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true; public void DeleteCompletionCallback(ref Key key, Context ctx) { } diff --git a/cs/src/core/Index/Interfaces/FunctionsBase.cs b/cs/src/core/Index/Interfaces/FunctionsBase.cs index 35a091780..e9264b098 100644 --- a/cs/src/core/Index/Interfaces/FunctionsBase.cs +++ b/cs/src/core/Index/Interfaces/FunctionsBase.cs @@ -39,6 +39,9 @@ public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true; /// public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { } + /// + public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true; + /// public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true; @@ -160,6 +163,8 @@ public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value /// public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) { } /// + public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true; + /// public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true; /// diff --git a/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs b/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs index d547ede1a..014def227 100644 --- a/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs +++ b/cs/src/core/Index/Interfaces/IAdvancedFunctions.cs @@ -102,6 +102,18 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output /// The logical address of the record being updated; used as a RecordId by indexing void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address); + /// + /// Called after a copy-update for RMW is successfully installed + /// + /// The key for this record + /// The user input to be used for computing the updated + /// The destination to be updated; because this is an in-place update, there is a previous value there. + /// The location where the result of the operation on is to be copied + /// A reference to the header of the record; may be used by + /// The logical address of the record being updated; used as a RecordId by indexing + /// True if the value was successful updated, else false (e.g. the value was expired) + bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address); + /// /// In-place update for RMW /// diff --git a/cs/src/core/Index/Interfaces/IFunctions.cs b/cs/src/core/Index/Interfaces/IFunctions.cs index c2122783c..581bfa46e 100644 --- a/cs/src/core/Index/Interfaces/IFunctions.cs +++ b/cs/src/core/Index/Interfaces/IFunctions.cs @@ -99,6 +99,16 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output /// The location where is to be copied void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output); + /// + /// Called after a copy-update for RMW is successfully installed + /// + /// The key for this record + /// The user input to be used for computing the updated + /// The destination to be updated; because this is an in-place update, there is a previous value there. + /// The location where the result of the operation on is to be copied + /// True if the value was successful updated, else false (e.g. the value was expired) + bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output); + /// /// In-place update for RMW /// diff --git a/cs/test/CompletePendingTests.cs b/cs/test/CompletePendingTests.cs index f18d3c5e4..25655b23a 100644 --- a/cs/test/CompletePendingTests.cs +++ b/cs/test/CompletePendingTests.cs @@ -98,7 +98,8 @@ internal void Process(CompletedOutputIterator