Skip to content

Commit

Permalink
[C#] Added PostCopyUpdater to IAdvancedFunctions (#561)
Browse files Browse the repository at this point in the history
* Add support for record-expiration management to RMW and I(Advanced)Functions

* Update Remote to RMW changes for expiration support

* Update SpanByteFunctionsForServer.cs

* Update SpanByteFunctionsForServer.cs

* Update ServerKVFunctions.cs

* Added SpanByteAdvancedFunctions

* moving remote to IAdvFun

* LogDir semantics - minor update

* Added PCU

* fixed pcu logic

* Fixing testcase so that RMW does not expect old address.

* Fix PCU bugs

* Extend PostCopyUpdate to IFunctions; fix a few comments in remote

Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
  • Loading branch information
badrishc and TedHartMS authored Oct 3, 2021
1 parent 2e93d9b commit a80b077
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
newValue.value = input.value + oldValue.value;
}

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

public bool SupportsLocking => locking;

public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext)
Expand Down
3 changes: 3 additions & 0 deletions cs/remote/samples/FixedLenServer/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
output.value = newValue;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true;

public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { }

public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => true;
Expand Down
2 changes: 0 additions & 2 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ public Status Publish(Key key, Value desiredValue, Context userContext = default
/// SubscribeKV operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
Expand All @@ -266,7 +265,6 @@ public void Subscribe(Key key, Context userContext = default, long serialNo = 0)
/// PSubscribe operation
/// </summary>
/// <param name="prefix">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
Expand Down
12 changes: 12 additions & 0 deletions cs/remote/src/FASTER.common/HeaderReaderWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public unsafe bool Write(MessageType s, ref byte* dst, int length)
return true;
}

/// <summary>
/// Write serial number to memory
/// </summary>
/// <param name="seqNum">Message type</param>
/// <param name="dst">Destination memory</param>
/// <param name="length">Length of destination</param>
/// <returns>Whether write succeeded</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe bool Write(long seqNum, ref byte* dst, int length)
{
Expand All @@ -58,6 +65,11 @@ public unsafe MessageType ReadMessageType(ref byte* dst)
return (MessageType)(*dst++);
}

/// <summary>
/// Read serial number
/// </summary>
/// <param name="dst">Source memory</param>
/// <returns>Message type</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe long ReadSerialNum(ref byte* dst)
{
Expand Down
3 changes: 3 additions & 0 deletions cs/remote/src/FASTER.server/ServerKVFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address)
=> functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address);

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> functions.PostCopyUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address);

public void DeleteCompletionCallback(ref Key key, long ctx)
=> functions.DeleteCompletionCallback(ref key, ctx);

Expand Down
27 changes: 27 additions & 0 deletions cs/src/core/ClientSession/AdvancedClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,33 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address)
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> !this.SupportsLocking
? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address)
: PostCopyUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
recordInfo.Version = _clientSession.ctx.version;
return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address);
}

private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
long context = 0;
this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context);
try
{
// KeyIndexes do not need notification of in-place updates because the key does not change.
return !recordInfo.Tombstone && PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address);
}
finally
{
this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context);
}
}
public void DeleteCompletionCallback(ref Key key, Context ctx)
=> _clientSession.functions.DeleteCompletionCallback(ref key, ctx);

Expand Down
34 changes: 32 additions & 2 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,11 +1112,41 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address)
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> !this.SupportsLocking
? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address)
: PostCopyUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
recordInfo.Version = _clientSession.ctx.version;
return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref value, ref output);
}

private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
long context = 0;
this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context);
try
{
// KeyIndexes do not need notification of in-place updates because the key does not change.
return !recordInfo.Tombstone && PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address);
}
finally
{
this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context);
}
}

public void DeleteCompletionCallback(ref Key key, Context ctx)
=> _clientSession.functions.DeleteCompletionCallback(ref key, ctx);

public int GetInitialLength(ref Input input)
=> _clientSession.variableLengthStruct.GetInitialLength(ref input);
public int GetInitialLength(ref Input input)
{
return _clientSession.variableLengthStruct.GetInitialLength(ref input);
}

public int GetLength(ref Value t, ref Input input)
=> _clientSession.variableLengthStruct.GetLength(ref t, ref input);
Expand Down
41 changes: 29 additions & 12 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -973,15 +973,22 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
foundEntry.word = Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word);
if (foundEntry.word == entry.word)
{
pendingContext.logicalAddress = newLogicalAddress;
// If IU, return notfound. Else (CU), call PCU. If PCU is true, return success. Else retry op.
if (status != OperationStatus.SUCCESS ||
fasterSession.PostCopyUpdater(ref key, ref input,
ref hlog.GetValue(newPhysicalAddress),
ref output, ref hlog.GetInfo(physicalAddress), newLogicalAddress))
{
pendingContext.logicalAddress = newLogicalAddress;
return status;
}
}
else
{
// CAS failed
hlog.GetInfo(newPhysicalAddress).Invalid = true;
status = OperationStatus.RETRY_NOW;
}

status = OperationStatus.RETRY_NOW;
return status;
}

Expand Down Expand Up @@ -1552,16 +1559,26 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
updatedEntry.Tentative = false;

var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word, entry.word);

foundEntry.word = Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word);
if (foundEntry.word == entry.word)
return status;

// CAS failed. Fall through to call InternalRMW again to restart the sequence and return that status.
hlog.GetInfo(newPhysicalAddress).Invalid = true;
#endregion
{
// If IU, return notfound. Else (CU), call PCU. If PCU is true, return success. Else retry op.
if (status != OperationStatus.SUCCESS ||
fasterSession.PostCopyUpdater(ref key,
ref pendingContext.input.Get(),
ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref hlog.GetInfo(newPhysicalAddress), newLogicalAddress))
{
pendingContext.logicalAddress = newLogicalAddress;
return status;
}
}
else
{
// CAS failed. Fall through to call InternalRMW again to restart the sequence and return that status.
hlog.GetInfo(newPhysicalAddress).Invalid = true;
}
#endregion
}

OperationStatus internalStatus;
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
_fasterKV._functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output);
}

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
{
return true;
}

public void DeleteCompletionCallback(ref Key key, Context ctx)
{
_fasterKV._functions.DeleteCompletionCallback(ref key, ctx);
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/FASTER/LogCompactionFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoi
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) => true;

public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { }
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

public void DeleteCompletionCallback(ref Key key, Context ctx) { }

Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/Interfaces/FunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value
public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true;
/// <inheritdoc/>
public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { }
/// <inheritdoc/>
public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

/// <inheritdoc/>
public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

Expand Down Expand Up @@ -160,6 +163,8 @@ public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value
/// <inheritdoc/>
public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) { }
/// <inheritdoc/>
public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true;
/// <inheritdoc/>
public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true;

/// <inheritdoc/>
Expand Down
12 changes: 12 additions & 0 deletions cs/src/core/Index/Interfaces/IAdvancedFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output
/// <param name="address">The logical address of the record being updated; used as a RecordId by indexing</param>
void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address);

/// <summary>
/// Called after a copy-update for RMW is successfully installed
/// </summary>
/// <param name="key">The key for this record</param>
/// <param name="input">The user input to be used for computing the updated <paramref name="value"/></param>
/// <param name="value">The destination to be updated; because this is an in-place update, there is a previous value there.</param>
/// <param name="output">The location where the result of the <paramref name="input"/> operation on <paramref name="value"/> is to be copied</param>
/// <param name="recordInfo">A reference to the header of the record; may be used by <see cref="Lock(ref RecordInfo, ref Key, ref Value, LockType, ref long)"/></param>
/// <param name="address">The logical address of the record being updated; used as a RecordId by indexing</param>
/// <returns>True if the value was successful updated, else false (e.g. the value was expired)</returns>
bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address);

/// <summary>
/// In-place update for RMW
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Index/Interfaces/IFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output
/// <param name="output">The location where <paramref name="newValue"/> is to be copied</param>
void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output);

/// <summary>
/// Called after a copy-update for RMW is successfully installed
/// </summary>
/// <param name="key">The key for this record</param>
/// <param name="input">The user input to be used for computing the updated <paramref name="value"/></param>
/// <param name="value">The destination to be updated; because this is an in-place update, there is a previous value there.</param>
/// <param name="output">The location where the result of the <paramref name="input"/> operation on <paramref name="value"/> is to be copied</param>
/// <returns>True if the value was successful updated, else false (e.g. the value was expired)</returns>
bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output);

/// <summary>
/// In-place update for RMW
/// </summary>
Expand Down
3 changes: 2 additions & 1 deletion cs/test/CompletePendingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ internal void Process(CompletedOutputIterator<KeyStruct, ValueStruct, InputStruc
{
ref var result = ref completedOutputs.Current;
VerifyStructs((int)result.Key.kfield1, ref result.Key, ref result.Input, ref result.Output, ref result.Context, useRMW);
Assert.AreEqual(keyAddressDict[(int)result.Key.kfield1], result.Address);
if (!useRMW)
Assert.AreEqual(keyAddressDict[(int)result.Key.kfield1], result.Address);
}
completedOutputs.Dispose();
Assert.AreEqual(deferredPending + 1, count);
Expand Down

0 comments on commit a80b077

Please sign in to comment.