Skip to content

Commit

Permalink
[C#] Misc remote cleanup (#569)
Browse files Browse the repository at this point in the history
* Misc remote cleanup

* nit
  • Loading branch information
badrishc authored Oct 9, 2021
1 parent e0e148f commit 28f5494
Show file tree
Hide file tree
Showing 21 changed files with 324 additions and 100 deletions.
20 changes: 14 additions & 6 deletions cs/remote/samples/FixedLenServer/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public struct Output
}


public struct Functions : IFunctions<Key, Value, Input, Output, long>
public struct Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
{
// No locking needed for atomic types such as Value
public bool SupportsLocking => false;

// Callbacks
public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { }

public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status) { }
public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status, RecordInfo recordInfo) { }

public void UpsertCompletionCallback(ref Key key, ref Value value, long ctx) { }

Expand All @@ -72,17 +72,23 @@ public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoi

// Read functions
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst) => dst.value = value;
public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, long address)
{
dst.value = value;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst) => dst.value = value;
public void ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address)
{
dst.value = value;
}

// Upsert functions
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SingleWriter(ref Key key, ref Value src, ref Value dst) => dst = src;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst)
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
{
dst = src;
return true;
Expand All @@ -97,7 +103,7 @@ public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Ou
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
{
Interlocked.Add(ref value.value, input.value);
output.value = value;
Expand All @@ -114,5 +120,7 @@ public void CopyUpdater(ref Key key, ref Input input, ref Value oldValue, ref Va
public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) { }

public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) => true;

public void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) { }
}
}
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.server/BinaryServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace FASTER.server
{
internal unsafe sealed class BinaryServerSession<Key, Value, Input, Output, Functions, ParameterSerializer>
: FasterKVServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer>
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
readonly HeaderReaderWriter hrw;
Expand Down
4 changes: 1 addition & 3 deletions cs/remote/src/FASTER.server/FasterKVProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.Sockets;
using FASTER.common;
using FASTER.core;
Expand All @@ -17,7 +15,7 @@ namespace FASTER.server
/// <typeparam name="Functions"></typeparam>
/// <typeparam name="ParameterSerializer"></typeparam>
public sealed class FasterKVProvider<Key, Value, Input, Output, Functions, ParameterSerializer> : ISessionProvider
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
readonly FasterKV<Key, Value> store;
Expand Down
4 changes: 2 additions & 2 deletions cs/remote/src/FASTER.server/FasterKVServerSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
namespace FASTER.server
{
internal abstract class FasterKVServerSessionBase<Key, Value, Input, Output, Functions, ParameterSerializer> : FasterKVServerSessionBase<Output>
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
protected readonly ClientSession<Key, Value, Input, Output, long, ServerKVFunctions<Key, Value, Input, Output, Functions>> session;
protected readonly AdvancedClientSession<Key, Value, Input, Output, long, ServerKVFunctions<Key, Value, Input, Output, Functions>> session;
protected readonly ParameterSerializer serializer;

public FasterKVServerSessionBase(Socket socket, FasterKV<Key, Value> store, Functions functions,
Expand Down
14 changes: 11 additions & 3 deletions cs/remote/src/FASTER.server/FasterServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public FasterServer(string address, int port, int networkBufferSize = default)
if (networkBufferSize == default)
this.networkBufferSize = BufferSizeUtils.ClientBufferSize(new MaxSizeSettings());

var ip = IPAddress.Parse(address);
var ip = address == null ? IPAddress.Any : IPAddress.Parse(address);
var endPoint = new IPEndPoint(ip, port);
servSocket = new Socket(ip.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
servSocket.Bind(endPoint);
Expand Down Expand Up @@ -98,7 +98,7 @@ private bool HandleNewConnection(SocketAsyncEventArgs e)
return false;
}

// Ok to create new event args on accept because we assume a connection to be long-running
// Ok to create new event args on accept because we assume a connection to be long-running
var receiveEventArgs = new SocketAsyncEventArgs();
var buffer = new byte[networkBufferSize];
receiveEventArgs.SetBuffer(buffer, 0, networkBufferSize);
Expand Down Expand Up @@ -247,7 +247,15 @@ private unsafe bool CreateSession(SocketAsyncEventArgs e)

connArgs.session.AddBytesRead(connArgs.bytesRead);
var _newHead = connArgs.session.TryConsumeMessages(e.Buffer);
e.SetBuffer(_newHead, e.Buffer.Length - _newHead);
if (_newHead == e.Buffer.Length)
{
// Need to grow input buffer
var newBuffer = new byte[e.Buffer.Length * 2];
Array.Copy(e.Buffer, newBuffer, e.Buffer.Length);
e.SetBuffer(newBuffer, _newHead, newBuffer.Length - _newHead);
}
else
e.SetBuffer(_newHead, e.Buffer.Length - _newHead);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace FASTER.server
/// </summary>
public sealed class SpanByteFasterKVProvider : ISessionProvider
{
readonly StoreWrapper<SpanByte, SpanByte> storeWrapper;
readonly FasterKV<SpanByte, SpanByte> store;
readonly SpanByteServerSerializer serializer;
readonly SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker;
readonly SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker;
Expand All @@ -22,11 +22,20 @@ public sealed class SpanByteFasterKVProvider : ISessionProvider
/// <param name="store"></param>
/// <param name="kvBroker"></param>
/// <param name="broker"></param>
/// <param name="tryRecover"></param>
/// <param name="serverOptions"></param>
/// <param name="maxSizeSettings"></param>
public SpanByteFasterKVProvider(FasterKV<SpanByte, SpanByte> store, SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null, SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null, bool tryRecover = true, MaxSizeSettings maxSizeSettings = default)
public SpanByteFasterKVProvider(FasterKV<SpanByte, SpanByte> store, SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null, SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null, ServerOptions serverOptions = null, MaxSizeSettings maxSizeSettings = default)
{
this.storeWrapper = new StoreWrapper<SpanByte, SpanByte>(store, tryRecover);
this.store = store;
if ((serverOptions ?? new ServerOptions()).Recover)
{
try
{
store.Recover();
}
catch
{ }
}
this.kvBroker = kvBroker;
this.broker = broker;
this.serializer = new SpanByteServerSerializer();
Expand All @@ -40,10 +49,10 @@ public IServerSession GetSession(WireFormat wireFormat, Socket socket)
{
case WireFormat.WebSocket:
return new WebsocketServerSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteServerSerializer>
(socket, storeWrapper.store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
(socket, store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
default:
return new BinaryServerSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteServerSerializer>
(socket, storeWrapper.store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
(socket, store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public sealed class SubscribeBroker<Key, Value, KeyValueSerializer> : IDisposabl
private AsyncQueue<(byte[], byte[])> publishQueue;
readonly IKeySerializer<Key> keySerializer;
readonly FasterLog log;
readonly IDevice device;
readonly CancellationTokenSource cts = new();
readonly ManualResetEvent done = new(true);
bool disposed = false;
Expand All @@ -41,7 +42,7 @@ public sealed class SubscribeBroker<Key, Value, KeyValueSerializer> : IDisposabl
public SubscribeBroker(IKeySerializer<Key> keySerializer, string logDir, bool startFresh = true)
{
this.keySerializer = keySerializer;
var device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device.Initialize((long)(1 << 30) * 64);
log = new FasterLog(new FasterLogSettings { LogDevice = device });
if (startFresh)
Expand Down Expand Up @@ -139,8 +140,8 @@ private async Task Start(CancellationToken cancellationToken = default)
if (disposed)
break;

var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cts.Token);
using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cancellationToken);
while (iter.GetNext(out byte[] subscriptionKey, out _, out _, out _))
{
if (!iter.GetNext(out byte[] subscriptionValue, out _, out long currentAddress, out long nextAddress))
Expand Down Expand Up @@ -376,6 +377,7 @@ public void Dispose()
subscriptions?.Clear();
prefixSubscriptions?.Clear();
log.Dispose();
device.Dispose();
}
}
}
6 changes: 4 additions & 2 deletions cs/remote/src/FASTER.server/PubSub/SubscribeKVBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public sealed class SubscribeKVBroker<Key, Value, Input, KeyInputSerializer> : I
private AsyncQueue<byte[]> publishQueue;
readonly IKeyInputSerializer<Key, Input> keyInputSerializer;
readonly FasterLog log;
readonly IDevice device;
readonly CancellationTokenSource cts = new();
readonly ManualResetEvent done = new(true);
bool disposed = false;
Expand All @@ -41,7 +42,7 @@ public sealed class SubscribeKVBroker<Key, Value, Input, KeyInputSerializer> : I
public SubscribeKVBroker(IKeyInputSerializer<Key, Input> keyInputSerializer, string logDir, bool startFresh = true)
{
this.keyInputSerializer = keyInputSerializer;
var device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device.Initialize((long)(1 << 30) * 64);
log = new FasterLog(new FasterLogSettings { LogDevice = device });
if (startFresh)
Expand Down Expand Up @@ -101,7 +102,7 @@ internal async Task Start(CancellationToken cancellationToken = default)
if (disposed)
break;

var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cancellationToken);
while (iter.GetNext(out byte[] subscriptionKey, out int entryLength, out long currentAddress, out long nextAddress))
{
Expand Down Expand Up @@ -255,6 +256,7 @@ public void Dispose()
subscriptions?.Clear();
prefixSubscriptions?.Clear();
log.Dispose();
device.Dispose();
}
}
}
27 changes: 15 additions & 12 deletions cs/remote/src/FASTER.server/ServerKVFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

namespace FASTER.server
{
internal struct ServerKVFunctions<Key, Value, Input, Output, Functions> : IFunctions<Key, Value, Input, Output, long>
where Functions : IFunctions<Key, Value, Input, Output, long>
internal struct ServerKVFunctions<Key, Value, Input, Output, Functions> : IAdvancedFunctions<Key, Value, Input, Output, long>
where Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
{
private readonly Functions functions;
private readonly FasterKVServerSessionBase<Output> serverNetworkSession;
Expand All @@ -22,11 +22,14 @@ public ServerKVFunctions(Functions functions, FasterKVServerSessionBase<Output>
public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint)
=> functions.CheckpointCompletionCallback(sessionId, commitPoint);

public void ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst)
=> functions.ConcurrentReader(ref key, ref input, ref value, ref dst);
public void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address)
=> functions.ConcurrentDeleter(ref key, ref value, ref recordInfo, address);

public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst)
=> functions.ConcurrentWriter(ref key, ref src, ref dst);
public void ConcurrentReader(ref Key key, ref Input input, ref Value value, ref Output dst, ref RecordInfo recordInfo, long address)
=> functions.ConcurrentReader(ref key, ref input, ref value, ref dst, ref recordInfo, address);

public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
=> functions.ConcurrentWriter(ref key, ref src, ref dst, ref recordInfo, address);

public bool NeedCopyUpdate(ref Key key, ref Input input, ref Value oldValue, ref Output output)
=> functions.NeedCopyUpdate(ref key, ref input, ref oldValue, ref output);
Expand All @@ -40,13 +43,13 @@ public void DeleteCompletionCallback(ref Key key, long ctx)
public void InitialUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
=> functions.InitialUpdater(ref key, ref input, ref value, ref output);

public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output)
=> functions.InPlaceUpdater(ref key, ref input, ref value, ref output);
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Output output, ref RecordInfo recordInfo, long address)
=> functions.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address);

public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status)
public void ReadCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status, RecordInfo recordInfo)
{
serverNetworkSession.CompleteRead(ref output, ctx, status);
functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status);
functions.ReadCompletionCallback(ref key, ref input, ref output, ctx, status, recordInfo);
}

public void RMWCompletionCallback(ref Key key, ref Input input, ref Output output, long ctx, Status status)
Expand All @@ -55,8 +58,8 @@ public void RMWCompletionCallback(ref Key key, ref Input input, ref Output outpu
functions.RMWCompletionCallback(ref key, ref input, ref output, ctx, status);
}

public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst)
=> functions.SingleReader(ref key, ref input, ref value, ref dst);
public void SingleReader(ref Key key, ref Input input, ref Value value, ref Output dst, long address)
=> functions.SingleReader(ref key, ref input, ref value, ref dst, address);

public void SingleWriter(ref Key key, ref Value src, ref Value dst)
=> functions.SingleWriter(ref key, ref src, ref dst);
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.server/Servers/FixedLenServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public sealed class FixedLenServer<Key, Value, Input, Output, Functions> : Gener
where Value : unmanaged
where Input : unmanaged
where Output : unmanaged
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
{
/// <summary>
/// Create server instance; use Start to start the server.
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.server/Servers/GenericServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace FASTER.server
/// FASTER server for generic types
/// </summary>
public class GenericServer<Key, Value, Input, Output, Functions, ParameterSerializer> : IDisposable
where Functions : IFunctions<Key, Value, Input, Output, long>
where Functions : IAdvancedFunctions<Key, Value, Input, Output, long>
where ParameterSerializer : IServerSerializer<Key, Value, Input, Output>
{
readonly ServerOptions opts;
Expand Down
Loading

0 comments on commit 28f5494

Please sign in to comment.