Skip to content

Commit

Permalink
Remove ReusableObject in FASTER remote (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored Oct 15, 2021
1 parent 28f6034 commit e11b6fb
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 114 deletions.
31 changes: 16 additions & 15 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public unsafe sealed partial class ClientSession<Key, Value, Input, Output, Cont
private bool subscriptionSession;

bool disposed;
ReusableObject<SeaaBuffer> sendObject;
SeaaBuffer sendObject;
byte* offset;
int numMessages;
int numPendingBatches;
Expand Down Expand Up @@ -75,7 +75,7 @@ public ClientSession(string address, int port, Functions functions, WireFormat w

numPendingBatches = 0;
sendObject = messageManager.GetReusableSeaaBuffer();
offset = sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size;
offset = sendObject.bufferPtr + sizeof(int) + BatchHeader.Size;
numMessages = 0;
sendSocket = GetSendSocket(address, port);
}
Expand Down Expand Up @@ -278,15 +278,15 @@ public void PSubscribe(Key prefix, Context userContext = default, long serialNo
/// </summary>
public void Flush()
{
if (offset > sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size)
if (offset > sendObject.bufferPtr + sizeof(int) + BatchHeader.Size)
{
int payloadSize = (int)(offset - sendObject.obj.bufferPtr);
int payloadSize = (int)(offset - sendObject.bufferPtr);

((BatchHeader*)(sendObject.obj.bufferPtr + sizeof(int)))->SetNumMessagesProtocol(numMessages, wireFormat);
((BatchHeader*)(sendObject.bufferPtr + sizeof(int)))->SetNumMessagesProtocol(numMessages, wireFormat);
Interlocked.Increment(ref numPendingBatches);

// Set packet size in header
*(int*)sendObject.obj.bufferPtr = -(payloadSize - sizeof(int));
*(int*)sendObject.bufferPtr = -(payloadSize - sizeof(int));

try
{
Expand All @@ -298,7 +298,7 @@ public void Flush()
throw;
}
sendObject = messageManager.GetReusableSeaaBuffer();
offset = sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size;
offset = sendObject.bufferPtr + sizeof(int) + BatchHeader.Size;
numMessages = 0;
}
}
Expand All @@ -322,7 +322,8 @@ public void CompletePending(bool wait = true)
public void Dispose()
{
disposed = true;
sendObject.Dispose();
if (sendObject != null)
messageManager.Return(sendObject);
sendSocket.Dispose();
messageManager.Dispose();
}
Expand Down Expand Up @@ -725,7 +726,7 @@ private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Inp

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -748,7 +749,7 @@ private unsafe Status InternalSubscribeKV(MessageType messageType, ref Key key,

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -771,7 +772,7 @@ private unsafe Status InternalPublish(MessageType messageType, ref Key key, ref

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -794,7 +795,7 @@ private unsafe Status InternalSubscribe(MessageType messageType, ref Key key, Co

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -816,7 +817,7 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -839,7 +840,7 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -862,7 +863,7 @@ private unsafe Status InternalDelete(MessageType messageType, ref Key key, Conte

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand Down
20 changes: 13 additions & 7 deletions cs/remote/src/FASTER.common/NetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Dispose()
/// Get reusable SocketAsyncEventArgs buffer
/// </summary>
/// <returns></returns>
public ReusableObject<SeaaBuffer> GetReusableSeaaBuffer() => reusableSeaaBuffer.Checkout();
public SeaaBuffer GetReusableSeaaBuffer() => reusableSeaaBuffer.Checkout();

/// <summary>
/// Send
Expand All @@ -44,21 +44,27 @@ public void Dispose()
/// <param name="sendObject">Reusable SocketAsyncEventArgs buffer</param>
/// <param name="offset">Offset</param>
/// <param name="size">Size in bytes</param>
public unsafe void Send(Socket socket, ReusableObject<SeaaBuffer> sendObject, int offset, int size)
public unsafe void Send(Socket socket, SeaaBuffer sendObject, int offset, int size)
{
// Reset send buffer
sendObject.obj.socketEventAsyncArgs.SetBuffer(offset, size);
sendObject.socketEventAsyncArgs.SetBuffer(offset, size);
// Set user context to reusable object handle for disposal when send is done
sendObject.obj.socketEventAsyncArgs.UserToken = sendObject;
if (!socket.SendAsync(sendObject.obj.socketEventAsyncArgs))
SeaaBuffer_Completed(null, sendObject.obj.socketEventAsyncArgs);
sendObject.socketEventAsyncArgs.UserToken = sendObject;
if (!socket.SendAsync(sendObject.socketEventAsyncArgs))
SeaaBuffer_Completed(null, sendObject.socketEventAsyncArgs);
}

private void SeaaBuffer_Completed(object sender, SocketAsyncEventArgs e)
{
((ReusableObject<SeaaBuffer>)e.UserToken).Dispose();
reusableSeaaBuffer.Return((SeaaBuffer)e.UserToken);
}

/// <summary>
/// Return to pool
/// </summary>
/// <param name="obj"></param>
public void Return(SeaaBuffer obj) => reusableSeaaBuffer.Return(obj);

/// <summary>
/// Receive
/// </summary>
Expand Down
39 changes: 0 additions & 39 deletions cs/remote/src/FASTER.common/ReusableObject.cs

This file was deleted.

26 changes: 14 additions & 12 deletions cs/remote/src/FASTER.common/SimpleObjectPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ internal class SimpleObjectPool<T> : IDisposable where T : class, IDisposable
private readonly Func<T> factory;
private readonly LightConcurrentStack<T> stack;
private int allocatedObjects;
private readonly int maxObjects;

/// <summary>
/// Constructor
Expand All @@ -27,8 +26,7 @@ internal class SimpleObjectPool<T> : IDisposable where T : class, IDisposable
public SimpleObjectPool(Func<T> factory, int maxObjects = 128)
{
this.factory = factory;
this.maxObjects = maxObjects;
stack = new LightConcurrentStack<T>();
stack = new LightConcurrentStack<T>(maxObjects);
allocatedObjects = 0;
}

Expand All @@ -46,19 +44,23 @@ public void Dispose()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ReusableObject<T> Checkout()
public T Checkout()
{
if (!stack.TryPop(out var obj))
{
if (allocatedObjects < maxObjects)
{
Interlocked.Increment(ref allocatedObjects);
return new ReusableObject<T>(factory(), stack);
}
// Overflow objects are simply discarded after use
return new ReusableObject<T>(factory(), null);
Interlocked.Increment(ref allocatedObjects);
return factory();
}
return obj;
}

public void Return(T obj)
{
if (!stack.TryPush(obj))
{
obj.Dispose();
Interlocked.Decrement(ref allocatedObjects);
}
return new ReusableObject<T>(obj, stack);
}
}
}
32 changes: 17 additions & 15 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public override int TryConsumeMessages(byte[] buf)

public override void CompleteRead(ref Output output, long ctx, Status status)
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;

if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize)
SendAndReset(ref d, ref dend);
Expand All @@ -74,8 +74,8 @@ public override void CompleteRead(ref Output output, long ctx, Status status)

public override void CompleteRMW(ref Output output, long ctx, Status status)
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;

if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize)
SendAndReset(ref d, ref dend);
Expand Down Expand Up @@ -115,8 +115,8 @@ private unsafe void ProcessBatch(byte[] buf, int offset)

fixed (byte* b = &buf[offset])
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;
dcurr = d + sizeof(int); // reserve space for size
int origPendingSeqNo = pendingSeqNo;

Expand Down Expand Up @@ -216,7 +216,10 @@ private unsafe void ProcessBatch(byte[] buf, int offset)
if (msgnum - start > 0)
Send(d);
else
responseObject.Dispose();
{
messageManager.Return(responseObject);
responseObject = null;
}
}
}

Expand Down Expand Up @@ -249,8 +252,8 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r

ref Key key = ref serializer.ReadKeyByRef(ref keyPtr);

byte* d = respObj.obj.bufferPtr;
var dend = d + respObj.obj.buffer.Length;
byte* d = respObj.bufferPtr;
var dend = d + respObj.buffer.Length;
var dcurr = d + sizeof(int); // reserve space for size
byte* outputDcurr;

Expand Down Expand Up @@ -294,14 +297,14 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = 0;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)respObj.obj.bufferPtr = -(payloadSize - sizeof(int));
*(int*)respObj.bufferPtr = -(payloadSize - sizeof(int));
try
{
messageManager.Send(socket, respObj, 0, payloadSize);
}
catch
{
respObj.Dispose();
messageManager.Return(respObj);
}
}

Expand All @@ -327,8 +330,8 @@ private void SendAndReset(ref byte* d, ref byte* dend)
{
Send(d);
GetResponseObject();
d = responseObject.obj.bufferPtr;
dend = d + responseObject.obj.buffer.Length;
d = responseObject.bufferPtr;
dend = d + responseObject.buffer.Length;
dcurr = d + sizeof(int);
start = msgnum;
}
Expand All @@ -341,9 +344,8 @@ private void Send(byte* d)
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = seqNo++;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)responseObject.obj.bufferPtr = -(payloadSize - sizeof(int));
*(int*)responseObject.bufferPtr = -(payloadSize - sizeof(int));
SendResponse(payloadSize);
responseObject.obj = null;
}

private bool HandlePubSub(MessageType message, ref byte* src, ref byte* d, ref byte* dend)
Expand Down
Loading

0 comments on commit e11b6fb

Please sign in to comment.