Skip to content

Commit

Permalink
Pass client serial numbers to remote FASTER instance (#528)
Browse files Browse the repository at this point in the history
Co-authored-by: Tianyu Li <t-litianyu@microsoft.com>
  • Loading branch information
tli2 and Tianyu Li authored Jul 27, 2021
1 parent f29432b commit 9fb08a2
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 35 deletions.
66 changes: 35 additions & 31 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,15 @@ private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Inp
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, output, userContext));
return Status.PENDING;
}
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, output, userContext));
return Status.PENDING;
}
Flush();
}
}
Expand All @@ -564,14 +565,15 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref desiredValue, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
upsertQueue.Enqueue((key, desiredValue, userContext));
return Status.PENDING;
}
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref desiredValue, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
upsertQueue.Enqueue((key, desiredValue, userContext));
return Status.PENDING;
}
Flush();
}
}
Expand All @@ -584,14 +586,15 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, output, userContext));
return Status.PENDING;
}
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
if (serializer.Write(ref input, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
readrmwQueue.Enqueue((key, input, output, userContext));
return Status.PENDING;
}
Flush();
}
}
Expand All @@ -604,13 +607,14 @@ private unsafe Status InternalDelete(MessageType messageType, ref Key key, Conte
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
upsertQueue.Enqueue((key, default, userContext));
return Status.PENDING;
}
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
if (serializer.Write(ref key, ref curr, (int)(end - curr)))
{
numMessages++;
offset = curr;
upsertQueue.Enqueue((key, default, userContext));
return Status.PENDING;
}
Flush();
}
}
Expand Down
17 changes: 17 additions & 0 deletions cs/remote/src/FASTER.common/HeaderReaderWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public unsafe bool Write(MessageType s, ref byte* dst, int length)
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe bool Write(long seqNum, ref byte* dst, int length)
{
if (length < sizeof(long)) return false;
*(long*) dst = seqNum;
dst += sizeof(long);
return true;
}

/// <summary>
/// Read message type
/// </summary>
Expand All @@ -48,5 +57,13 @@ public unsafe MessageType ReadMessageType(ref byte* dst)
{
return (MessageType)(*dst++);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe long ReadSerialNum(ref byte* dst)
{
var result = *(long*) dst;
dst += sizeof(long);
return result;
}
}
}
9 changes: 5 additions & 4 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ private unsafe void ProcessBatch(byte[] buf, int offset)
for (msgnum = 0; msgnum < num; msgnum++)
{
var message = (MessageType)(*src++);
var serialNum = hrw.ReadSerialNum(ref src);
switch (message)
{
case MessageType.Upsert:
case MessageType.UpsertAsync:
if ((int)(dend - dcurr) < 2)
SendAndReset(ref d, ref dend);

status = session.Upsert(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadValueByRef(ref src));
status = session.Upsert(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadValueByRef(ref src), serialNo: serialNum);
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
break;
Expand All @@ -145,7 +146,7 @@ private unsafe void ProcessBatch(byte[] buf, int offset)

long ctx = ((long)message << 32) | (long)pendingSeqNo;
status = session.Read(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, 0);
ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, serialNum);

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
Expand All @@ -163,7 +164,7 @@ private unsafe void ProcessBatch(byte[] buf, int offset)

ctx = ((long)message << 32) | (long)pendingSeqNo;
status = session.RMW(ref serializer.ReadKeyByRef(ref src), ref serializer.ReadInputByRef(ref src),
ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx);
ref serializer.AsRefOutput(dcurr + 2, (int)(dend - dcurr)), ctx, serialNum);

hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
Expand All @@ -178,7 +179,7 @@ private unsafe void ProcessBatch(byte[] buf, int offset)
if ((int)(dend - dcurr) < 2)
SendAndReset(ref d, ref dend);

status = session.Delete(ref serializer.ReadKeyByRef(ref src));
status = session.Delete(ref serializer.ReadKeyByRef(ref src), serialNo: serialNum);
hrw.Write(message, ref dcurr, (int)(dend - dcurr));
Write(ref status, ref dcurr, (int)(dend - dcurr));
break;
Expand Down

0 comments on commit 9fb08a2

Please sign in to comment.