Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Dec 8, 2021
1 parent 0cfe372 commit ef5be45
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 50 deletions.
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void Flush()
// Set packet size in header
*(int*)networkSender.GetResponseObjectHead() = -(payloadSize - sizeof(int));

networkSender.SendResponse(payloadSize);
networkSender.SendResponse(0, payloadSize);
networkSender.GetResponseObject();
offset = networkSender.GetResponseObjectHead() + sizeof(int) + BatchHeader.Size;
numMessages = 0;
Expand Down
15 changes: 5 additions & 10 deletions cs/remote/src/FASTER.common/INetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,12 @@ public interface INetworkSender : IDisposable
unsafe byte* GetResponseObjectTail();

/// <summary>
/// Send payload stored at response object of from head ptr to ptr + size
/// Send payload stored at response object, from offset to offset + size
/// </summary>
/// <param name="size"></param>
void SendResponse(int size);

/// <summary>
/// Send payload stored at response object of from head ptr + offset to ptr + offset + size
/// </summary>
/// <param name="offset"></param>
/// <param name="size"></param>
void SendResponse(int offset, int size);
/// <param name="offset">Offset of response from which to start sending</param>
/// <param name="size">Number of bytes to send, starting from offset</param>
/// <returns>Whether the send succeeded</returns>
bool SendResponse(int offset, int size);

/// <summary>
/// Dispose, optionally waiting for ongoing outgoing calls to complete
Expand Down
5 changes: 1 addition & 4 deletions cs/remote/src/FASTER.common/NetworkSenderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ public NetworkSenderBase(int serverBufferSize)
public virtual unsafe byte* GetResponseObjectTail() { return null; }

/// <inheritdoc />
public abstract void SendResponse(int size);

/// <inheritdoc />
public abstract void SendResponse(int offset, int size);
public abstract bool SendResponse(int offset, int size);

/// <inheritdoc />
public abstract void Dispose();
Expand Down
23 changes: 5 additions & 18 deletions cs/remote/src/FASTER.common/TcpNetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,34 +93,21 @@ public override void ReturnResponseObject()

/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void SendResponse(int size)
{
var _r = responseObject;
responseObject = null;
try
{
Send(socket, _r, 0, size);
}
catch
{
reusableSeaaBuffer.Return(_r);
}
}

/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void SendResponse(int offset, int size)
public override bool SendResponse(int offset, int size)
{
var _r = responseObject;
if (_r == null) return false;
responseObject = null;
try
{
Send(socket, _r, offset, size);
}
catch
{
reusableSeaaBuffer.Return(_r);
reusableSeaaBuffer.Return(_r);
return false;
}
return true;
}

/// <inheritdoc />
Expand Down
4 changes: 2 additions & 2 deletions cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)networkSender.GetResponseObjectHead()= -(payloadSize - sizeof(int));
networkSender.SendResponse(payloadSize);
networkSender.SendResponse(0, payloadSize);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -341,7 +341,7 @@ private void Send(byte* d)
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)networkSender.GetResponseObjectHead() = -(payloadSize - sizeof(int));
networkSender.SendResponse(payloadSize);
networkSender.SendResponse(0, payloadSize);
}

private bool HandlePubSub(MessageType message, ref byte* src, ref byte* d, ref byte* dend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class FasterServerBase : IFasterServer

readonly string address;
readonly int port;
int networkBufferSize;
readonly int networkBufferSize;

/// <summary>
/// Server Address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
namespace FASTER.server
{
/// <summary>
/// TcpServer
/// FASTER server for TCP protocol
/// </summary>
public class TcpServer : FasterServerBase
public class FasterServerTcp : FasterServerBase
{
readonly SocketAsyncEventArgs acceptEventArg;
readonly Socket servSocket;

/// <summary>
///
/// Constructor for server
/// </summary>
/// <param name="address"></param>
/// <param name="port"></param>
/// <param name="networkBufferSize"></param>
public TcpServer(string address, int port, int networkBufferSize = default)
public FasterServerTcp(string address, int port, int networkBufferSize = default)
: base(address, port, networkBufferSize)
{
var ip = Address == null ? IPAddress.Any : IPAddress.Parse(Address);
Expand Down Expand Up @@ -202,7 +202,7 @@ private unsafe bool CreateSession(SocketAsyncEventArgs e)
return true;
}

private unsafe void ProcessRequest(SocketAsyncEventArgs e)
private static unsafe void ProcessRequest(SocketAsyncEventArgs e)
{
var connArgs = (ConnectionArgs)e.UserToken;
connArgs.session.AddBytesRead(e.BytesTransferred);
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.server/Servers/GenericServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public GenericServer(ServerOptions opts, Func<Functions> functionsGen, Parameter
// Create session provider for VarLen
provider = new FasterKVProvider<Key, Value, Input, Output, Functions, ParameterSerializer>(functionsGen, store, serializer, kvBroker, broker, opts.Recover, maxSizeSettings);

server = new TcpServer(opts.Address, opts.Port);
server = new FasterServerTcp(opts.Address, opts.Port);
server.Register(WireFormat.DefaultFixedLenKV, provider);
}

Expand Down
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.server/Servers/VarLenServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public VarLenServer(ServerOptions opts)
// Create session provider for VarLen
provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts.Recover);

server = new TcpServer(opts.Address, opts.Port);
server = new FasterServerTcp(opts.Address, opts.Port);
server.Register(WireFormat.DefaultVarLenKV, provider);
server.Register(WireFormat.WebSocket, provider);
}
Expand Down
16 changes: 9 additions & 7 deletions cs/remote/src/FASTER.server/WebsocketServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public WebsocketServerSession(
public override unsafe int TryConsumeMessages(byte* req_buf, int bytesReceived)
{
recvBufferPtr = req_buf;
while (TryReadMessages(req_buf, out var offset))
while (TryReadMessages(out var offset))
{
bool completeWSCommand = ProcessBatch(req_buf, bytesReceived, offset);
if (!completeWSCommand)
Expand Down Expand Up @@ -114,7 +114,7 @@ public override void CompleteRMW(ref Output output, long ctx, Status status)
CreateSendPacketHeader(ref d, packetLen);
}

private bool TryReadMessages(byte* buf, out int offset)
private bool TryReadMessages(out int offset)
{
offset = default;

Expand Down Expand Up @@ -304,10 +304,12 @@ private unsafe bool ProcessBatch(byte* buf, int length, int offset)
nextBufOffset += 8;
}

var nextDecoderInfo = new Decoder();
nextDecoderInfo.msgLen = nextMsgLen;
nextDecoderInfo.maskStart = nextBufOffset;
nextDecoderInfo.dataStart = nextBufOffset + 4;
var nextDecoderInfo = new Decoder
{
msgLen = nextMsgLen,
maskStart = nextBufOffset,
dataStart = nextBufOffset + 4
};
decoderInfoList.Add(nextDecoderInfo);
totalMsgLen += nextMsgLen;
offset += 4;
Expand Down Expand Up @@ -615,7 +617,7 @@ private unsafe void Publish(ref byte* keyPtr, int keyLength, ref byte* valPtr, r
int payloadSize = (int)(dcurr - d);
// Set packet size in header
*(int*)networkSender.GetResponseObjectHead() = -(payloadSize - sizeof(int));
networkSender.SendResponse(payloadSize);
networkSender.SendResponse(0, payloadSize);
}


Expand Down

0 comments on commit ef5be45

Please sign in to comment.