Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Added PostCopyUpdater to IAdvancedFunctions #561

Merged
merged 27 commits into from
Oct 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2ef10bd
Add support for record-expiration management to RMW and I(Advanced)Fu…
TedHartMS Aug 18, 2021
a3eee17
Merge remote-tracking branch 'origin/master' into rmw-expiration
TedHartMS Aug 18, 2021
1800057
Update Remote to RMW changes for expiration support
TedHartMS Aug 19, 2021
58dd528
Merge remote-tracking branch 'origin/master' into rmw-expiration
TedHartMS Aug 20, 2021
0cb12ed
Merge branch 'master' into rmw-expiration
badrishc Aug 21, 2021
bfa0f4f
Merge branch 'master' into rmw-expiration
badrishc Aug 31, 2021
c152046
Update SpanByteFunctionsForServer.cs
badrishc Aug 31, 2021
4aa407a
Update SpanByteFunctionsForServer.cs
badrishc Aug 31, 2021
12f750d
Merge branch 'master' into rmw-expiration
badrishc Sep 2, 2021
58cb9cd
Merge remote-tracking branch 'origin/master' into rmw-expiration
TedHartMS Sep 10, 2021
ec4f87b
Merge remote-tracking branch 'origin/master' into rmw-expiration
TedHartMS Sep 13, 2021
4ae75e7
Update ServerKVFunctions.cs
badrishc Sep 14, 2021
a8fdde9
Merge branch 'master' into rmw-expiration
badrishc Sep 14, 2021
fba4b4b
Added SpanByteAdvancedFunctions
badrishc Sep 14, 2021
540f9b6
moving remote to IAdvFun
badrishc Sep 14, 2021
6cf1d91
LogDir semantics - minor update
badrishc Sep 15, 2021
36e4bf4
Merge branch 'master' into rmw-expiration
badrishc Sep 15, 2021
0e78fb8
Added PCU
badrishc Sep 18, 2021
faf21cf
fixed pcu logic
badrishc Sep 18, 2021
958d9bd
Fixing testcase so that RMW does not expect old address.
badrishc Sep 20, 2021
3e9b9d5
Fix PCU bugs
badrishc Sep 21, 2021
34952b5
Merge remote-tracking branch 'origin/master' into rmw-expiration
TedHartMS Sep 27, 2021
2119726
Merge remote-tracking branch 'origin/master' into functions-pcu
TedHartMS Sep 27, 2021
51d3f0f
merge v2
TedHartMS Oct 3, 2021
a2d3dfc
Extend PostCopyUpdate to IFunctions; fix a few comments in remote
TedHartMS Oct 3, 2021
3d0099a
merge v2 via rmw-expiration
TedHartMS Oct 3, 2021
43b682a
merge v2
TedHartMS Oct 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cs/benchmark/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
newValue.value = input.value + oldValue.value;
}

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

public bool SupportsLocking => locking;

public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext)
Expand Down
3 changes: 3 additions & 0 deletions cs/remote/samples/FixedLenServer/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
output.value = newValue;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true;

public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { }

public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => true;
Expand Down
2 changes: 0 additions & 2 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ public Status Publish(Key key, Value desiredValue, Context userContext = default
/// SubscribeKV operation
/// </summary>
/// <param name="key">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
Expand All @@ -266,7 +265,6 @@ public void Subscribe(Key key, Context userContext = default, long serialNo = 0)
/// PSubscribe operation
/// </summary>
/// <param name="prefix">Key</param>
/// <param name="input">Input</param>
/// <param name="userContext">User context</param>
/// <param name="serialNo">Serial number</param>
/// <returns>Status of operation</returns>
Expand Down
12 changes: 12 additions & 0 deletions cs/remote/src/FASTER.common/HeaderReaderWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public unsafe bool Write(MessageType s, ref byte* dst, int length)
return true;
}

/// <summary>
/// Write serial number to memory
/// </summary>
/// <param name="seqNum">Message type</param>
/// <param name="dst">Destination memory</param>
/// <param name="length">Length of destination</param>
/// <returns>Whether write succeeded</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe bool Write(long seqNum, ref byte* dst, int length)
{
Expand All @@ -58,6 +65,11 @@ public unsafe MessageType ReadMessageType(ref byte* dst)
return (MessageType)(*dst++);
}

/// <summary>
/// Read serial number
/// </summary>
/// <param name="dst">Source memory</param>
/// <returns>Message type</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe long ReadSerialNum(ref byte* dst)
{
Expand Down
3 changes: 3 additions & 0 deletions cs/remote/src/FASTER.server/ServerKVFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address)
=> functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address);

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> functions.PostCopyUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address);

public void DeleteCompletionCallback(ref Key key, long ctx)
=> functions.DeleteCompletionCallback(ref key, ctx);

Expand Down
27 changes: 27 additions & 0 deletions cs/src/core/ClientSession/AdvancedClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,33 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address)
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output, ref recordInfo, address);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> !this.SupportsLocking
? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address)
: PostCopyUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
recordInfo.Version = _clientSession.ctx.version;
return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address);
}

private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
long context = 0;
this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context);
try
{
// KeyIndexes do not need notification of in-place updates because the key does not change.
return !recordInfo.Tombstone && PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address);
}
finally
{
this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context);
}
}
public void DeleteCompletionCallback(ref Key key, Context ctx)
=> _clientSession.functions.DeleteCompletionCallback(ref key, ctx);

Expand Down
34 changes: 32 additions & 2 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,11 +1112,41 @@ public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref
public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address)
=> _clientSession.functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> !this.SupportsLocking
? PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address)
: PostCopyUpdaterLock(ref key, ref input, ref output, ref value, ref recordInfo, address);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool PostCopyUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
recordInfo.Version = _clientSession.ctx.version;
return _clientSession.functions.PostCopyUpdater(ref key, ref input, ref value, ref output);
}

private bool PostCopyUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address)
{
long context = 0;
this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context);
try
{
// KeyIndexes do not need notification of in-place updates because the key does not change.
return !recordInfo.Tombstone && PostCopyUpdaterNoLock(ref key, ref input, ref output, ref value, ref recordInfo, address);
}
finally
{
this.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context);
}
}

public void DeleteCompletionCallback(ref Key key, Context ctx)
=> _clientSession.functions.DeleteCompletionCallback(ref key, ctx);

public int GetInitialLength(ref Input input)
=> _clientSession.variableLengthStruct.GetInitialLength(ref input);
public int GetInitialLength(ref Input input)
{
return _clientSession.variableLengthStruct.GetInitialLength(ref input);
}

public int GetLength(ref Value t, ref Input input)
=> _clientSession.variableLengthStruct.GetLength(ref t, ref input);
Expand Down
41 changes: 29 additions & 12 deletions cs/src/core/Index/FASTER/FASTERImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -973,15 +973,22 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
foundEntry.word = Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word);
if (foundEntry.word == entry.word)
{
pendingContext.logicalAddress = newLogicalAddress;
// If IU, return notfound. Else (CU), call PCU. If PCU is true, return success. Else retry op.
if (status != OperationStatus.SUCCESS ||
fasterSession.PostCopyUpdater(ref key, ref input,
ref hlog.GetValue(newPhysicalAddress),
ref output, ref hlog.GetInfo(physicalAddress), newLogicalAddress))
{
pendingContext.logicalAddress = newLogicalAddress;
return status;
}
}
else
{
// CAS failed
hlog.GetInfo(newPhysicalAddress).Invalid = true;
status = OperationStatus.RETRY_NOW;
}

status = OperationStatus.RETRY_NOW;
return status;
}

Expand Down Expand Up @@ -1552,16 +1559,26 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
updatedEntry.Tentative = false;

var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word, entry.word);

foundEntry.word = Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word);
if (foundEntry.word == entry.word)
return status;

// CAS failed. Fall through to call InternalRMW again to restart the sequence and return that status.
hlog.GetInfo(newPhysicalAddress).Invalid = true;
#endregion
{
// If IU, return notfound. Else (CU), call PCU. If PCU is true, return success. Else retry op.
if (status != OperationStatus.SUCCESS ||
fasterSession.PostCopyUpdater(ref key,
ref pendingContext.input.Get(),
ref hlog.GetValue(newPhysicalAddress),
ref pendingContext.output, ref hlog.GetInfo(newPhysicalAddress), newLogicalAddress))
{
pendingContext.logicalAddress = newLogicalAddress;
return status;
}
}
else
{
// CAS failed. Fall through to call InternalRMW again to restart the sequence and return that status.
hlog.GetInfo(newPhysicalAddress).Invalid = true;
}
#endregion
}

OperationStatus internalStatus;
Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/FASTER/FASTERLegacy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
_fasterKV._functions.CopyUpdater(ref key, ref input, ref oldValue, ref newValue, ref output);
}

public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
{
return true;
}

public void DeleteCompletionCallback(ref Key key, Context ctx)
{
_fasterKV._functions.DeleteCompletionCallback(ref key, ctx);
Expand Down
1 change: 1 addition & 0 deletions cs/src/core/Index/FASTER/LogCompactionFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoi
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst) => true;

public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { }
public bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

public void DeleteCompletionCallback(ref Key key, Context ctx) { }

Expand Down
5 changes: 5 additions & 0 deletions cs/src/core/Index/Interfaces/FunctionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value
public virtual bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output) => true;
/// <inheritdoc/>
public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output) { }
/// <inheritdoc/>
public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

/// <inheritdoc/>
public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output) => true;

Expand Down Expand Up @@ -160,6 +163,8 @@ public virtual void InitialUpdater(ref Key key, ref Input input, ref Value value
/// <inheritdoc/>
public virtual void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address) { }
/// <inheritdoc/>
public virtual bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true;
/// <inheritdoc/>
public virtual bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address) => true;

/// <inheritdoc/>
Expand Down
12 changes: 12 additions & 0 deletions cs/src/core/Index/Interfaces/IAdvancedFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output
/// <param name="address">The logical address of the record being updated; used as a RecordId by indexing</param>
void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output, ref RecordInfo recordInfo, long address);

/// <summary>
/// Called after a copy-update for RMW is successfully installed
/// </summary>
/// <param name="key">The key for this record</param>
/// <param name="input">The user input to be used for computing the updated <paramref name="value"/></param>
/// <param name="value">The destination to be updated; because this is an in-place update, there is a previous value there.</param>
/// <param name="output">The location where the result of the <paramref name="input"/> operation on <paramref name="value"/> is to be copied</param>
/// <param name="recordInfo">A reference to the header of the record; may be used by <see cref="Lock(ref RecordInfo, ref Key, ref Value, LockType, ref long)"/></param>
/// <param name="address">The logical address of the record being updated; used as a RecordId by indexing</param>
/// <returns>True if the value was successful updated, else false (e.g. the value was expired)</returns>
bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address);

/// <summary>
/// In-place update for RMW
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Index/Interfaces/IFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output
/// <param name="output">The location where <paramref name="newValue"/> is to be copied</param>
void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Value newValue, ref Output output);

/// <summary>
/// Called after a copy-update for RMW is successfully installed
/// </summary>
/// <param name="key">The key for this record</param>
/// <param name="input">The user input to be used for computing the updated <paramref name="value"/></param>
/// <param name="value">The destination to be updated; because this is an in-place update, there is a previous value there.</param>
/// <param name="output">The location where the result of the <paramref name="input"/> operation on <paramref name="value"/> is to be copied</param>
/// <returns>True if the value was successful updated, else false (e.g. the value was expired)</returns>
bool PostCopyUpdater(ref Key key, ref Input input, ref Value value, ref Output output);

/// <summary>
/// In-place update for RMW
/// </summary>
Expand Down
3 changes: 2 additions & 1 deletion cs/test/CompletePendingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ internal void Process(CompletedOutputIterator<KeyStruct, ValueStruct, InputStruc
{
ref var result = ref completedOutputs.Current;
VerifyStructs((int)result.Key.kfield1, ref result.Key, ref result.Input, ref result.Output, ref result.Context, useRMW);
Assert.AreEqual(keyAddressDict[(int)result.Key.kfield1], result.Address);
if (!useRMW)
Assert.AreEqual(keyAddressDict[(int)result.Key.kfield1], result.Address);
}
completedOutputs.Dispose();
Assert.AreEqual(deferredPending + 1, count);
Expand Down