Skip to content

Commit

Permalink
[C#] Client-server cleanup (#553)
Browse files Browse the repository at this point in the history
* Cleanup of server

* More cleanup

* Proper dispose of broker

* updates
  • Loading branch information
badrishc authored Aug 31, 2021
1 parent 6e13ce3 commit dc674cf
Show file tree
Hide file tree
Showing 33 changed files with 895 additions and 611 deletions.
45 changes: 6 additions & 39 deletions cs/remote/samples/FixedLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,30 @@

using System;
using System.Threading;
using ServerOptions;
using CommandLine;
using FASTER.core;
using FasterServerOptions;
using FASTER.server;
using FASTER.common;
using System.Diagnostics;

namespace FixedLenServer
namespace FasterFixedLenServer
{
/// <summary>
/// This sample creates a FASTER server for fixed-length (struct) keys and values
/// Sample server for fixed-length (blittable) keys and values.
/// Types are defined in Types.cs; they are 8-byte keys and values in the sample.
/// A binary wire protocol is used.
/// </summary>
class Program
{
static void Main(string[] args)
{
FixedLenServer(args);
}
Trace.Listeners.Add(new ConsoleTraceListener());

static void FixedLenServer(string[] args)
{
Console.WriteLine("FASTER fixed-length (binary) KV server");

ParserResult<Options> result = Parser.Default.ParseArguments<Options>(args);
if (result.Tag == ParserResultType.NotParsed) return;
var opts = result.MapResult(o => o, xs => new Options());

opts.GetSettings(out var logSettings, out var checkpointSettings, out var indexSize);

// We use blittable structs Key and Value to construct a customized server for fixed-length types
var store = new FasterKV<Key, Value>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>> kvBroker = null;
SubscribeBroker<Key, Value, IKeySerializer<Key>> broker = null;

if (opts.EnablePubSub)
{
// Create a broker for pub-sub of key-value pairs in remote FASTER instance
kvBroker = new SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>>(new FixedLenKeyInputSerializer<Key, Input>(), opts.LogDir, true);
// Create a broker for pub-sub of key-value pairs
broker = new SubscribeBroker<Key, Value, IKeySerializer<Key>>(new FixedLenKeySerializer<Key>(), null, true);
}

// This fixed-length session provider can be used with compatible clients such as FixedLenClient and FASTER.benchmark
// Uses FixedLenSerializer as our in-built serializer for blittable (fixed length) types
var provider = new FasterKVProvider<Key, Value, Input, Output, Functions, FixedLenSerializer<Key, Value, Input, Output>>(store, e => new Functions(), kvBroker, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);

// Register session provider for WireFormat.DefaultFixedLenKV
// You can register multiple session providers with the same server, with different wire protocol specifications
server.Register(WireFormat.DefaultFixedLenKV, provider);

// Start server
using var server = new FixedLenServer<Key, Value, Input, Output, Functions>(opts.GetServerOptions(), e => new Functions());
server.Start();
Console.WriteLine("Started server");

Expand Down
2 changes: 1 addition & 1 deletion cs/remote/samples/FixedLenServer/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using System.Threading;
using FASTER.core;

namespace FixedLenServer
namespace FasterFixedLenServer
{
[StructLayout(LayoutKind.Explicit, Size = 8)]
public struct Key : IFasterEqualityComparer<Key>
Expand Down
132 changes: 15 additions & 117 deletions cs/remote/samples/VarLenServer/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
// Licensed under the MIT license.

using CommandLine;
using System;
using FASTER.core;
using FASTER.server;

namespace ServerOptions
namespace FasterServerOptions
{
class Options
{
Expand Down Expand Up @@ -39,122 +38,21 @@ class Options
[Option("pubsub", Required = false, Default = true, HelpText = "Enable pub/sub feature on server.")]
public bool EnablePubSub { get; set; }

public int MemorySizeBits()
public ServerOptions GetServerOptions()
{
long size = ParseSize(MemorySize);
int bits = (int)Math.Floor(Math.Log2(size));
Console.WriteLine($"Using log memory size of {PrettySize((long)Math.Pow(2, bits))}");
if (size != Math.Pow(2, bits))
Console.WriteLine($"Warning: using lower log memory size than specified (power of 2)");
return bits;
}

public int PageSizeBits()
{
long size = ParseSize(PageSize);
int bits = (int)Math.Ceiling(Math.Log2(size));
Console.WriteLine($"Using page size of {PrettySize((long)Math.Pow(2, bits))}");
if (size != Math.Pow(2, bits))
Console.WriteLine($"Warning: using lower page size than specified (power of 2)");
return bits;
}

public int SegmentSizeBits()
{
long size = ParseSize(SegmentSize);
int bits = (int)Math.Ceiling(Math.Log2(size));
Console.WriteLine($"Using disk segment size of {PrettySize((long)Math.Pow(2, bits))}");
if (size != Math.Pow(2, bits))
Console.WriteLine($"Warning: using lower disk segment size than specified (power of 2)");
return bits;
}

public int IndexSizeCachelines()
{
long size = ParseSize(IndexSize);
int bits = (int)Math.Ceiling(Math.Log2(size));
long adjustedSize = 1L << bits;
if (adjustedSize < 64) throw new Exception("Invalid index size");
Console.WriteLine($"Using hash index size of {PrettySize(adjustedSize)} ({PrettySize(adjustedSize/64)} cache lines)");
if (size != adjustedSize)
Console.WriteLine($"Warning: using lower hash index size than specified (power of 2)");
return 1 << (bits - 6);
}

public void GetSettings(out LogSettings logSettings, out CheckpointSettings checkpointSettings, out int indexSize)
{
logSettings = new LogSettings { PreallocateLog = false };

logSettings.PageSizeBits = PageSizeBits();
logSettings.MemorySizeBits = MemorySizeBits();
Console.WriteLine($"There are {PrettySize(1 << (logSettings.MemorySizeBits - logSettings.PageSizeBits))} log pages in memory");
logSettings.SegmentSizeBits = SegmentSizeBits();
indexSize = IndexSizeCachelines();

var device = LogDir == "" ? new NullDevice() : Devices.CreateLogDevice(LogDir + "/hlog", preallocateFile: false);
logSettings.LogDevice = device;

if (CheckpointDir == null && LogDir == null)
checkpointSettings = null;
else
checkpointSettings = new CheckpointSettings {
CheckPointType = CheckpointType.FoldOver,
CheckpointDir = CheckpointDir ?? (LogDir + "/checkpoints")
};
}

static long ParseSize(string value)
{
char[] suffix = new char[] { 'k', 'm', 'g', 't', 'p' };
long result = 0;
foreach (char c in value)
{
if (char.IsDigit(c))
{
result = result * 10 + (byte)c - '0';
}
else
{
for (int i = 0; i < suffix.Length; i++)
{
if (char.ToLower(c) == suffix[i])
{
result *= (long)Math.Pow(1024, i + 1);
return result;
}
}
}
}
return result;
}

public static string PrettySize(long value)
{
char[] suffix = new char[] { 'k', 'm', 'g', 't', 'p' };
double v = value;
int exp = 0;
while (v - Math.Floor(v) > 0)
{
if (exp >= 18)
break;
exp += 3;
v *= 1024;
v = Math.Round(v, 12);
}

while (Math.Floor(v).ToString().Length > 3)
return new ServerOptions
{
if (exp <= -18)
break;
exp -= 3;
v /= 1024;
v = Math.Round(v, 12);
}
if (exp > 0)
return v.ToString() + suffix[exp / 3 - 1];
else if (exp < 0)
return v.ToString() + suffix[-exp / 3 - 1];
return v.ToString();
Port = Port,
Address = Address,
MemorySize = MemorySize,
PageSize = PageSize,
IndexSize = IndexSize,
SegmentSize = SegmentSize,
LogDir = LogDir,
CheckpointDir = CheckpointDir,
Recover = Recover,
EnablePubSub = EnablePubSub,
};
}
}
}
47 changes: 6 additions & 41 deletions cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,28 @@
using System;
using System.Threading;
using CommandLine;
using ServerOptions;
using FASTER.core;
using FasterServerOptions;
using FASTER.server;
using FASTER.common;
using System.Diagnostics;

namespace VarLenServer
namespace FasterVarLenServer
{
/// <summary>
/// Server for variable-length keys and values.
/// Sample server for variable-length keys and values.
/// </summary>
class Program
{
static void Main(string[] args)
{
VarLenServer(args);
}

Trace.Listeners.Add(new ConsoleTraceListener());

static void VarLenServer(string[] args)
{
Console.WriteLine("FASTER variable-length KV server");

ParserResult<Options> result = Parser.Default.ParseArguments<Options>(args);
if (result.Tag == ParserResultType.NotParsed) return;
var opts = result.MapResult(o => o, xs => new Options());

opts.GetSettings(out var logSettings, out var checkpointSettings, out var indexSize);

// Create a new instance of the FasterKV, customized for variable-length blittable data (represented by SpanByte)
// With SpanByte, keys and values are stored inline in the FASTER log as [ 4 byte length | payload ]
var store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);
if (opts.Recover) store.Recover();

SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null;
SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null;

if (opts.EnablePubSub)
{
// Create a broker for pub-sub of key value-pairs in FASTER instance
kvBroker = new SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>>(new SpanByteKeyInputSerializer(), opts.LogDir, true);
// Create a broker for topic-based pub-sub of key-value pairs
broker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, true);
}

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new SpanByteFasterKVProvider(store, kvBroker, broker);

// Create server
var server = new FasterServer(opts.Address, opts.Port);

// Register provider as backend provider for WireFormat.DefaultFixedLenKV
// You can register multiple providers with the same server, with different wire protocol specifications
server.Register(WireFormat.DefaultVarLenKV, provider);
// Register provider as backend provider for WireFormat.WebSocket
server.Register(WireFormat.WebSocket, provider);

// Start server
using var server = new VarLenServer(opts.GetServerOptions());
server.Start();
Console.WriteLine("Started server");

Expand Down
2 changes: 1 addition & 1 deletion cs/remote/src/FASTER.client/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ internal void ProcessReplies(byte[] buf, int offset)
var src = b;
var seqNo = ((BatchHeader*)src)->SeqNo;
var count = ((BatchHeader*)src)->NumMessages;
if (seqNo != lastSeqNo + 1)
if (seqNo != lastSeqNo + 1 && !subscriptionSession)
throw new Exception("Out of order message within session");
lastSeqNo = seqNo;

Expand Down
16 changes: 1 addition & 15 deletions cs/remote/src/FASTER.common/IKeyInputSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,13 @@ namespace FASTER.common
/// </summary>
/// <typeparam name="Key">Key</typeparam>
/// <typeparam name="Input">Input</typeparam>
public unsafe interface IKeyInputSerializer<Key, Input>
public unsafe interface IKeyInputSerializer<Key, Input> : IKeySerializer<Key>
{
/// <summary>
/// Read key by reference, from given location
/// </summary>
/// <param name="src">Memory location</param>
/// <returns>Key</returns>
ref Key ReadKeyByRef(ref byte* src);

/// <summary>
/// Read input by reference, from given location
/// </summary>
/// <param name="src">Memory location</param>
/// <returns>Input</returns>
ref Input ReadInputByRef(ref byte* src);

/// <summary>
/// Match pattern with key used for pub-sub
/// </summary>
/// <param name="k">key to be published</param>
/// <param name="pattern">pattern to check</param>
bool Match(ref Key k, ref Key pattern);
}
}
Loading

0 comments on commit dc674cf

Please sign in to comment.