Skip to content

Commit

Permalink
Exposing FASTER log as a first-class abstraction (#177)
Browse files Browse the repository at this point in the history
* Initial checkin
* Cleaned up epochs, improved fine grain scalability.
* Added commit and recovery support.
* Added TryAppend so users can implement log throttling.
* Fasterlog lowmem (#178)
Adding support for low memory footprint (4 pages)
Added support for odd-sized payloads in presence of holes in log
Fixed concurrency issue that occurs with low num of pages
Improved max throughput by eliminating a 10ms sleep in BlockAllocate
Misc cleanup of logic to track flush and close addresses in log
* Fasterlog TryAppend (#179)
Adding truly non-blocking TryAppend functionality. See sample for how this is used.
* Fasterlog async (#180)
* Added support for TryAppend. Removed List-based batch support.
* Added non-blocking TryAppend
* Added span variant
* Fix definition of SecondChanceFraction for read cache, to be 1 - MutableFraction of the log.
* Added async FlushAndCommit
* Added batched version by separating out in-memory append and wait for commit - gives better perf as the first operation is usually sync
* Tweak async sample to get back to 2GB/sec
* Other updates:
1) Allocations can handle thousands of parallel tasks
2) Removed concept of negative address - allocations are always over available pages
3) Improved scan interface to allow user memory pooling
4) Exposed commit task
5) Cleaned up sample
* Added check for entry fitting on single page
* Added batch interface (sync and async) to log append.
* Added tailing iterator WaitAsync to wait for iteration to proceed.
* Convert Span to ReadOnlySpan for appends
* Added MemoryPool/IMemoryOwner variant of iterator
* Updated way to pin pooled memory
* Update azure-pipelines.yml
* Support minimum buffer size of just 1 page!
* Actually checking in support for 1 page in memory, added initial draft of disposing task
* Added a test
* Improved sample, changed GetMemory to use byte[] instead of Span<byte>
* Update next address of iterator if GetNext fails early.
* Added random read functionality (ReadAsync) for FasterLog. Moved GetMemory to FasterLogSettings instead of Scan. Speed up TruncateUntil. Updated nuspec.
* Ensure begin addresses commit if needed, even when tail addresses do not change. Added CommittedBeginAddress metric.
* changed test project target
* Updated random read example
* Use TrySetResult instead of SetResult, since log closure moves the task to completed state.
* Added simple version/checksum to commit info.
* Added opt-in support for per-entry 8-byte checksum (xor) in header of entry.
* Fixing issue with async enqueue.
* Fixed testcase since thread abort not supported on some platforms.
* Fixing concurrency issue with contiguous partial flush requests. Removed spin-wait for adjacent flush completion.
* Fasterlog exceptions (#189)
* Added storage exception handling, connecting to tasks.
* Cleanup of error handling, control when exception is bubbled up to user.
* Added yield in NeedToWait
* Improved iterator support in case of exception
* Added async iterator support
* Added support for persistent/recoverable named iterators.
  • Loading branch information
badrishc authored Oct 30, 2019
1 parent cedd263 commit 4d90dac
Show file tree
Hide file tree
Showing 43 changed files with 3,290 additions and 581 deletions.
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ variables:
jobs:
- job: 'csharpWindows'
pool:
vmImage: vs2017-win2016
vmImage: windows-latest
displayName: 'C# (Windows)'

strategy:
Expand Down
9 changes: 9 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B1
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -156,6 +158,12 @@ Global
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|Any CPU.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.Build.0 = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -176,6 +184,7 @@ Global
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
2 changes: 1 addition & 1 deletion cs/benchmark/FASTER.benchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.3.0" />
<PackageReference Include="CommandLineParser" Version="2.6.0" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 6 additions & 0 deletions cs/playground/FasterLogSample/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6" />
</startup>
</configuration>
39 changes: 39 additions & 0 deletions cs/playground/FasterLogSample/FasterLogSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<LangVersion>preview</LangVersion>
<RuntimeIdentifiers>win7-x64;linux-x64</RuntimeIdentifiers>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>StructSample</RootNamespace>
<ErrorReport>prompt</ErrorReport>
<RestoreProjectStyle>PackageReference</RestoreProjectStyle>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Debug'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)' == 'Release'">
<DefineConstants>TRACE</DefineConstants>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\x64\Release\</OutputPath>
</PropertyGroup>

<ItemGroup>
<None Include="App.config" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>
</Project>
267 changes: 267 additions & 0 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

namespace FasterLogSample
{
public class Program
{
// Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4)
const int entryLength = 1 << 10;
static readonly byte[] staticEntry = new byte[entryLength];
static FasterLog log;
static FasterLogScanIterator iter;

/// <summary>
/// Main program entry point
/// </summary>
static void Main()
{
bool sync = true;
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device });

// Populate entry being inserted
for (int i = 0; i < entryLength; i++)
{
staticEntry[i] = (byte)i;
}

if (sync)
{
// Log writer thread: create as many as needed
new Thread(new ThreadStart(LogWriterThread)).Start();

// Threads for scan, reporting, commit
new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();
}
else
{
// Async version of demo: expect lower performance
// particularly for small payload sizes

const int NumParallelTasks = 10_000;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
int local = i;
tasks[i] = Task.Run(() => AsyncLogWriter(local));
}

var scan = Task.Run(() => AsyncScan());

// Threads for reporting, commit
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();

Task.WaitAll(tasks);
Task.WaitAll(scan);
}
}


static void LogWriterThread()
{
while (true)
{
// TryEnqueue - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryEnqueue(staticEntry, out _)) ;

// Synchronous blocking enqueue
// Accepts byte[] and ReadOnlySpan<byte>
// log.Enqueue(entry);

// Batched enqueue - batch must fit on one page
// Add this to class:
// static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);
// while (!log.TryEnqueue(spanBatch, out _)) ;
}
}

/// <summary>
/// Async version of enqueue
/// </summary>
static async Task AsyncLogWriter(int id)
{
bool batched = false;

await Task.Yield();

if (!batched)
{
// Single commit version - append each item and wait for commit
// Needs high parallelism (NumParallelTasks) for perf
// Needs separate commit thread to perform regular commit
// Otherwise we commit only at page boundaries
while (true)
{
try
{
await log.EnqueueAndWaitForCommitAsync(staticEntry);
}
catch (Exception ex)
{
Console.WriteLine($"{nameof(AsyncLogWriter)}({id}): {ex}");
}
}
}
else
{
// Batched version - we enqueue many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
{
await log.EnqueueAsync(staticEntry);
if (count++ % 100 == 0)
{
await log.WaitForCommitAsync();
}
}
}
}

static void ScanThread()
{
Random r = new Random();
byte[] result;

using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
// For finite end address, check if iteration ended
// if (iter.CurrentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (Different(result, staticEntry, out int location))
throw new Exception("Invalid entry found");

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Enqueue(result);
}

// Example of random read from given address
// (result, _) = log.ReadAsync(iter.CurrentAddress).GetAwaiter().GetResult();

log.TruncateUntil(iter.NextAddress);
}
}

// Example of recoverable (named) iterator:
// using (iter = log.Scan(log.BeginAddress, long.MaxValue, "foo"))
}

static async Task AsyncScan()
{
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
{
if (Different(result, staticEntry, out int location))
throw new Exception("Invalid entry found");
log.TruncateUntil(iter.NextAddress);
}
}

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
//Task<LinkedCommitInfo> prevCommitTask = null;
while (true)
{
Thread.Sleep(5);
log.Commit(true);

// Async version
// await log.CommitAsync();

// Async version that catches all commit failures in between
//try
//{
// prevCommitTask = await log.CommitAsync(prevCommitTask);
//}
//catch (CommitFailureException e)
//{
// Console.WriteLine(e);
// prevCommitTask = e.LinkedCommitInfo.nextTcs.Task;
//}
}
}

private static bool Different(byte[] b1, byte[] b2, out int location)
{
location = 0;
if (b1.Length != b2.Length) return true;
for (location = 0; location < b1.Length; location++)
{
if (b1[location] != b2[location])
{
return true;
}
}
return false;
}

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
{
private readonly int batchSize;
public ReadOnlySpanBatch(int batchSize) => this.batchSize = batchSize;
public ReadOnlySpan<byte> Get(int index) => staticEntry;
public int TotalEntries() => batchSize;
}
}
}
22 changes: 22 additions & 0 deletions cs/playground/FasterLogSample/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyDescription("")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
2 changes: 1 addition & 1 deletion cs/playground/StructSampleCore/StructSampleCore.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifiers>win7-x64;linux-x64</RuntimeIdentifiers>
<HighEntropyVA>true</HighEntropyVA>
Expand Down
Loading

0 comments on commit 4d90dac

Please sign in to comment.