-
Notifications
You must be signed in to change notification settings - Fork 569
/
Program.cs
140 lines (119 loc) · 4.88 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
namespace FasterKVAsyncSample
{
/// <summary>
/// Sample that issues asynchronous upserts against a FasterKV store from one or
/// more tasks, while two helper threads periodically report throughput and take
/// full checkpoints. All loops run until the process is terminated.
/// </summary>
public class Program
{
    /// <summary>
    /// Number of upserts issued between commit waits in the batched mode of
    /// <see cref="AsyncOperator"/>.
    /// </summary>
    const int BatchSize = 100;

    /// <summary>
    /// Store instance shared by the operator tasks and the helper threads.
    /// </summary>
    static FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions> faster;

    /// <summary>
    /// Count of upserts credited so far; written with Interlocked by the operator
    /// tasks and read by <see cref="ReportThread"/>.
    /// </summary>
    static int numOps = 0;

    /// <summary>
    /// Main program entry point
    /// </summary>
    static void Main()
    {
        var path = "FasterKVAsyncSample";

        // Place the log files inside the sample directory (the same directory that is
        // used for checkpoints) instead of gluing the file name onto the directory name.
        Directory.CreateDirectory(path);
        var log = Devices.CreateLogDevice(Path.Combine(path, "hlog.log"), deleteOnClose: true);
        var objlog = Devices.CreateLogDevice(Path.Combine(path, "hlog.obj.log"), deleteOnClose: true);

        var logSettings = new LogSettings { LogDevice = log, ObjectLogDevice = objlog };
        var checkpointSettings = new CheckpointSettings { CheckpointDir = path, CheckPointType = CheckpointType.FoldOver };
        var serializerSettings = new SerializerSettings<CacheKey, CacheValue>
        {
            keySerializer = () => new CacheKeySerializer(),
            valueSerializer = () => new CacheValueSerializer()
        };

        faster = new FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions>
            (1L << 20, new CacheFunctions(), logSettings, checkpointSettings, serializerSettings);

        const int NumParallelTasks = 1;
        ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);

        // Surface exceptions from tasks that nobody awaits instead of losing them silently.
        TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
        {
            Console.WriteLine($"Unobserved task exception: {e.Exception}");
            e.SetObserved();
        };

        Task[] tasks = new Task[NumParallelTasks];
        for (int i = 0; i < NumParallelTasks; i++)
        {
            // Copy the loop variable so each lambda captures its own id.
            int local = i;
            tasks[i] = Task.Run(() => AsyncOperator(local));
        }

        // Threads for reporting, commit
        new Thread(new ThreadStart(ReportThread)).Start();
        new Thread(new ThreadStart(CommitThread)).Start();

        // The operator loops never finish, so this blocks until the process is killed.
        Task.WaitAll(tasks);
    }

    /// <summary>
    /// Async operations on FasterKV: upserts random keys and values forever using a
    /// session named after <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Operator index; used as the session name and as the random seed.</param>
    /// <returns>A task that only completes if the loop faults.</returns>
    static async Task AsyncOperator(int id)
    {
        using var session = faster.NewSession(id.ToString());
        Random rand = new Random(id);

        // Local switch between the two modes below.
        bool batched = true;

        await Task.Yield();

        if (!batched)
        {
            // Single commit version - append each item and wait for commit
            // Needs high parallelism (NumParallelTasks) for perf
            // Needs separate commit thread to perform regular commit
            // Otherwise we commit only at page boundaries
            while (true)
            {
                try
                {
                    await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), true);
                    Interlocked.Increment(ref numOps);
                }
                catch (Exception ex)
                {
                    // Best effort: log the failure and keep going.
                    Console.WriteLine($"{nameof(AsyncOperator)}({id}): {ex}");
                }
            }
        }
        else
        {
            // Batched version - we enqueue many entries to memory,
            // then wait for commit periodically
            int count = 0;
            while (true)
            {
                await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()));

                // Only wait for commit once a full batch has been issued, and credit exactly
                // the number of operations in that batch. Resetting the counter keeps it
                // bounded no matter how long the loop runs.
                if (++count == BatchSize)
                {
                    await session.WaitForCommitAsync();
                    Interlocked.Add(ref numOps, count);
                    count = 0;
                }
            }
        }
    }

    /// <summary>
    /// Every five seconds prints the operation throughput observed since the previous
    /// report, together with the current tail address of the store's log.
    /// </summary>
    static void ReportThread()
    {
        long lastTime = 0;
        int lastValue = Volatile.Read(ref numOps);

        Stopwatch sw = Stopwatch.StartNew();

        while (true)
        {
            Thread.Sleep(5000);

            long nowTime = sw.ElapsedMilliseconds;
            int nowValue = Volatile.Read(ref numOps);

            // ops/sec = ops * 1000 / elapsed milliseconds. The multiplication is done in
            // 64-bit arithmetic so the intermediate product cannot overflow.
            long opsPerSecond = (nowValue - lastValue) * 1000L / (nowTime - lastTime);

            Console.WriteLine("Operation Throughput: {0} ops/sec, Tail: {1}",
                opsPerSecond, faster.Log.TailAddress);

            lastValue = nowValue;
            lastTime = nowTime;
        }
    }

    /// <summary>
    /// Every five seconds requests a full checkpoint of the store and blocks this
    /// dedicated thread until the checkpoint has completed.
    /// </summary>
    static void CommitThread()
    {
        while (true)
        {
            Thread.Sleep(5000);

            // The checkpoint token is not needed by this sample, hence the discard.
            faster.TakeFullCheckpoint(out _);
            faster.CompleteCheckpointAsync().GetAwaiter().GetResult();
        }
    }
}
}