From b9f4fbefea09b4578afa819f62c94c606183931d Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 15 Oct 2021 12:10:04 -0700 Subject: [PATCH 1/3] Make provider extensible. (#575) --- cs/remote/samples/FixedLenClient/Program.cs | 1 + .../FixedLenServer/FixedLenServer.csproj | 1 + cs/remote/samples/FixedLenServer/Program.cs | 1 + cs/remote/samples/VarLenClient/Program.cs | 1 + .../Providers/SpanByteFasterKVProvider.cs | 39 ++++++++++++++----- .../src/FASTER.server/Servers/VarLenServer.cs | 2 +- 6 files changed, 34 insertions(+), 11 deletions(-) diff --git a/cs/remote/samples/FixedLenClient/Program.cs b/cs/remote/samples/FixedLenClient/Program.cs index b91cfa7d2..c24835e56 100644 --- a/cs/remote/samples/FixedLenClient/Program.cs +++ b/cs/remote/samples/FixedLenClient/Program.cs @@ -17,6 +17,7 @@ class Program { static void Main(string[] args) { + Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS", "1"); string ip = "127.0.0.1"; int port = 3278; diff --git a/cs/remote/samples/FixedLenServer/FixedLenServer.csproj b/cs/remote/samples/FixedLenServer/FixedLenServer.csproj index c205d6c8e..2a2976167 100644 --- a/cs/remote/samples/FixedLenServer/FixedLenServer.csproj +++ b/cs/remote/samples/FixedLenServer/FixedLenServer.csproj @@ -6,6 +6,7 @@ true AnyCPU;x64 latest + true diff --git a/cs/remote/samples/FixedLenServer/Program.cs b/cs/remote/samples/FixedLenServer/Program.cs index fc00c4b82..f28ee1aa6 100644 --- a/cs/remote/samples/FixedLenServer/Program.cs +++ b/cs/remote/samples/FixedLenServer/Program.cs @@ -18,6 +18,7 @@ class Program { static void Main(string[] args) { + Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS", "1"); Trace.Listeners.Add(new ConsoleTraceListener()); Console.WriteLine("FASTER fixed-length (binary) KV server"); diff --git a/cs/remote/samples/VarLenClient/Program.cs b/cs/remote/samples/VarLenClient/Program.cs index aad8cee54..a48406f02 100644 --- a/cs/remote/samples/VarLenClient/Program.cs +++ b/cs/remote/samples/VarLenClient/Program.cs @@ -13,6 +13,7 @@ class Program { static void Main(string[] args) { + Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS", "1"); string ip = "127.0.0.1"; int port = 3278; diff --git a/cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs b/cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs index 482635371..89cc5ca9e 100644 --- a/cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs +++ b/cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs @@ -8,13 +8,32 @@ namespace FASTER.server /// Session provider for FasterKV store based on /// [K, V, I, O, C] = [SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long] /// - public sealed class SpanByteFasterKVProvider : ISessionProvider + public class SpanByteFasterKVProvider : ISessionProvider { - readonly FasterKV store; - readonly SpanByteServerSerializer serializer; - readonly SubscribeKVBroker> kvBroker; - readonly SubscribeBroker> broker; - readonly MaxSizeSettings maxSizeSettings; + /// + /// Store + /// + protected readonly FasterKV store; + + /// + /// Serializer + /// + protected readonly SpanByteServerSerializer serializer; + + /// + /// KV broker + /// + protected readonly SubscribeKVBroker> kvBroker; + + /// + /// Broker + /// + protected readonly SubscribeBroker> broker; + + /// + /// Size settings + /// + protected readonly MaxSizeSettings maxSizeSettings; /// /// Create SpanByte FasterKV backend @@ -22,12 +41,12 @@ public sealed class SpanByteFasterKVProvider : ISessionProvider /// /// /// - /// + /// /// - public SpanByteFasterKVProvider(FasterKV store, SubscribeKVBroker> kvBroker = null, SubscribeBroker> broker = null, ServerOptions serverOptions = null, MaxSizeSettings maxSizeSettings = default) + public SpanByteFasterKVProvider(FasterKV store, SubscribeKVBroker> kvBroker = null, SubscribeBroker> broker = null, bool recoverStore = false, MaxSizeSettings maxSizeSettings = default) { this.store = store; - if ((serverOptions ?? new ServerOptions()).Recover) + if (recoverStore) { try { @@ -43,7 +62,7 @@ public SpanByteFasterKVProvider(FasterKV store, SubscribeKVB } /// - public IServerSession GetSession(WireFormat wireFormat, Socket socket) + public virtual IServerSession GetSession(WireFormat wireFormat, Socket socket) { switch (wireFormat) { diff --git a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs index 60c06a731..78855aa28 100644 --- a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs +++ b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs @@ -45,7 +45,7 @@ public VarLenServer(ServerOptions opts) } // Create session provider for VarLen - provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts); + provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts.Recover); server = new FasterServer(opts.Address, opts.Port); server.Register(WireFormat.DefaultVarLenKV, provider); From 28f60340c0648ab14756a3e98377caeacfac48c3 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 15 Oct 2021 12:48:30 -0700 Subject: [PATCH 2/3] Update VarLenServer.cs --- cs/remote/src/FASTER.server/Servers/VarLenServer.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs index 78855aa28..6ece966a4 100644 --- a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs +++ b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs @@ -49,6 +49,7 @@ public VarLenServer(ServerOptions opts) server = new FasterServer(opts.Address, opts.Port); server.Register(WireFormat.DefaultVarLenKV, provider); + server.Register(WireFormat.WebSocket, provider); } /// From e11b6fbb7c3d3f7f7bcbe21060a4e4742990c09f Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 15 Oct 2021 14:26:01 -0700 Subject: [PATCH 3/3] Remove ReusableObject in FASTER remote (#573) --- cs/remote/src/FASTER.client/ClientSession.cs | 31 ++++++++------- cs/remote/src/FASTER.common/NetworkSender.cs | 20 ++++++---- cs/remote/src/FASTER.common/ReusableObject.cs | 39 ------------------- .../src/FASTER.common/SimpleObjectPool.cs | 26 +++++++------ .../src/FASTER.server/BinaryServerSession.cs | 32 ++++++++------- .../src/FASTER.server/ServerSessionBase.cs | 26 ++++++++----- .../FASTER.server/WebsocketServerSession.cs | 35 +++++++++-------- 7 files changed, 95 insertions(+), 114 deletions(-) delete mode 100644 cs/remote/src/FASTER.common/ReusableObject.cs diff --git a/cs/remote/src/FASTER.client/ClientSession.cs b/cs/remote/src/FASTER.client/ClientSession.cs index c6519f4b8..bcca380d5 100644 --- a/cs/remote/src/FASTER.client/ClientSession.cs +++ b/cs/remote/src/FASTER.client/ClientSession.cs @@ -38,7 +38,7 @@ public unsafe sealed partial class ClientSession sendObject; + SeaaBuffer sendObject; byte* offset; int numMessages; int numPendingBatches; @@ -75,7 +75,7 @@ public ClientSession(string address, int port, Functions functions, WireFormat w numPendingBatches = 0; sendObject = messageManager.GetReusableSeaaBuffer(); - offset = sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size; + offset = sendObject.bufferPtr + sizeof(int) + BatchHeader.Size; numMessages = 0; sendSocket = GetSendSocket(address, port); } @@ -278,15 +278,15 @@ public void PSubscribe(Key prefix, Context userContext = default, long serialNo /// public void Flush() { - if (offset > sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size) + if (offset > sendObject.bufferPtr + sizeof(int) + BatchHeader.Size) { - int payloadSize = (int)(offset - sendObject.obj.bufferPtr); + int payloadSize = (int)(offset - sendObject.bufferPtr); - ((BatchHeader*)(sendObject.obj.bufferPtr + sizeof(int)))->SetNumMessagesProtocol(numMessages, wireFormat); + ((BatchHeader*)(sendObject.bufferPtr + sizeof(int)))->SetNumMessagesProtocol(numMessages, wireFormat); Interlocked.Increment(ref numPendingBatches); // Set packet size in header - *(int*)sendObject.obj.bufferPtr = -(payloadSize - sizeof(int)); + *(int*)sendObject.bufferPtr = -(payloadSize - sizeof(int)); try { @@ -298,7 +298,7 @@ public void Flush() throw; } sendObject = messageManager.GetReusableSeaaBuffer(); - offset = sendObject.obj.bufferPtr + sizeof(int) + BatchHeader.Size; + offset = sendObject.bufferPtr + sizeof(int) + BatchHeader.Size; numMessages = 0; } } @@ -322,7 +322,8 @@ public void CompletePending(bool wait = true) public void Dispose() { disposed = true; - sendObject.Dispose(); + if (sendObject != null) + messageManager.Return(sendObject); sendSocket.Dispose(); messageManager.Dispose(); } @@ -725,7 +726,7 @@ private unsafe Status InternalRead(MessageType messageType, ref Key key, ref Inp while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) @@ -748,7 +749,7 @@ private unsafe Status InternalSubscribeKV(MessageType messageType, ref Key key, while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) @@ -771,7 +772,7 @@ private unsafe Status InternalPublish(MessageType messageType, ref Key key, ref while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) @@ -794,7 +795,7 @@ private unsafe Status InternalSubscribe(MessageType messageType, ref Key key, Co while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) @@ -816,7 +817,7 @@ private unsafe Status InternalUpsert(MessageType messageType, ref Key key, ref V while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) @@ -839,7 +840,7 @@ private unsafe Status InternalRMW(MessageType messageType, ref Key key, ref Inpu while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) @@ -862,7 +863,7 @@ private unsafe Status InternalDelete(MessageType messageType, ref Key key, Conte while (true) { - byte* end = sendObject.obj.bufferPtr + bufferSize; + byte* end = sendObject.bufferPtr + bufferSize; byte* curr = offset; if (hrw.Write(messageType, ref curr, (int)(end - curr))) if (hrw.Write(serialNo, ref curr, (int)(end - curr))) diff --git a/cs/remote/src/FASTER.common/NetworkSender.cs b/cs/remote/src/FASTER.common/NetworkSender.cs index 801e7b41d..997ad324b 100644 --- a/cs/remote/src/FASTER.common/NetworkSender.cs +++ b/cs/remote/src/FASTER.common/NetworkSender.cs @@ -35,7 +35,7 @@ public void Dispose() /// Get reusable SocketAsyncEventArgs buffer /// /// - public ReusableObject GetReusableSeaaBuffer() => reusableSeaaBuffer.Checkout(); + public SeaaBuffer GetReusableSeaaBuffer() => reusableSeaaBuffer.Checkout(); /// /// Send @@ -44,21 +44,27 @@ public void Dispose() /// Reusable SocketAsyncEventArgs buffer /// Offset /// Size in bytes - public unsafe void Send(Socket socket, ReusableObject sendObject, int offset, int size) + public unsafe void Send(Socket socket, SeaaBuffer sendObject, int offset, int size) { // Reset send buffer - sendObject.obj.socketEventAsyncArgs.SetBuffer(offset, size); + sendObject.socketEventAsyncArgs.SetBuffer(offset, size); // Set user context to reusable object handle for disposal when send is done - sendObject.obj.socketEventAsyncArgs.UserToken = sendObject; - if (!socket.SendAsync(sendObject.obj.socketEventAsyncArgs)) - SeaaBuffer_Completed(null, sendObject.obj.socketEventAsyncArgs); + sendObject.socketEventAsyncArgs.UserToken = sendObject; + if (!socket.SendAsync(sendObject.socketEventAsyncArgs)) + SeaaBuffer_Completed(null, sendObject.socketEventAsyncArgs); } private void SeaaBuffer_Completed(object sender, SocketAsyncEventArgs e) { - ((ReusableObject)e.UserToken).Dispose(); + reusableSeaaBuffer.Return((SeaaBuffer)e.UserToken); } + /// + /// Return to pool + /// + /// + public void Return(SeaaBuffer obj) => reusableSeaaBuffer.Return(obj); + /// /// Receive /// diff --git a/cs/remote/src/FASTER.common/ReusableObject.cs b/cs/remote/src/FASTER.common/ReusableObject.cs deleted file mode 100644 index 204e8afb0..000000000 --- a/cs/remote/src/FASTER.common/ReusableObject.cs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT license. - -using System; -using System.Collections.Generic; - -namespace FASTER.common -{ - /// - /// Reusable object - /// - /// - public struct ReusableObject : IDisposable where T : class, IDisposable - { - private readonly LightConcurrentStack pool; - - /// - /// Object - /// - public T obj; - - internal ReusableObject(T obj, LightConcurrentStack pool) - { - this.pool = pool; - this.obj = obj; - } - - /// - /// Dispose instance - /// - public void Dispose() - { - if (pool != null) - pool.TryPush(obj); - else - obj.Dispose(); - } - } -} \ No newline at end of file diff --git a/cs/remote/src/FASTER.common/SimpleObjectPool.cs b/cs/remote/src/FASTER.common/SimpleObjectPool.cs index f2751fa17..a278e4d48 100644 --- a/cs/remote/src/FASTER.common/SimpleObjectPool.cs +++ b/cs/remote/src/FASTER.common/SimpleObjectPool.cs @@ -17,7 +17,6 @@ internal class SimpleObjectPool : IDisposable where T : class, IDisposable private readonly Func factory; private readonly LightConcurrentStack stack; private int allocatedObjects; - private readonly int maxObjects; /// /// Constructor @@ -27,8 +26,7 @@ internal class SimpleObjectPool : IDisposable where T : class, IDisposable public SimpleObjectPool(Func factory, int maxObjects = 128) { this.factory = factory; - this.maxObjects = maxObjects; - stack = new LightConcurrentStack(); + stack = new LightConcurrentStack(maxObjects); allocatedObjects = 0; } @@ -46,19 +44,23 @@ public void Dispose() } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ReusableObject Checkout() + public T Checkout() { if (!stack.TryPop(out var obj)) { - if (allocatedObjects < maxObjects) - { - Interlocked.Increment(ref allocatedObjects); - return new ReusableObject(factory(), stack); - } - // Overflow objects are simply discarded after use - return new ReusableObject(factory(), null); + Interlocked.Increment(ref allocatedObjects); + return factory(); + } + return obj; + } + + public void Return(T obj) + { + if (!stack.TryPush(obj)) + { + obj.Dispose(); + Interlocked.Decrement(ref allocatedObjects); } - return new ReusableObject(obj, stack); } } } \ No newline at end of file diff --git a/cs/remote/src/FASTER.server/BinaryServerSession.cs b/cs/remote/src/FASTER.server/BinaryServerSession.cs index 0159b9fa2..265a08bbb 100644 --- a/cs/remote/src/FASTER.server/BinaryServerSession.cs +++ b/cs/remote/src/FASTER.server/BinaryServerSession.cs @@ -57,8 +57,8 @@ public override int TryConsumeMessages(byte[] buf) public override void CompleteRead(ref Output output, long ctx, Status status) { - byte* d = responseObject.obj.bufferPtr; - var dend = d + responseObject.obj.buffer.Length; + byte* d = responseObject.bufferPtr; + var dend = d + responseObject.buffer.Length; if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize) SendAndReset(ref d, ref dend); @@ -74,8 +74,8 @@ public override void CompleteRead(ref Output output, long ctx, Status status) public override void CompleteRMW(ref Output output, long ctx, Status status) { - byte* d = responseObject.obj.bufferPtr; - var dend = d + responseObject.obj.buffer.Length; + byte* d = responseObject.bufferPtr; + var dend = d + responseObject.buffer.Length; if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize) SendAndReset(ref d, ref dend); @@ -115,8 +115,8 @@ private unsafe void ProcessBatch(byte[] buf, int offset) fixed (byte* b = &buf[offset]) { - byte* d = responseObject.obj.bufferPtr; - var dend = d + responseObject.obj.buffer.Length; + byte* d = responseObject.bufferPtr; + var dend = d + responseObject.buffer.Length; dcurr = d + sizeof(int); // reserve space for size int origPendingSeqNo = pendingSeqNo; @@ -216,7 +216,10 @@ private unsafe void ProcessBatch(byte[] buf, int offset) if (msgnum - start > 0) Send(d); else - responseObject.Dispose(); + { + messageManager.Return(responseObject); + responseObject = null; + } } } @@ -249,8 +252,8 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r ref Key key = ref serializer.ReadKeyByRef(ref keyPtr); - byte* d = respObj.obj.bufferPtr; - var dend = d + respObj.obj.buffer.Length; + byte* d = respObj.bufferPtr; + var dend = d + respObj.buffer.Length; var dcurr = d + sizeof(int); // reserve space for size byte* outputDcurr; @@ -294,14 +297,14 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r Unsafe.AsRef(dstart).SeqNo = 0; int payloadSize = (int)(dcurr - d); // Set packet size in header - *(int*)respObj.obj.bufferPtr = -(payloadSize - sizeof(int)); + *(int*)respObj.bufferPtr = -(payloadSize - sizeof(int)); try { messageManager.Send(socket, respObj, 0, payloadSize); } catch { - respObj.Dispose(); + messageManager.Return(respObj); } } @@ -327,8 +330,8 @@ private void SendAndReset(ref byte* d, ref byte* dend) { Send(d); GetResponseObject(); - d = responseObject.obj.bufferPtr; - dend = d + responseObject.obj.buffer.Length; + d = responseObject.bufferPtr; + dend = d + responseObject.buffer.Length; dcurr = d + sizeof(int); start = msgnum; } @@ -341,9 +344,8 @@ private void Send(byte* d) Unsafe.AsRef(dstart).SeqNo = seqNo++; int payloadSize = (int)(dcurr - d); // Set packet size in header - *(int*)responseObject.obj.bufferPtr = -(payloadSize - sizeof(int)); + *(int*)responseObject.bufferPtr = -(payloadSize - sizeof(int)); SendResponse(payloadSize); - responseObject.obj = null; } private bool HandlePubSub(MessageType message, ref byte* src, ref byte* d, ref byte* dend) diff --git a/cs/remote/src/FASTER.server/ServerSessionBase.cs b/cs/remote/src/FASTER.server/ServerSessionBase.cs index b437ddead..e140b925b 100644 --- a/cs/remote/src/FASTER.server/ServerSessionBase.cs +++ b/cs/remote/src/FASTER.server/ServerSessionBase.cs @@ -25,7 +25,7 @@ public abstract class ServerSessionBase : IServerSession /// /// Response object /// - protected ReusableObject responseObject; + protected SeaaBuffer responseObject; /// /// Bytes read @@ -63,7 +63,7 @@ public ServerSessionBase(Socket socket, MaxSizeSettings maxSizeSettings) /// /// Get response object /// - protected void GetResponseObject() { if (responseObject.obj == null) responseObject = messageManager.GetReusableSeaaBuffer(); } + protected void GetResponseObject() { if (responseObject == null) responseObject = messageManager.GetReusableSeaaBuffer(); } /// /// Send response @@ -71,13 +71,15 @@ public ServerSessionBase(Socket socket, MaxSizeSettings maxSizeSettings) /// protected void SendResponse(int size) { + var _r = responseObject; + responseObject = null; try { - messageManager.Send(socket, responseObject, 0, size); + messageManager.Send(socket, _r, 0, size); } catch { - responseObject.Dispose(); + messageManager.Return(_r); } } @@ -88,13 +90,15 @@ protected void SendResponse(int size) /// protected void SendResponse(int offset, int size) { + var _r = responseObject; + responseObject = null; try { - messageManager.Send(socket, responseObject, offset, size); + messageManager.Send(socket, _r, offset, size); } catch { - responseObject.Dispose(); + messageManager.Return(_r); } } @@ -128,8 +132,9 @@ protected void SendResponse(int offset, int size) public virtual void Dispose() { socket.Dispose(); - if (responseObject.obj != null) - responseObject.Dispose(); + var _r = responseObject; + if (_r != null) + messageManager.Return(_r); messageManager.Dispose(); } @@ -138,8 +143,9 @@ public virtual void Dispose() /// public virtual void CompleteSends() { - if (responseObject.obj != null) - responseObject.Dispose(); + var _r = responseObject; + if (_r != null) + messageManager.Return(_r); messageManager.Dispose(); } } diff --git a/cs/remote/src/FASTER.server/WebsocketServerSession.cs b/cs/remote/src/FASTER.server/WebsocketServerSession.cs index 1303cb3cc..e8edce724 100644 --- a/cs/remote/src/FASTER.server/WebsocketServerSession.cs +++ b/cs/remote/src/FASTER.server/WebsocketServerSession.cs @@ -81,8 +81,8 @@ public override int TryConsumeMessages(byte[] buf) public override void CompleteRead(ref Output output, long ctx, core.Status status) { - byte* d = responseObject.obj.bufferPtr; - var dend = d + responseObject.obj.buffer.Length; + byte* d = responseObject.bufferPtr; + var dend = d + responseObject.buffer.Length; if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize) SendAndReset(ref d, ref dend); @@ -98,8 +98,8 @@ public override void CompleteRead(ref Output output, long ctx, core.Status statu public override void CompleteRMW(ref Output output, long ctx, Status status) { - byte* d = responseObject.obj.bufferPtr; - var dend = d + responseObject.obj.buffer.Length; + byte* d = responseObject.bufferPtr; + var dend = d + responseObject.buffer.Length; if ((int)(dend - dcurr) < 7 + maxSizeSettings.MaxOutputSize) SendAndReset(ref d, ref dend); @@ -185,8 +185,8 @@ private unsafe bool ProcessBatch(byte[] buf, int offset) fixed (byte* b = &buf[offset]) { - byte* d = responseObject.obj.bufferPtr; - var dend = d + responseObject.obj.buffer.Length; + byte* d = responseObject.bufferPtr; + var dend = d + responseObject.buffer.Length; dcurr = d; // reserve space for size var bytesAvailable = bytesRead - readHead; var _origReadHead = readHead; @@ -220,8 +220,7 @@ private unsafe bool ProcessBatch(byte[] buf, int offset) dcurr += response.Length; - SendResponse((int)(d - responseObject.obj.bufferPtr), (int)(dcurr - d)); - responseObject.obj = null; + SendResponse((int)(d - responseObject.bufferPtr), (int)(dcurr - d)); readHead = bytesRead; return completeWSCommand; @@ -535,6 +534,11 @@ private unsafe bool ProcessBatch(byte[] buf, int offset) // Send replies if (msgnum - start > 0) Send(d); + else + { + messageManager.Return(responseObject); + responseObject = null; + } } return completeWSCommand; @@ -569,8 +573,8 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r ref Key key = ref serializer.ReadKeyByRef(ref keyPtr); - byte* d = respObj.obj.bufferPtr; - var dend = d + respObj.obj.buffer.Length; + byte* d = respObj.bufferPtr; + var dend = d + respObj.buffer.Length; var dcurr = d + sizeof(int); // reserve space for size byte* outputDcurr; @@ -614,14 +618,14 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r Unsafe.AsRef(dstart).SeqNo = 0; int payloadSize = (int)(dcurr - d); // Set packet size in header - *(int*)respObj.obj.bufferPtr = -(payloadSize - sizeof(int)); + *(int*)respObj.bufferPtr = -(payloadSize - sizeof(int)); try { messageManager.Send(socket, respObj, 0, payloadSize); } catch { - respObj.Dispose(); + messageManager.Return(respObj); } } @@ -658,8 +662,8 @@ private void SendAndReset(ref byte* d, ref byte* dend) { Send(d); GetResponseObject(); - d = responseObject.obj.bufferPtr; - dend = d + responseObject.obj.buffer.Length; + d = responseObject.bufferPtr; + dend = d + responseObject.buffer.Length; dcurr = d; dcurr += 10; dcurr += sizeof(int); // reserve space for size @@ -680,8 +684,7 @@ private void Send(byte* d) *(int*)dtemp = (packetLen - sizeof(int)); *(int*)dstart = 0; *(int*)(dstart + sizeof(int)) = (msgnum - start); - SendResponse((int)(d - responseObject.obj.bufferPtr), (int)(dcurr - d)); - responseObject.obj = null; + SendResponse((int)(d - responseObject.bufferPtr), (int)(dcurr - d)); } } }