Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposing FASTER log as a first-class abstraction #177

Merged
merged 45 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
16edd83
Initial checkin
badrishc Sep 17, 2019
3077f52
Updates.
badrishc Sep 17, 2019
853b3ea
Updates
badrishc Sep 17, 2019
6315a14
Cleaned up epochs, improved fine grain scalability.
badrishc Sep 18, 2019
19d5d82
Fixing test change
badrishc Sep 18, 2019
88d7269
Added commit and recovery support.
badrishc Sep 19, 2019
ddcc338
Added TryAppend so users can implement log throttling.
badrishc Sep 19, 2019
2cd85e3
Fasterlog lowmem (#178)
badrishc Sep 26, 2019
ec2a3b5
Fasterlog TryAppend (#179)
badrishc Sep 30, 2019
bb4e357
minor fix
badrishc Sep 30, 2019
4504937
merge
badrishc Sep 30, 2019
b06d112
Fasterlog async (#180)
badrishc Oct 3, 2019
002b993
Merge branch 'master' into fasterlog
badrishc Oct 4, 2019
944504b
Added tailing iterator WaitAsync to wait for iteration to proceed.
badrishc Oct 4, 2019
540d1a5
Merge branch 'fasterlog' of https://github.com/Microsoft/FASTER into …
badrishc Oct 4, 2019
80a2aeb
Convert Span to ReadOnlySpan for appends
badrishc Oct 4, 2019
0050694
Added MemoryPool/IMemoryOwner variant of iterator
badrishc Oct 4, 2019
127e908
Updates
badrishc Oct 4, 2019
6dc7af6
Updated way to pin pooled memory
badrishc Oct 4, 2019
ff27448
Update azure-pipelines.yml
badrishc Oct 4, 2019
8e42a74
Support minimum buffer size of just 1 page!
badrishc Oct 5, 2019
c55de3f
Actually checking in support for 1 page in memory, added initial draf…
badrishc Oct 7, 2019
db68ae0
Added a test
badrishc Oct 7, 2019
5caea66
Improved sample, changed GetMemory to use byte[] instead of Span<byte>
badrishc Oct 7, 2019
0f33d4a
Update next address of iterator if GetNext fails early.
badrishc Oct 8, 2019
2e59b43
Added random read functionality (ReadAsync) for FasterLog. Moved GetM…
badrishc Oct 9, 2019
aa4fef3
Ensure begin addresses commit if needed, even when tail addresses do …
badrishc Oct 9, 2019
dfd683f
changed test project target
badrishc Oct 9, 2019
8dbba0a
reverting test nuget version
badrishc Oct 9, 2019
4d1c9ea
Updated random read example
badrishc Oct 10, 2019
66ee5d3
Merge branch 'master' into fasterlog
badrishc Oct 10, 2019
fd15349
Use TrySetResult instead of SetResult, since log closure moves the ta…
badrishc Oct 10, 2019
f120778
Merge branch 'fasterlog' of https://github.com/Microsoft/FASTER into …
badrishc Oct 10, 2019
15c418b
Added simple version/checksum to commit info.
badrishc Oct 15, 2019
4751080
Added opt-in support for per-entry 8-byte checksum (xor) in header of…
badrishc Oct 16, 2019
70b4c72
Fixing issue with async enqueue.
badrishc Oct 21, 2019
20a7536
Fixed testcase since thread abort not supported on some platforms.
badrishc Oct 21, 2019
64bbe14
Fixing concurrency issue with contiguous partial flush requests. Remo…
badrishc Oct 26, 2019
9435033
Fasterlog exceptions (#189)
badrishc Oct 30, 2019
e940a0a
Added async iterator support
badrishc Oct 30, 2019
78fd56b
Merging
badrishc Oct 30, 2019
8e175e0
Added support for persistent/recoverable named iterators.
badrishc Oct 30, 2019
147006c
Merge branch 'master' into fasterlog
badrishc Oct 30, 2019
53ad95a
Merge branch 'master' into fasterlog
badrishc Oct 30, 2019
819cee0
Merge branch 'master' into fasterlog
badrishc Oct 30, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ variables:
jobs:
- job: 'csharpWindows'
pool:
vmImage: vs2017-win2016
vmImage: windows-latest
displayName: 'C# (Windows)'

strategy:
Expand Down
9 changes: 9 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B1
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -156,6 +158,12 @@ Global
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|Any CPU.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.Build.0 = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -176,6 +184,7 @@ Global
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
2 changes: 1 addition & 1 deletion cs/benchmark/FASTER.benchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.3.0" />
<PackageReference Include="CommandLineParser" Version="2.6.0" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 6 additions & 0 deletions cs/playground/FasterLogSample/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6" />
</startup>
</configuration>
39 changes: 39 additions & 0 deletions cs/playground/FasterLogSample/FasterLogSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<LangVersion>preview</LangVersion>
<RuntimeIdentifiers>win7-x64;linux-x64</RuntimeIdentifiers>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>StructSample</RootNamespace>
<ErrorReport>prompt</ErrorReport>
<RestoreProjectStyle>PackageReference</RestoreProjectStyle>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Debug'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)' == 'Release'">
<DefineConstants>TRACE</DefineConstants>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\x64\Release\</OutputPath>
</PropertyGroup>

<ItemGroup>
<None Include="App.config" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>
</Project>
267 changes: 267 additions & 0 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

namespace FasterLogSample
{
public class Program
{
// Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4)
const int entryLength = 1 << 10;
static readonly byte[] staticEntry = new byte[entryLength];
static FasterLog log;
static FasterLogScanIterator iter;

/// <summary>
/// Main program entry point
/// </summary>
static void Main()
{
bool sync = true;
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device });

// Populate entry being inserted
for (int i = 0; i < entryLength; i++)
{
staticEntry[i] = (byte)i;
}

if (sync)
{
// Log writer thread: create as many as needed
new Thread(new ThreadStart(LogWriterThread)).Start();

// Threads for scan, reporting, commit
new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();
}
else
{
// Async version of demo: expect lower performance
// particularly for small payload sizes

const int NumParallelTasks = 10_000;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
int local = i;
tasks[i] = Task.Run(() => AsyncLogWriter(local));
}

var scan = Task.Run(() => AsyncScan());

// Threads for reporting, commit
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();

Task.WaitAll(tasks);
Task.WaitAll(scan);
}
}


static void LogWriterThread()
{
while (true)
{
// TryEnqueue - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryEnqueue(staticEntry, out _)) ;

// Synchronous blocking enqueue
// Accepts byte[] and ReadOnlySpan<byte>
// log.Enqueue(entry);

// Batched enqueue - batch must fit on one page
// Add this to class:
// static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);
// while (!log.TryEnqueue(spanBatch, out _)) ;
}
}

/// <summary>
/// Async version of enqueue
/// </summary>
static async Task AsyncLogWriter(int id)
{
bool batched = false;

await Task.Yield();

if (!batched)
{
// Single commit version - append each item and wait for commit
// Needs high parallelism (NumParallelTasks) for perf
// Needs separate commit thread to perform regular commit
// Otherwise we commit only at page boundaries
while (true)
{
try
{
await log.EnqueueAndWaitForCommitAsync(staticEntry);
}
catch (Exception ex)
{
Console.WriteLine($"{nameof(AsyncLogWriter)}({id}): {ex}");
}
}
}
else
{
// Batched version - we enqueue many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
{
await log.EnqueueAsync(staticEntry);
if (count++ % 100 == 0)
{
await log.WaitForCommitAsync();
}
}
}
}

static void ScanThread()
{
Random r = new Random();
byte[] result;

using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
// For finite end address, check if iteration ended
// if (iter.CurrentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (Different(result, staticEntry, out int location))
throw new Exception("Invalid entry found");

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Enqueue(result);
}

// Example of random read from given address
// (result, _) = log.ReadAsync(iter.CurrentAddress).GetAwaiter().GetResult();

log.TruncateUntil(iter.NextAddress);
}
}

// Example of recoverable (named) iterator:
// using (iter = log.Scan(log.BeginAddress, long.MaxValue, "foo"))
}

static async Task AsyncScan()
{
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
{
if (Different(result, staticEntry, out int location))
throw new Exception("Invalid entry found");
log.TruncateUntil(iter.NextAddress);
}
}

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
//Task<LinkedCommitInfo> prevCommitTask = null;
while (true)
{
Thread.Sleep(5);
log.Commit(true);

// Async version
// await log.CommitAsync();

// Async version that catches all commit failures in between
//try
//{
// prevCommitTask = await log.CommitAsync(prevCommitTask);
//}
//catch (CommitFailureException e)
//{
// Console.WriteLine(e);
// prevCommitTask = e.LinkedCommitInfo.nextTcs.Task;
//}
}
}

private static bool Different(byte[] b1, byte[] b2, out int location)
{
location = 0;
if (b1.Length != b2.Length) return true;
for (location = 0; location < b1.Length; location++)
{
if (b1[location] != b2[location])
{
return true;
}
}
return false;
}

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
{
private readonly int batchSize;
public ReadOnlySpanBatch(int batchSize) => this.batchSize = batchSize;
public ReadOnlySpan<byte> Get(int index) => staticEntry;
public int TotalEntries() => batchSize;
}
}
}
22 changes: 22 additions & 0 deletions cs/playground/FasterLogSample/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyDescription("")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
2 changes: 1 addition & 1 deletion cs/playground/StructSampleCore/StructSampleCore.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifiers>win7-x64;linux-x64</RuntimeIdentifiers>
<HighEntropyVA>true</HighEntropyVA>
Expand Down
Loading