Skip to content

Commit

Permalink
Merge from master
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Oct 9, 2021
2 parents 19eb229 + 28f5494 commit ffe14a0
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 65 deletions.
2 changes: 0 additions & 2 deletions cs/remote/src/FASTER.server/FasterKVProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.Sockets;
using FASTER.common;
using FASTER.core;
Expand Down
14 changes: 11 additions & 3 deletions cs/remote/src/FASTER.server/FasterServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public FasterServer(string address, int port, int networkBufferSize = default)
if (networkBufferSize == default)
this.networkBufferSize = BufferSizeUtils.ClientBufferSize(new MaxSizeSettings());

var ip = IPAddress.Parse(address);
var ip = address == null ? IPAddress.Any : IPAddress.Parse(address);
var endPoint = new IPEndPoint(ip, port);
servSocket = new Socket(ip.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
servSocket.Bind(endPoint);
Expand Down Expand Up @@ -98,7 +98,7 @@ private bool HandleNewConnection(SocketAsyncEventArgs e)
return false;
}

// Ok to create new event args on accept because we assume a connection to be long-running
// Ok to create new event args on accept because we assume a connection to be long-running
var receiveEventArgs = new SocketAsyncEventArgs();
var buffer = new byte[networkBufferSize];
receiveEventArgs.SetBuffer(buffer, 0, networkBufferSize);
Expand Down Expand Up @@ -247,7 +247,15 @@ private unsafe bool CreateSession(SocketAsyncEventArgs e)

connArgs.session.AddBytesRead(connArgs.bytesRead);
var _newHead = connArgs.session.TryConsumeMessages(e.Buffer);
e.SetBuffer(_newHead, e.Buffer.Length - _newHead);
if (_newHead == e.Buffer.Length)
{
// Need to grow input buffer
var newBuffer = new byte[e.Buffer.Length * 2];
Array.Copy(e.Buffer, newBuffer, e.Buffer.Length);
e.SetBuffer(newBuffer, _newHead, newBuffer.Length - _newHead);
}
else
e.SetBuffer(_newHead, e.Buffer.Length - _newHead);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace FASTER.server
/// </summary>
public sealed class SpanByteFasterKVProvider : ISessionProvider
{
readonly StoreWrapper<SpanByte, SpanByte> storeWrapper;
readonly FasterKV<SpanByte, SpanByte> store;
readonly SpanByteServerSerializer serializer;
readonly SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker;
readonly SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker;
Expand All @@ -22,11 +22,20 @@ public sealed class SpanByteFasterKVProvider : ISessionProvider
/// <param name="store"></param>
/// <param name="kvBroker"></param>
/// <param name="broker"></param>
/// <param name="tryRecover"></param>
/// <param name="serverOptions"></param>
/// <param name="maxSizeSettings"></param>
public SpanByteFasterKVProvider(FasterKV<SpanByte, SpanByte> store, SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null, SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null, bool tryRecover = true, MaxSizeSettings maxSizeSettings = default)
public SpanByteFasterKVProvider(FasterKV<SpanByte, SpanByte> store, SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null, SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null, ServerOptions serverOptions = null, MaxSizeSettings maxSizeSettings = default)
{
this.storeWrapper = new StoreWrapper<SpanByte, SpanByte>(store, tryRecover);
this.store = store;
if ((serverOptions ?? new ServerOptions()).Recover)
{
try
{
store.Recover();
}
catch
{ }
}
this.kvBroker = kvBroker;
this.broker = broker;
this.serializer = new SpanByteServerSerializer();
Expand All @@ -40,10 +49,10 @@ public IServerSession GetSession(WireFormat wireFormat, Socket socket)
{
case WireFormat.WebSocket:
return new WebsocketServerSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteServerSerializer>
(socket, storeWrapper.store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
(socket, store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
default:
return new BinaryServerSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteServerSerializer>
(socket, storeWrapper.store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
(socket, store, new SpanByteFunctionsForServer<long>(), serializer, maxSizeSettings, kvBroker, broker);
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public sealed class SubscribeBroker<Key, Value, KeyValueSerializer> : IDisposabl
private AsyncQueue<(byte[], byte[])> publishQueue;
readonly IKeySerializer<Key> keySerializer;
readonly FasterLog log;
readonly IDevice device;
readonly CancellationTokenSource cts = new();
readonly ManualResetEvent done = new(true);
bool disposed = false;
Expand All @@ -41,7 +42,7 @@ public sealed class SubscribeBroker<Key, Value, KeyValueSerializer> : IDisposabl
public SubscribeBroker(IKeySerializer<Key> keySerializer, string logDir, bool startFresh = true)
{
this.keySerializer = keySerializer;
var device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device.Initialize((long)(1 << 30) * 64);
log = new FasterLog(new FasterLogSettings { LogDevice = device });
if (startFresh)
Expand Down Expand Up @@ -139,8 +140,8 @@ private async Task Start(CancellationToken cancellationToken = default)
if (disposed)
break;

var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cts.Token);
using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cancellationToken);
while (iter.GetNext(out byte[] subscriptionKey, out _, out _, out _))
{
if (!iter.GetNext(out byte[] subscriptionValue, out _, out long currentAddress, out long nextAddress))
Expand Down Expand Up @@ -376,6 +377,7 @@ public void Dispose()
subscriptions?.Clear();
prefixSubscriptions?.Clear();
log.Dispose();
device.Dispose();
}
}
}
6 changes: 4 additions & 2 deletions cs/remote/src/FASTER.server/PubSub/SubscribeKVBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public sealed class SubscribeKVBroker<Key, Value, Input, KeyInputSerializer> : I
private AsyncQueue<byte[]> publishQueue;
readonly IKeyInputSerializer<Key, Input> keyInputSerializer;
readonly FasterLog log;
readonly IDevice device;
readonly CancellationTokenSource cts = new();
readonly ManualResetEvent done = new(true);
bool disposed = false;
Expand All @@ -41,7 +42,7 @@ public sealed class SubscribeKVBroker<Key, Value, Input, KeyInputSerializer> : I
public SubscribeKVBroker(IKeyInputSerializer<Key, Input> keyInputSerializer, string logDir, bool startFresh = true)
{
this.keyInputSerializer = keyInputSerializer;
var device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device.Initialize((long)(1 << 30) * 64);
log = new FasterLog(new FasterLogSettings { LogDevice = device });
if (startFresh)
Expand Down Expand Up @@ -101,7 +102,7 @@ internal async Task Start(CancellationToken cancellationToken = default)
if (disposed)
break;

var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
await iter.WaitAsync(cancellationToken);
while (iter.GetNext(out byte[] subscriptionKey, out int entryLength, out long currentAddress, out long nextAddress))
{
Expand Down Expand Up @@ -255,6 +256,7 @@ public void Dispose()
subscriptions?.Clear();
prefixSubscriptions?.Clear();
log.Dispose();
device.Dispose();
}
}
}
90 changes: 73 additions & 17 deletions cs/remote/src/FASTER.server/Servers/ServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.IO;
using FASTER.core;

namespace FASTER.server
Expand Down Expand Up @@ -62,42 +63,48 @@ public class ServerOptions
/// </summary>
public bool EnablePubSub = true;

/// <summary>
/// Constructor
/// </summary>
public ServerOptions()
{
}

internal int MemorySizeBits()
{
long size = ParseSize(MemorySize);
int bits = (int)Math.Floor(Math.Log(size, 2));
if (size != Math.Pow(2, bits))
long adjustedSize = PreviousPowerOf2(size);
if (size != adjustedSize)
Trace.WriteLine($"Warning: using lower log memory size than specified (power of 2)");
return bits;
return (int)Math.Log(adjustedSize, 2);
}

internal int PageSizeBits()
{
long size = ParseSize(PageSize);
int bits = (int)Math.Ceiling(Math.Log(size, 2));
if (size != Math.Pow(2, bits))
long adjustedSize = PreviousPowerOf2(size);
if (size != adjustedSize)
Trace.WriteLine($"Warning: using lower page size than specified (power of 2)");
return bits;
return (int)Math.Log(adjustedSize, 2);
}

internal int SegmentSizeBits()
{
long size = ParseSize(SegmentSize);
int bits = (int)Math.Ceiling(Math.Log(size, 2));
if (size != Math.Pow(2, bits))
long adjustedSize = PreviousPowerOf2(size);
if (size != adjustedSize)
Trace.WriteLine($"Warning: using lower disk segment size than specified (power of 2)");
return bits;
return (int)Math.Log(adjustedSize, 2);
}

internal int IndexSizeCachelines()
{
long size = ParseSize(IndexSize);
int bits = (int)Math.Ceiling(Math.Log(size, 2));
long adjustedSize = 1L << bits;
if (adjustedSize < 64) throw new Exception("Invalid index size");
long adjustedSize = PreviousPowerOf2(size);
if (adjustedSize < 64 || adjustedSize > (1L << 37)) throw new Exception("Invalid index size");
if (size != adjustedSize)
Trace.WriteLine($"Warning: using lower hash index size than specified (power of 2)");
return 1 << (bits - 6);
return (int)(adjustedSize / 64);
}

internal void GetSettings(out LogSettings logSettings, out CheckpointSettings checkpointSettings, out int indexSize)
Expand All @@ -118,12 +125,50 @@ internal void GetSettings(out LogSettings logSettings, out CheckpointSettings ch
indexSize = IndexSizeCachelines();
Trace.WriteLine($"[Store] Using hash index size of {PrettySize(indexSize * 64L)} ({PrettySize(indexSize)} cache lines)");

var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "Store/hlog", preallocateFile: false);
if (LogDir == null)
LogDir = Directory.GetCurrentDirectory();

var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/Store/hlog");
logSettings.LogDevice = device;

checkpointSettings = new CheckpointSettings {
CheckPointType = CheckpointType.Snapshot,
CheckpointDir = CheckpointDir ?? (LogDir + "Store/checkpoints"),
checkpointSettings = new CheckpointSettings
{
CheckPointType = CheckpointType.Snapshot,
CheckpointDir = CheckpointDir ?? (LogDir + "/Store/checkpoints"),
RemoveOutdated = true,
};
}

internal void GetObjectStoreSettings(out LogSettings objLogSettings, out CheckpointSettings objCheckpointSettings, out int objIndexSize)
{
objLogSettings = new LogSettings { PreallocateLog = false };

objLogSettings.PageSizeBits = PageSizeBits();
Trace.WriteLine($"[Object Store] Using page size of {PrettySize((long)Math.Pow(2, objLogSettings.PageSizeBits))}");

objLogSettings.MemorySizeBits = MemorySizeBits();
Trace.WriteLine($"[Object Store] Using log memory size of {PrettySize((long)Math.Pow(2, objLogSettings.MemorySizeBits))}");

Trace.WriteLine($"[Object Store] There are {PrettySize(1 << (objLogSettings.MemorySizeBits - objLogSettings.PageSizeBits))} log pages in memory");

objLogSettings.SegmentSizeBits = SegmentSizeBits();
Trace.WriteLine($"[Object Store] Using disk segment size of {PrettySize((long)Math.Pow(2, objLogSettings.SegmentSizeBits))}");

objIndexSize = IndexSizeCachelines() / 64;
Trace.WriteLine($"[Object Store] Using hash index size of {PrettySize(objIndexSize * 64L)} ({PrettySize(objIndexSize)} cache lines)");

if (LogDir == null)
LogDir = Directory.GetCurrentDirectory();

var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/ObjectStore/hlog");
objLogSettings.LogDevice = device;
var objdevice = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/ObjectStore/hlog.obj");
objLogSettings.ObjectLogDevice = objdevice;

objCheckpointSettings = new CheckpointSettings
{
CheckPointType = CheckpointType.Snapshot,
CheckpointDir = CheckpointDir ?? (LogDir + "/ObjectStore/checkpoints"),
RemoveOutdated = true,
};
}
Expand Down Expand Up @@ -181,5 +226,16 @@ private static string PrettySize(long value)
return v.ToString() + suffix[-exp / 3 - 1];
return v.ToString();
}

private long PreviousPowerOf2(long v)
{
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
return v - (v >> 1);
}
}
}
6 changes: 4 additions & 2 deletions cs/remote/src/FASTER.server/Servers/VarLenServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public sealed class VarLenServer : IDisposable
readonly SpanByteFasterKVProvider provider;
readonly SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker;
readonly SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker;
readonly LogSettings logSettings;

/// <summary>
/// Create server instance; use Start to start the server.
Expand All @@ -34,7 +35,7 @@ public VarLenServer(ServerOptions opts)
if (opts.CheckpointDir != null && !Directory.Exists(opts.CheckpointDir))
Directory.CreateDirectory(opts.CheckpointDir);

opts.GetSettings(out var logSettings, out var checkpointSettings, out var indexSize);
opts.GetSettings(out logSettings, out var checkpointSettings, out var indexSize);
store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);

if (opts.EnablePubSub)
Expand All @@ -44,7 +45,7 @@ public VarLenServer(ServerOptions opts)
}

// Create session provider for VarLen
provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts.Recover);
provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts);

server = new FasterServer(opts.Address, opts.Port);
server.Register(WireFormat.DefaultVarLenKV, provider);
Expand Down Expand Up @@ -82,6 +83,7 @@ private void InternalDispose()
broker?.Dispose();
kvBroker?.Dispose();
store.Dispose();
logSettings.LogDevice.Dispose();
}

private static void DeleteDirectory(string path)
Expand Down
29 changes: 0 additions & 29 deletions cs/remote/src/FASTER.server/StoreWrapper.cs

This file was deleted.

2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.server/WebsocketServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal unsafe sealed class WebsocketServerSession<Key, Value, Input, Output, F
byte* recvBufferPtr;
int readHead;

int seqNo, pendingSeqNo, msgnum, start;
int pendingSeqNo, msgnum, start;
byte* dcurr;

readonly SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>> subscribeKVBroker;
Expand Down
Loading

0 comments on commit ffe14a0

Please sign in to comment.