Skip to content

Commit

Permalink
[C#] SpanByteFasterKVProvider (#520)
Browse files Browse the repository at this point in the history
* Added SpanByteFasterKVProvider, and changed SpanByteSerializer to SpanByteServerSerializer and SpanByteClientSerializer

* Removed unnecessary functions / debugs

* fixed nit
  • Loading branch information
rohankadekodi-msr authored Jul 9, 2021
1 parent 1b319a5 commit 0504248
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cs/remote/samples/VarLenServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static void VarLenServer(string[] args)
if (opts.Recover) store.Recover();

// This variable-length session provider can be used with compatible clients such as VarLenClient
var provider = new FasterKVProvider<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteSerializer>(store, wp => new SpanByteFunctionsForServer<long>(wp), new SpanByteSerializer());
var provider = new SpanByteFasterKVProvider(store);

// Create server
var server = new FasterServer(opts.Address, opts.Port);
Expand Down
47 changes: 47 additions & 0 deletions cs/remote/src/FASTER.server/SpanByteClientSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using FASTER.common;
using FASTER.core;
using System.Buffers;

namespace FASTER.client
{
/// <summary>
/// Serializer for SpanByte (can be used on client side)
/// </summary>
public unsafe class SpanByteClientSerializer : IClientSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>
{
readonly MemoryPool<byte> memoryPool;

/// <summary>
/// Constructor
/// </summary>
/// <param name="memoryPool"></param>
public SpanByteClientSerializer(MemoryPool<byte> memoryPool = default)
{
this.memoryPool = memoryPool ?? MemoryPool<byte>.Shared;
}

/// <inheritdoc />
public SpanByteAndMemory ReadOutput(ref byte* src)
{
int length = *(int*)src;
var mem = memoryPool.Rent(length);
new ReadOnlySpan<byte>(src + sizeof(int), length).CopyTo(mem.Memory.Span);
src += length + sizeof(int);
return new SpanByteAndMemory(mem, length);
}

/// <inheritdoc />
public bool Write(ref SpanByte k, ref byte* dst, int length)
{
var len = k.TotalSize;
if (length < len) return false;
k.CopyTo(dst);
dst += len;
return true;
}
}
}
47 changes: 47 additions & 0 deletions cs/remote/src/FASTER.server/SpanByteFasterKVProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;
using FASTER.common;
using FASTER.core;

namespace FASTER.server
{

/// <summary>
/// Session provider for FasterKV store based on
/// [K, V, I, O, C] = [SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long]
/// </summary>
public sealed class SpanByteFasterKVProvider : ISessionProvider, IDisposable
{
readonly FasterKV<SpanByte, SpanByte> store;
readonly SpanByteServerSerializer serializer;
readonly MaxSizeSettings maxSizeSettings;

/// <summary>
/// Create SpanByte FasterKV backend
/// </summary>
/// <param name="store"></param>
/// <param name="maxSizeSettings"></param>
public SpanByteFasterKVProvider(FasterKV<SpanByte, SpanByte> store, MaxSizeSettings maxSizeSettings = default)
{
this.store = store;
this.serializer = new SpanByteServerSerializer();
this.maxSizeSettings = maxSizeSettings ?? new MaxSizeSettings();
}

/// <inheritdoc />
public IServerSession GetSession(WireFormat wireFormat, Socket socket)
{
return new BinaryServerSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteServerSerializer>
(socket, store, new SpanByteFunctionsForServer<long>(wireFormat), serializer, maxSizeSettings);
}

/// <inheritdoc />
public void Dispose()
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
namespace FASTER.server
{
/// <summary>
/// Serializer for SpanByte
/// Used only on server-side, but we add client-side serializer API for completeness
/// Serializer for SpanByte. Used only on server-side.
/// </summary>
public unsafe sealed class SpanByteSerializer : IServerSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>, IClientSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>
public unsafe sealed class SpanByteServerSerializer : IServerSerializer<SpanByte, SpanByte, SpanByte, SpanByteAndMemory>
{
readonly SpanByteVarLenStruct settings;
readonly int keyLength;
readonly int valueLength;

[ThreadStatic]
static SpanByteAndMemory output;

Expand All @@ -26,9 +24,8 @@ public unsafe sealed class SpanByteSerializer : IServerSerializer<SpanByte, Span
/// </summary>
/// <param name="maxKeyLength">Max key length</param>
/// <param name="maxValueLength">Max value length</param>
public SpanByteSerializer(int maxKeyLength = 512, int maxValueLength = 512)
public SpanByteServerSerializer(int maxKeyLength = 512, int maxValueLength = 512)
{
settings = new SpanByteVarLenStruct();
keyLength = maxKeyLength;
valueLength = maxValueLength;
}
Expand All @@ -38,7 +35,7 @@ public SpanByteSerializer(int maxKeyLength = 512, int maxValueLength = 512)
public ref SpanByte ReadKeyByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += settings.GetLength(ref ret);
src += ret.TotalSize;
return ref ret;
}

Expand All @@ -47,7 +44,7 @@ public ref SpanByte ReadKeyByRef(ref byte* src)
public ref SpanByte ReadValueByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += settings.GetLength(ref ret);
src += ret.TotalSize;
return ref ret;
}

Expand All @@ -56,20 +53,10 @@ public ref SpanByte ReadValueByRef(ref byte* src)
public ref SpanByte ReadInputByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += settings.GetLength(ref ret);
src += ret.TotalSize;
return ref ret;
}

/// <inheritdoc />
public bool Write(ref SpanByte k, ref byte* dst, int length)
{
var len = settings.GetLength(ref k);
if (length < len) return false;
Buffer.MemoryCopy(Unsafe.AsPointer(ref k), dst, len, len);
dst += len;
return true;
}

/// <inheritdoc />
public bool Write(ref SpanByteAndMemory k, ref byte* dst, int length)
{
Expand All @@ -94,15 +81,6 @@ public ref SpanByteAndMemory AsRefOutput(byte* src, int length)
/// <inheritdoc />
public void SkipOutput(ref byte* src) => src += (*(int*)src) + sizeof(int);

/// <inheritdoc />
public SpanByteAndMemory ReadOutput(ref byte* src)
{
int length = *(int*)src;
var _output = SpanByteAndMemory.FromFixedSpan(new Span<byte>(src, length + sizeof(int)));
src += length + sizeof(int);
return _output;
}

/// <inheritdoc />
public int GetLength(ref SpanByteAndMemory o) => o.Length;
}
Expand Down
2 changes: 1 addition & 1 deletion cs/remote/test/FASTER.remote.test/VarLenServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public VarLenServer(string folderName, string address = "127.0.0.1", int port =
store = new FasterKV<SpanByte, SpanByte>(indexSize, logSettings, checkpointSettings);

// Create session provider for VarLen
var provider = new FasterKVProvider<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, SpanByteFunctionsForServer<long>, SpanByteSerializer>(store, wp => new SpanByteFunctionsForServer<long>(wp), new SpanByteSerializer());
var provider = new SpanByteFasterKVProvider(store);

server = new FasterServer(address, port);
server.Register(WireFormat.DefaultVarLenKV, provider);
Expand Down
13 changes: 13 additions & 0 deletions cs/src/core/VarLen/SpanByteAndMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ public SpanByteAndMemory(IMemoryOwner<byte> memory)
Memory = memory;
}

/// <summary>
/// Constructor using given IMemoryOwner and length
/// </summary>
/// <param name="memory"></param>
/// <param name="length"></param>
public SpanByteAndMemory(IMemoryOwner<byte> memory, int length)
{
SpanByte = default;
SpanByte.Invalid = true;
Memory = memory;
SpanByte.Length = length;
}

/// <summary>
/// View a fixed Span&lt;byte&gt; as a SpanByteAndMemory
/// </summary>
Expand Down

0 comments on commit 0504248

Please sign in to comment.