diff --git a/cs/remote/src/FASTER.server/FasterKVProvider.cs b/cs/remote/src/FASTER.server/FasterKVProvider.cs index d79e7f4de..f1e056e7c 100644 --- a/cs/remote/src/FASTER.server/FasterKVProvider.cs +++ b/cs/remote/src/FASTER.server/FasterKVProvider.cs @@ -1,6 +1,4 @@ using System; -using System.Collections.Concurrent; -using System.Collections.Generic; using System.Net.Sockets; using FASTER.common; using FASTER.core; diff --git a/cs/remote/src/FASTER.server/FasterServer.cs b/cs/remote/src/FASTER.server/FasterServer.cs index e9c743cd4..56d52aece 100644 --- a/cs/remote/src/FASTER.server/FasterServer.cs +++ b/cs/remote/src/FASTER.server/FasterServer.cs @@ -43,7 +43,7 @@ public FasterServer(string address, int port, int networkBufferSize = default) if (networkBufferSize == default) this.networkBufferSize = BufferSizeUtils.ClientBufferSize(new MaxSizeSettings()); - var ip = IPAddress.Parse(address); + var ip = address == null ? IPAddress.Any : IPAddress.Parse(address); var endPoint = new IPEndPoint(ip, port); servSocket = new Socket(ip.AddressFamily, SocketType.Stream, ProtocolType.Tcp); servSocket.Bind(endPoint); @@ -98,7 +98,7 @@ private bool HandleNewConnection(SocketAsyncEventArgs e) return false; } - // Ok to create new event args on accept because we assume a connection to be long-running + // Ok to create new event args on accept because we assume a connection to be long-running var receiveEventArgs = new SocketAsyncEventArgs(); var buffer = new byte[networkBufferSize]; receiveEventArgs.SetBuffer(buffer, 0, networkBufferSize); @@ -247,7 +247,15 @@ private unsafe bool CreateSession(SocketAsyncEventArgs e) connArgs.session.AddBytesRead(connArgs.bytesRead); var _newHead = connArgs.session.TryConsumeMessages(e.Buffer); - e.SetBuffer(_newHead, e.Buffer.Length - _newHead); + if (_newHead == e.Buffer.Length) + { + // Need to grow input buffer + var newBuffer = new byte[e.Buffer.Length * 2]; + Array.Copy(e.Buffer, newBuffer, e.Buffer.Length); + e.SetBuffer(newBuffer, _newHead, newBuffer.Length - _newHead); + } + else + e.SetBuffer(_newHead, e.Buffer.Length - _newHead); return true; } diff --git a/cs/remote/src/FASTER.server/SpanByteFasterKVProvider.cs b/cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs similarity index 74% rename from cs/remote/src/FASTER.server/SpanByteFasterKVProvider.cs rename to cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs index b57163cfc..482635371 100644 --- a/cs/remote/src/FASTER.server/SpanByteFasterKVProvider.cs +++ b/cs/remote/src/FASTER.server/Providers/SpanByteFasterKVProvider.cs @@ -10,7 +10,7 @@ namespace FASTER.server /// public sealed class SpanByteFasterKVProvider : ISessionProvider { - readonly StoreWrapper storeWrapper; + readonly FasterKV store; readonly SpanByteServerSerializer serializer; readonly SubscribeKVBroker> kvBroker; readonly SubscribeBroker> broker; @@ -22,11 +22,20 @@ public sealed class SpanByteFasterKVProvider : ISessionProvider /// /// /// - /// + /// /// - public SpanByteFasterKVProvider(FasterKV store, SubscribeKVBroker> kvBroker = null, SubscribeBroker> broker = null, bool tryRecover = true, MaxSizeSettings maxSizeSettings = default) + public SpanByteFasterKVProvider(FasterKV store, SubscribeKVBroker> kvBroker = null, SubscribeBroker> broker = null, ServerOptions serverOptions = null, MaxSizeSettings maxSizeSettings = default) { - this.storeWrapper = new StoreWrapper(store, tryRecover); + this.store = store; + if ((serverOptions ?? new ServerOptions()).Recover) + { + try + { + store.Recover(); + } + catch + { } + } this.kvBroker = kvBroker; this.broker = broker; this.serializer = new SpanByteServerSerializer(); @@ -40,10 +49,10 @@ public IServerSession GetSession(WireFormat wireFormat, Socket socket) { case WireFormat.WebSocket: return new WebsocketServerSession, SpanByteServerSerializer> - (socket, storeWrapper.store, new SpanByteFunctionsForServer(), serializer, maxSizeSettings, kvBroker, broker); + (socket, store, new SpanByteFunctionsForServer(), serializer, maxSizeSettings, kvBroker, broker); default: return new BinaryServerSession, SpanByteServerSerializer> - (socket, storeWrapper.store, new SpanByteFunctionsForServer(), serializer, maxSizeSettings, kvBroker, broker); + (socket, store, new SpanByteFunctionsForServer(), serializer, maxSizeSettings, kvBroker, broker); } } } diff --git a/cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs b/cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs index b0166222f..4d5881595 100644 --- a/cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs +++ b/cs/remote/src/FASTER.server/PubSub/SubscribeBroker.cs @@ -28,6 +28,7 @@ public sealed class SubscribeBroker : IDisposabl private AsyncQueue<(byte[], byte[])> publishQueue; readonly IKeySerializer keySerializer; readonly FasterLog log; + readonly IDevice device; readonly CancellationTokenSource cts = new(); readonly ManualResetEvent done = new(true); bool disposed = false; @@ -41,7 +42,7 @@ public sealed class SubscribeBroker : IDisposabl public SubscribeBroker(IKeySerializer keySerializer, string logDir, bool startFresh = true) { this.keySerializer = keySerializer; - var device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false); + device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false); device.Initialize((long)(1 << 30) * 64); log = new FasterLog(new FasterLogSettings { LogDevice = device }); if (startFresh) @@ -139,8 +140,8 @@ private async Task Start(CancellationToken cancellationToken = default) if (disposed) break; - var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true); - await iter.WaitAsync(cts.Token); + using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true); + await iter.WaitAsync(cancellationToken); while (iter.GetNext(out byte[] subscriptionKey, out _, out _, out _)) { if (!iter.GetNext(out byte[] subscriptionValue, out _, out long currentAddress, out long nextAddress)) @@ -376,6 +377,7 @@ public void Dispose() subscriptions?.Clear(); prefixSubscriptions?.Clear(); log.Dispose(); + device.Dispose(); } } } diff --git a/cs/remote/src/FASTER.server/PubSub/SubscribeKVBroker.cs b/cs/remote/src/FASTER.server/PubSub/SubscribeKVBroker.cs index 2f03d00ab..b3ebe90be 100644 --- a/cs/remote/src/FASTER.server/PubSub/SubscribeKVBroker.cs +++ b/cs/remote/src/FASTER.server/PubSub/SubscribeKVBroker.cs @@ -28,6 +28,7 @@ public sealed class SubscribeKVBroker : I private AsyncQueue publishQueue; readonly IKeyInputSerializer keyInputSerializer; readonly FasterLog log; + readonly IDevice device; readonly CancellationTokenSource cts = new(); readonly ManualResetEvent done = new(true); bool disposed = false; @@ -41,7 +42,7 @@ public sealed class SubscribeKVBroker : I public SubscribeKVBroker(IKeyInputSerializer keyInputSerializer, string logDir, bool startFresh = true) { this.keyInputSerializer = keyInputSerializer; - var device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false); + device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false); device.Initialize((long)(1 << 30) * 64); log = new FasterLog(new FasterLogSettings { LogDevice = device }); if (startFresh) @@ -101,7 +102,7 @@ internal async Task Start(CancellationToken cancellationToken = default) if (disposed) break; - var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true); + using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true); await iter.WaitAsync(cancellationToken); while (iter.GetNext(out byte[] subscriptionKey, out int entryLength, out long currentAddress, out long nextAddress)) { @@ -255,6 +256,7 @@ public void Dispose() subscriptions?.Clear(); prefixSubscriptions?.Clear(); log.Dispose(); + device.Dispose(); } } } diff --git a/cs/remote/src/FASTER.server/Servers/ServerOptions.cs b/cs/remote/src/FASTER.server/Servers/ServerOptions.cs index af8a0a39f..0c5647ebe 100644 --- a/cs/remote/src/FASTER.server/Servers/ServerOptions.cs +++ b/cs/remote/src/FASTER.server/Servers/ServerOptions.cs @@ -3,6 +3,7 @@ using System; using System.Diagnostics; +using System.IO; using FASTER.core; namespace FASTER.server @@ -62,42 +63,48 @@ public class ServerOptions /// public bool EnablePubSub = true; + /// + /// Constructor + /// + public ServerOptions() + { + } + internal int MemorySizeBits() { long size = ParseSize(MemorySize); - int bits = (int)Math.Floor(Math.Log(size, 2)); - if (size != Math.Pow(2, bits)) + long adjustedSize = PreviousPowerOf2(size); + if (size != adjustedSize) Trace.WriteLine($"Warning: using lower log memory size than specified (power of 2)"); - return bits; + return (int)Math.Log(adjustedSize, 2); } internal int PageSizeBits() { long size = ParseSize(PageSize); - int bits = (int)Math.Ceiling(Math.Log(size, 2)); - if (size != Math.Pow(2, bits)) + long adjustedSize = PreviousPowerOf2(size); + if (size != adjustedSize) Trace.WriteLine($"Warning: using lower page size than specified (power of 2)"); - return bits; + return (int)Math.Log(adjustedSize, 2); } internal int SegmentSizeBits() { long size = ParseSize(SegmentSize); - int bits = (int)Math.Ceiling(Math.Log(size, 2)); - if (size != Math.Pow(2, bits)) + long adjustedSize = PreviousPowerOf2(size); + if (size != adjustedSize) Trace.WriteLine($"Warning: using lower disk segment size than specified (power of 2)"); - return bits; + return (int)Math.Log(adjustedSize, 2); } internal int IndexSizeCachelines() { long size = ParseSize(IndexSize); - int bits = (int)Math.Ceiling(Math.Log(size, 2)); - long adjustedSize = 1L << bits; - if (adjustedSize < 64) throw new Exception("Invalid index size"); + long adjustedSize = PreviousPowerOf2(size); + if (adjustedSize < 64 || adjustedSize > (1L << 37)) throw new Exception("Invalid index size"); if (size != adjustedSize) Trace.WriteLine($"Warning: using lower hash index size than specified (power of 2)"); - return 1 << (bits - 6); + return (int)(adjustedSize / 64); } internal void GetSettings(out LogSettings logSettings, out CheckpointSettings checkpointSettings, out int indexSize) @@ -118,12 +125,50 @@ internal void GetSettings(out LogSettings logSettings, out CheckpointSettings ch indexSize = IndexSizeCachelines(); Trace.WriteLine($"[Store] Using hash index size of {PrettySize(indexSize * 64L)} ({PrettySize(indexSize)} cache lines)"); - var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "Store/hlog", preallocateFile: false); + if (LogDir == null) + LogDir = Directory.GetCurrentDirectory(); + + var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/Store/hlog"); logSettings.LogDevice = device; - checkpointSettings = new CheckpointSettings { - CheckPointType = CheckpointType.Snapshot, - CheckpointDir = CheckpointDir ?? (LogDir + "Store/checkpoints"), + checkpointSettings = new CheckpointSettings + { + CheckPointType = CheckpointType.Snapshot, + CheckpointDir = CheckpointDir ?? (LogDir + "/Store/checkpoints"), + RemoveOutdated = true, + }; + } + + internal void GetObjectStoreSettings(out LogSettings objLogSettings, out CheckpointSettings objCheckpointSettings, out int objIndexSize) + { + objLogSettings = new LogSettings { PreallocateLog = false }; + + objLogSettings.PageSizeBits = PageSizeBits(); + Trace.WriteLine($"[Object Store] Using page size of {PrettySize((long)Math.Pow(2, objLogSettings.PageSizeBits))}"); + + objLogSettings.MemorySizeBits = MemorySizeBits(); + Trace.WriteLine($"[Object Store] Using log memory size of {PrettySize((long)Math.Pow(2, objLogSettings.MemorySizeBits))}"); + + Trace.WriteLine($"[Object Store] There are {PrettySize(1 << (objLogSettings.MemorySizeBits - objLogSettings.PageSizeBits))} log pages in memory"); + + objLogSettings.SegmentSizeBits = SegmentSizeBits(); + Trace.WriteLine($"[Object Store] Using disk segment size of {PrettySize((long)Math.Pow(2, objLogSettings.SegmentSizeBits))}"); + + objIndexSize = IndexSizeCachelines() / 64; + Trace.WriteLine($"[Object Store] Using hash index size of {PrettySize(objIndexSize * 64L)} ({PrettySize(objIndexSize)} cache lines)"); + + if (LogDir == null) + LogDir = Directory.GetCurrentDirectory(); + + var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/ObjectStore/hlog"); + objLogSettings.LogDevice = device; + var objdevice = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/ObjectStore/hlog.obj"); + objLogSettings.ObjectLogDevice = objdevice; + + objCheckpointSettings = new CheckpointSettings + { + CheckPointType = CheckpointType.Snapshot, + CheckpointDir = CheckpointDir ?? (LogDir + "/ObjectStore/checkpoints"), RemoveOutdated = true, }; } @@ -181,5 +226,16 @@ private static string PrettySize(long value) return v.ToString() + suffix[-exp / 3 - 1]; return v.ToString(); } + + private long PreviousPowerOf2(long v) + { + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + return v - (v >> 1); + } } } \ No newline at end of file diff --git a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs index 83b2bbeb9..60c06a731 100644 --- a/cs/remote/src/FASTER.server/Servers/VarLenServer.cs +++ b/cs/remote/src/FASTER.server/Servers/VarLenServer.cs @@ -19,6 +19,7 @@ public sealed class VarLenServer : IDisposable readonly SpanByteFasterKVProvider provider; readonly SubscribeKVBroker> kvBroker; readonly SubscribeBroker> broker; + readonly LogSettings logSettings; /// /// Create server instance; use Start to start the server. @@ -34,7 +35,7 @@ public VarLenServer(ServerOptions opts) if (opts.CheckpointDir != null && !Directory.Exists(opts.CheckpointDir)) Directory.CreateDirectory(opts.CheckpointDir); - opts.GetSettings(out var logSettings, out var checkpointSettings, out var indexSize); + opts.GetSettings(out logSettings, out var checkpointSettings, out var indexSize); store = new FasterKV(indexSize, logSettings, checkpointSettings); if (opts.EnablePubSub) @@ -44,7 +45,7 @@ public VarLenServer(ServerOptions opts) } // Create session provider for VarLen - provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts.Recover); + provider = new SpanByteFasterKVProvider(store, kvBroker, broker, opts); server = new FasterServer(opts.Address, opts.Port); server.Register(WireFormat.DefaultVarLenKV, provider); @@ -82,6 +83,7 @@ private void InternalDispose() broker?.Dispose(); kvBroker?.Dispose(); store.Dispose(); + logSettings.LogDevice.Dispose(); } private static void DeleteDirectory(string path) diff --git a/cs/remote/src/FASTER.server/StoreWrapper.cs b/cs/remote/src/FASTER.server/StoreWrapper.cs deleted file mode 100644 index b7f96b194..000000000 --- a/cs/remote/src/FASTER.server/StoreWrapper.cs +++ /dev/null @@ -1,29 +0,0 @@ -using FASTER.core; - -namespace FASTER.server -{ - /// - /// Wrapper for store and store-specific information - /// - /// - /// - internal class StoreWrapper - { - public readonly FasterKV store; - - public StoreWrapper(FasterKV store, bool tryRecover) - { - this.store = store; - - if (tryRecover) - { - try - { - store.Recover(); - } - catch - { } - } - } - } -} \ No newline at end of file diff --git a/cs/remote/src/FASTER.server/WebsocketServerSession.cs b/cs/remote/src/FASTER.server/WebsocketServerSession.cs index c23d4bf43..1303cb3cc 100644 --- a/cs/remote/src/FASTER.server/WebsocketServerSession.cs +++ b/cs/remote/src/FASTER.server/WebsocketServerSession.cs @@ -32,7 +32,7 @@ internal unsafe sealed class WebsocketServerSession> subscribeKVBroker; diff --git a/cs/remote/test/FASTER.remote.test/FixedLenBinaryPubSubTests.cs b/cs/remote/test/FASTER.remote.test/FixedLenBinaryPubSubTests.cs index b126e61a7..006c9a464 100644 --- a/cs/remote/test/FASTER.remote.test/FixedLenBinaryPubSubTests.cs +++ b/cs/remote/test/FASTER.remote.test/FixedLenBinaryPubSubTests.cs @@ -26,6 +26,7 @@ public void TearDown() } [Test] + [Repeat(10)] public void SubscribeKVTest() { var f = new FixedLenClientFunctions(); @@ -41,6 +42,7 @@ public void SubscribeKVTest() } [Test] + [Repeat(10)] public void PrefixSubscribeKVTest() { var f = new FixedLenClientFunctions(); @@ -56,6 +58,7 @@ public void PrefixSubscribeKVTest() } [Test] + [Repeat(10)] public void SubscribeTest() { var f = new FixedLenClientFunctions(); @@ -71,6 +74,7 @@ public void SubscribeTest() } [Test] + [Repeat(10)] public void PrefixSubscribeTest() { var f = new FixedLenClientFunctions(); diff --git a/cs/remote/test/FASTER.remote.test/VarLenBinaryPubSubTests.cs b/cs/remote/test/FASTER.remote.test/VarLenBinaryPubSubTests.cs index b58ae0900..db49151f4 100644 --- a/cs/remote/test/FASTER.remote.test/VarLenBinaryPubSubTests.cs +++ b/cs/remote/test/FASTER.remote.test/VarLenBinaryPubSubTests.cs @@ -26,6 +26,7 @@ public void TearDown() } [Test] + [Repeat(10)] public void SubscribeKVTest() { Random r = new Random(23); @@ -48,6 +49,7 @@ public void SubscribeKVTest() } [Test] + [Repeat(10)] public void PSubscribeKVTest() { Random r = new Random(23); @@ -74,6 +76,7 @@ public void PSubscribeKVTest() } [Test] + [Repeat(10)] public void SubscribeTest() { Random r = new Random(23); @@ -95,6 +98,7 @@ public void SubscribeTest() } [Test] + [Repeat(10)] public void PSubscribeTest() { Random r = new Random(23);