Skip to content

Commit

Permalink
[C#] Sample as a memory-only cache (#396)
Browse files Browse the repository at this point in the history
* Initial sample + minor mods to support use case
* cleanup
* added zipf distribution to example. moved to samples folder.
* updates
* Change CopyReadsToTail from bool to enum.
* Add support for CopyReadsToTail to copy to tail when read from ReadOnly region.
* updated sample, use container to store value being reinserted.
* Added support for SubscribeEvictions
* updated sample to ignore tombstones
  • Loading branch information
badrishc authored and darrenge committed Feb 10, 2021
1 parent 8c7963f commit 5aff171
Show file tree
Hide file tree
Showing 21 changed files with 642 additions and 85 deletions.
11 changes: 11 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SecondaryReaderStore", "sam
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VersionedRead", "samples\ReadAddress\VersionedRead.csproj", "{33ED9E1B-1EF0-4984-A07A-7A26C205A446}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MemOnlyCache", "samples\MemOnlyCache\MemOnlyCache.csproj", "{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -225,6 +227,14 @@ Global
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|Any CPU.Build.0 = Release|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|x64.ActiveCfg = Release|Any CPU
{33ED9E1B-1EF0-4984-A07A-7A26C205A446}.Release|x64.Build.0 = Release|Any CPU
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|Any CPU.ActiveCfg = Debug|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|Any CPU.Build.0 = Debug|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|x64.ActiveCfg = Debug|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Debug|x64.Build.0 = Debug|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|Any CPU.ActiveCfg = Release|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|Any CPU.Build.0 = Release|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|x64.ActiveCfg = Release|x64
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -252,6 +262,7 @@ Global
{E2A1C205-4D35-448C-A72F-B9A4AE28EB4E} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{EBE313E5-22D2-4C74-BA1F-16B60404B335} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{33ED9E1B-1EF0-4984-A07A-7A26C205A446} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
15 changes: 15 additions & 0 deletions cs/samples/MemOnlyCache/MemOnlyCache.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFramework>netcoreapp3.1</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>

</Project>
208 changes: 208 additions & 0 deletions cs/samples/MemOnlyCache/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System;
using System.Diagnostics;
using System.Threading;

#pragma warning disable IDE0079 // Remove unnecessary suppression
#pragma warning disable CS0162 // Unreachable code detected

namespace MemOnlyCache
{
class Program
{
/// <summary>
/// Total database size
/// </summary>
const int DbSize = 10_000_000;

/// <summary>
/// Number of threads accessing FASTER instances
/// </summary>
const int kNumThreads = 1;

/// <summary>
/// Percentage of writes in incoming workload requests (remaining are reads)
/// </summary>
const int WritePercent = 0;

/// <summary>
/// Uniform random distribution (true) or Zipf distribution (false) of requests
/// </summary>
const bool UseUniform = false;

/// <summary>
/// Skew factor (theta) of Zipf distribution
/// </summary>
const double Theta = 0.99;

/// <summary>
/// Whether to upsert the data automatically on a cache miss
/// </summary>
const bool UpsertOnCacheMiss = true;

static FasterKV<CacheKey, CacheValue> h;
static long totalReads = 0;

static void Main()
{
// This sample shows the use of FASTER as a concurrent pure in-memory cache

var log = new NullDevice(); // no storage involved

// Define settings for log
var logSettings = new LogSettings
{
LogDevice = log, ObjectLogDevice = log,
MutableFraction = 0.9, // 10% of memory log is "read-only region"
CopyReadsToTail = CopyReadsToTail.FromReadOnly, // reads in read-only region are copied to tail
PageSizeBits = 14, // Each page is sized at 2^14 bytes
MemorySizeBits = 25, // (2^25 / 24) = ~1.39M key-value pairs (log uses 24 bytes per KV pair)
};

// Number of records in memory, assuming class keys and values and x64 platform
// (8-byte key + 8-byte value + 8-byte header = 24 bytes per record)
int numRecords = (int)(Math.Pow(2, logSettings.MemorySizeBits) / 24);

// Targeting 1 record per bucket
var numBucketBits = (int)Math.Ceiling(Math.Log2(numRecords));

h = new FasterKV<CacheKey, CacheValue>(1L << numBucketBits, logSettings, comparer: new CacheKey());

// Register subscriber to receive notifications of log evictions from memory
h.Log.SubscribeEvictions(new LogObserver());

PopulateStore(numRecords);
ContinuousRandomWorkload();

h.Dispose();

Console.WriteLine("Press <ENTER> to end");
Console.ReadLine();
}

private static void PopulateStore(int count)
{
using var s = h.For(new CacheFunctions()).NewSession<CacheFunctions>();

Random r = new Random(0);
Console.WriteLine("Writing random keys to fill cache");

for (int i = 0; i < count; i++)
{
int k = r.Next(DbSize);
var key = new CacheKey(k);
var value = new CacheValue(k);
s.Upsert(ref key, ref value);
}
}

private static void ContinuousRandomWorkload()
{
var threads = new Thread[kNumThreads];
for (int i = 0; i < kNumThreads; i++)
{
var x = i;
threads[i] = new Thread(() => RandomWorkload(x));
}
for (int i = 0; i < kNumThreads; i++)
threads[i].Start();

Stopwatch sw = new Stopwatch();
sw.Start();
var _lastReads = totalReads;
var _lastTime = sw.ElapsedMilliseconds;
while (true)
{
Thread.Sleep(1000);
var tmp = totalReads;
var tmp2 = sw.ElapsedMilliseconds;

Console.WriteLine("Throughput: {0:0.00}K ops/sec", (_lastReads - tmp) / (double)(_lastTime - tmp2));
_lastReads = tmp;
_lastTime = tmp2;
}
}

private static void RandomWorkload(int threadid)
{
Console.WriteLine("Issuing {0} random read workload of {1} reads from thread {2}", UseUniform ? "uniform" : "zipf", DbSize, threadid);

using var session = h.For(new CacheFunctions()).NewSession<CacheFunctions>();

var rnd = new Random(threadid);
var zipf = new ZipfGenerator(rnd, DbSize, Theta);

int statusNotFound = 0;
int statusFound = 0;
CacheValue output = default;

int i = 0;
while (true)
{
if ((i % 256 == 0) && (i > 0))
{
Interlocked.Add(ref totalReads, 256);
if (i % (1024 * 1024 * 16) == 0) // report after every 16M ops
Console.WriteLine("Hit rate: {0:N2}; Evict count: {1}", statusFound / (double)(statusFound + statusNotFound), LogObserver.EvictCount);
}
int op = WritePercent == 0 ? 0 : rnd.Next(100);
long k = UseUniform ? rnd.Next(DbSize) : zipf.Next();

var key = new CacheKey(k);

if (op < WritePercent)
{
var value = new CacheValue(k);
session.Upsert(ref key, ref value);
}
else
{
var status = session.Read(ref key, ref output);

switch (status)
{
case Status.NOTFOUND:
statusNotFound++;
if (UpsertOnCacheMiss)
{
var value = new CacheValue(k);
session.Upsert(ref key, ref value);
}
break;
case Status.OK:
statusFound++;
if (output.value != key.key)
throw new Exception("Read error!");
break;
default:
throw new Exception("Error!");
}
}
i++;
}
}
}

class LogObserver : IObserver<IFasterScanIterator<CacheKey, CacheValue>>
{
public static int EvictCount = 0;

public void OnCompleted() { }

public void OnError(Exception error) { }

public void OnNext(IFasterScanIterator<CacheKey, CacheValue> iter)
{
int cnt = 0;
while (iter.GetNext(out RecordInfo info, out CacheKey _, out CacheValue _))
{
if (!info.Tombstone) // ignore deleted records being evicted
cnt++;
}
Interlocked.Add(ref EvictCount, cnt);
}
}
}
38 changes: 38 additions & 0 deletions cs/samples/MemOnlyCache/Types.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;

namespace MemOnlyCache
{
public sealed class CacheKey : IFasterEqualityComparer<CacheKey>
{
public long key;

public CacheKey() { }

public CacheKey(long first)
{
key = first;
}

public long GetHashCode64(ref CacheKey key) => Utility.GetHashCode(key.key);

public bool Equals(ref CacheKey k1, ref CacheKey k2) => k1.key == k2.key;
}

public sealed class CacheValue
{
public long value;

public CacheValue(long first)
{
value = first;
}
}

/// <summary>
/// Callback for FASTER operations
/// </summary>
public sealed class CacheFunctions : SimpleFunctions<CacheKey, CacheValue> { }
}
48 changes: 48 additions & 0 deletions cs/samples/MemOnlyCache/ZipfGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;

namespace MemOnlyCache
{
public class ZipfGenerator
{
// Based on "Quickly Generating Billion-Record Synthetic Databases", Jim Gray et al., SIGMOD 1994.
readonly Random rng;
readonly private int size;
readonly double theta;
readonly double zetaN, alpha, cutoff2, eta;

public ZipfGenerator(Random rng, int size, double theta = 0.99)
{
this.rng = rng;
this.size = size;
this.theta = theta;

zetaN = Zeta(size, this.theta);
alpha = 1.0 / (1.0 - this.theta);
cutoff2 = Math.Pow(0.5, this.theta);
var zeta2 = Zeta(2, this.theta);
eta = (1.0 - Math.Pow(2.0 / size, 1.0 - this.theta)) / (1.0 - zeta2 / zetaN);
}

private static double Zeta(int count, double theta)
{
double zetaN = 0.0;
for (var ii = 1; ii <= count; ++ii)
zetaN += 1.0 / Math.Pow(ii, theta);
return zetaN;
}

public int Next()
{
double u = (double)rng.Next(int.MaxValue) / int.MaxValue;
double uz = u * zetaN;
if (uz < 1)
return 0;
if (uz < 1 + cutoff2)
return 1;
return (int)(size * Math.Pow(eta * u - eta + 1, alpha));
}
}
}
Loading

0 comments on commit 5aff171

Please sign in to comment.