Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Remove ReusableObject #573

Merged
merged 2 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public unsafe sealed partial class ClientSession<Key, Value, Input, Output, Cont
private bool subscriptionSession;

bool disposed;
ReusableObject<SeaaBuffer> sendObject;
SeaaBuffer sendObject;
byte* offset;
int numMessages;
int numPendingBatches;
Expand Down Expand Up @@ -75,7 +75,7 @@ public ClientSession(string address, int port, Functions functions, WireFormat w

numPendingBatches = 0;
sendObject = messageManager.GetReusableSeaaBuffer();
offset = sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size;
offset = sendObject.bufferPtr + sizeof(int) + BatchHeader.Size;
numMessages = 0;
sendSocket = GetSendSocket(address, port);
}
Expand Down Expand Up @@ -278,15 +278,15 @@ public void PSubscribe(Key prefix, Context userContext = default, long serialNo
/// </summary>
public void Flush()
{
if (offset > sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size)
if (offset > sendObject.bufferPtr + sizeof(int) + BatchHeader.Size)
{
int payloadSize = (int)(offset - sendObject.obj.bufferPtr);
int payloadSize = (int)(offset - sendObject.bufferPtr);

((BatchHeader*)(sendObject.obj.bufferPtr + sizeof(int)))->SetNumMessagesProtocol(numMessages, wireFormat);
((BatchHeader*)(sendObject.bufferPtr + sizeof(int)))->SetNumMessagesProtocol(numMessages, wireFormat);
Interlocked.Increment(ref numPendingBatches);

// Set packet size in header
*(int*)sendObject.obj.bufferPtr = -(payloadSize - sizeof(int));
*(int*)sendObject.bufferPtr = -(payloadSize - sizeof(int));

try
{
Expand All @@ -298,7 +298,7 @@ public void Flush()
throw;
}
sendObject = messageManager.GetReusableSeaaBuffer();
offset = sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size;
offset = sendObject.bufferPtr + sizeof(int) + BatchHeader.Size;
numMessages = 0;
}
}
Expand All @@ -322,7 +322,8 @@ public void CompletePending(bool wait = true)
public void Dispose()
{
disposed = true;
sendObject.Dispose();
if (sendObject != null)
messageManager.Return(sendObject);
sendSocket.Dispose();
messageManager.Dispose();
}
Expand Down Expand Up @@ -725,7 +726,7 @@ private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Inp

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -748,7 +749,7 @@ private unsafe Status InternalSubscribeKV(MessageType messageType, ref Key key,

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -771,7 +772,7 @@ private unsafe Status InternalPublish(MessageType messageType, ref Key key, ref

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -794,7 +795,7 @@ private unsafe Status InternalSubscribe(MessageType messageType, ref Key key, Co

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -816,7 +817,7 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -839,7 +840,7 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand All @@ -862,7 +863,7 @@ private unsafe Status InternalDelete(MessageType messageType, ref Key key, Conte

while (true)
{
byte* end = sendObject.obj.bufferPtr + bufferSize;
byte* end = sendObject.bufferPtr + bufferSize;
byte* curr = offset;
if (hrw.Write(messageType, ref curr, (int)(end - curr)))
if (hrw.Write(serialNo, ref curr, (int)(end - curr)))
Expand Down
20 changes: 13 additions & 7 deletions cs/remote/src/FASTER.common/NetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Dispose()
/// Get reusable SocketAsyncEventArgs buffer
/// </summary>
/// <returns></returns>
public ReusableObject<SeaaBuffer> GetReusableSeaaBuffer() => reusableSeaaBuffer.Checkout();
public SeaaBuffer GetReusableSeaaBuffer() => reusableSeaaBuffer.Checkout();

/// <summary>
/// Send
Expand All @@ -44,21 +44,27 @@ public void Dispose()
/// <param name="sendObject">Reusable SocketAsyncEventArgs buffer</param>
/// <param name="offset">Offset</param>
/// <param name="size">Size in bytes</param>
public unsafe void Send(Socket socket, ReusableObject<SeaaBuffer> sendObject, int offset, int size)
public unsafe void Send(Socket socket, SeaaBuffer sendObject, int offset, int size)
{
// Reset send buffer
sendObject.obj.socketEventAsyncArgs.SetBuffer(offset, size);
sendObject.socketEventAsyncArgs.SetBuffer(offset, size);
// Set user context to reusable object handle for disposal when send is done
sendObject.obj.socketEventAsyncArgs.UserToken = sendObject;
if (!socket.SendAsync(sendObject.obj.socketEventAsyncArgs))
SeaaBuffer_Completed(null, sendObject.obj.socketEventAsyncArgs);
sendObject.socketEventAsyncArgs.UserToken = sendObject;
if (!socket.SendAsync(sendObject.socketEventAsyncArgs))
SeaaBuffer_Completed(null, sendObject.socketEventAsyncArgs);
}

private void SeaaBuffer_Completed(object sender, SocketAsyncEventArgs e)
{
((ReusableObject<SeaaBuffer>)e.UserToken).Dispose();
reusableSeaaBuffer.Return((SeaaBuffer)e.UserToken);
}

/// <summary>
/// Return to pool
/// </summary>
/// <param name="obj"></param>
public void Return(SeaaBuffer obj) => reusableSeaaBuffer.Return(obj);

/// <summary>
/// Receive
/// </summary>
Expand Down
39 changes: 0 additions & 39 deletions cs/remote/src/FASTER.common/ReusableObject.cs

This file was deleted.

26 changes: 14 additions & 12 deletions cs/remote/src/FASTER.common/SimpleObjectPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ internal class SimpleObjectPool<T> : IDisposable where T : class, IDisposable
private readonly Func<T> factory;
private readonly LightConcurrentStack<T> stack;
private int allocatedObjects;
private readonly int maxObjects;

/// <summary>
/// Constructor
Expand All @@ -27,8 +26,7 @@ internal class SimpleObjectPool<T> : IDisposable where T : class, IDisposable
public SimpleObjectPool(Func<T> factory, int maxObjects = 128)
{
this.factory = factory;
this.maxObjects = maxObjects;
stack = new LightConcurrentStack<T>();
stack = new LightConcurrentStack<T>(maxObjects);
allocatedObjects = 0;
}

Expand All @@ -46,19 +44,23 @@ public void Dispose()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ReusableObject<T> Checkout()
public T Checkout()
{
if (!stack.TryPop(out var obj))
{
if (allocatedObjects < maxObjects)
{
Interlocked.Increment(ref allocatedObjects);
return new ReusableObject<T>(factory(), stack);
}
// Overflow objects are simply discarded after use
return new ReusableObject<T>(factory(), null);
Interlocked.Increment(ref allocatedObjects);
return factory();
}
return obj;
}

public void Return(T obj)
{
if (!stack.TryPush(obj))
{
obj.Dispose();
Interlocked.Decrement(ref allocatedObjects);
}
return new ReusableObject<T>(obj, stack);
}
}
}
32 changes: 17 additions & 15 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public override int TryConsumeMessages(byte[] buf)

public override void CompleteRead(ref Output output, long ctx, Status status)
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;

if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize)
SendAndReset(ref d, ref dend);
Expand All @@ -74,8 +74,8 @@ public override void CompleteRead(ref Output output, long ctx, Status status)

public override void CompleteRMW(ref Output output, long ctx, Status status)
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;

if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize)
SendAndReset(ref d, ref dend);
Expand Down Expand Up @@ -115,8 +115,8 @@ private unsafe void ProcessBatch(byte[] buf, int offset)

fixed (byte* b = &buf[offset])
{
byte* d = responseObject.obj.bufferPtr;
var dend = d + responseObject.obj.buffer.Length;
byte* d = responseObject.bufferPtr;
var dend = d + responseObject.buffer.Length;
dcurr = d + sizeof(int); // reserve space for size
int origPendingSeqNo = pendingSeqNo;

Expand Down Expand Up @@ -216,7 +216,10 @@ private unsafe void ProcessBatch(byte[] buf, int offset)
if (msgnum - start > 0)
Send(d);
else
responseObject.Dispose();
{
messageManager.Return(responseObject);
responseObject = null;
}
}
}

Expand Down Expand Up @@ -249,8 +252,8 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r

ref Key key = ref serializer.ReadKeyByRef(ref keyPtr);

byte* d = respObj.obj.bufferPtr;
var dend = d + respObj.obj.buffer.Length;
byte* d = respObj.bufferPtr;
var dend = d + respObj.buffer.Length;
var dcurr = d + sizeof(int); // reserve space for size
byte* outputDcurr;

Expand Down Expand Up @@ -294,14 +297,14 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = 0;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)respObj.obj.bufferPtr = -(payloadSize - sizeof(int));
*(int*)respObj.bufferPtr = -(payloadSize - sizeof(int));
try
{
messageManager.Send(socket, respObj, 0, payloadSize);
}
catch
{
respObj.Dispose();
messageManager.Return(respObj);
}
}

Expand All @@ -327,8 +330,8 @@ private void SendAndReset(ref byte* d, ref byte* dend)
{
Send(d);
GetResponseObject();
d = responseObject.obj.bufferPtr;
dend = d + responseObject.obj.buffer.Length;
d = responseObject.bufferPtr;
dend = d + responseObject.buffer.Length;
dcurr = d + sizeof(int);
start = msgnum;
}
Expand All @@ -341,9 +344,8 @@ private void Send(byte* d)
Unsafe.AsRef<BatchHeader>(dstart).SeqNo = seqNo++;
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)responseObject.obj.bufferPtr = -(payloadSize - sizeof(int));
*(int*)responseObject.bufferPtr = -(payloadSize - sizeof(int));
SendResponse(payloadSize);
responseObject.obj = null;
}

private bool HandlePubSub(MessageType message, ref byte* src, ref byte* d, ref byte* dend)
Expand Down
Loading