Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposing FASTER log as a first-class abstraction #177

Merged
merged 45 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
16edd83
Initial checkin
badrishc Sep 17, 2019
3077f52
Updates.
badrishc Sep 17, 2019
853b3ea
Updates
badrishc Sep 17, 2019
6315a14
Cleaned up epochs, improved fine grain scalability.
badrishc Sep 18, 2019
19d5d82
Fixing test change
badrishc Sep 18, 2019
88d7269
Added commit and recovery support.
badrishc Sep 19, 2019
ddcc338
Added TryAppend so users can implement log throttling.
badrishc Sep 19, 2019
2cd85e3
Fasterlog lowmem (#178)
badrishc Sep 26, 2019
ec2a3b5
Fasterlog TryAppend (#179)
badrishc Sep 30, 2019
bb4e357
minor fix
badrishc Sep 30, 2019
4504937
merge
badrishc Sep 30, 2019
b06d112
Fasterlog async (#180)
badrishc Oct 3, 2019
002b993
Merge branch 'master' into fasterlog
badrishc Oct 4, 2019
944504b
Added tailing iterator WaitAsync to wait for iteration to proceed.
badrishc Oct 4, 2019
540d1a5
Merge branch 'fasterlog' of https://github.com/Microsoft/FASTER into …
badrishc Oct 4, 2019
80a2aeb
Convert Span to ReadOnlySpan for appends
badrishc Oct 4, 2019
0050694
Added MemoryPool/IMemoryOwner variant of iterator
badrishc Oct 4, 2019
127e908
Updates
badrishc Oct 4, 2019
6dc7af6
Updated way to pin pooled memory
badrishc Oct 4, 2019
ff27448
Update azure-pipelines.yml
badrishc Oct 4, 2019
8e42a74
Support minimum buffer size of just 1 page!
badrishc Oct 5, 2019
c55de3f
Actually checking in support for 1 page in memory, added initial draf…
badrishc Oct 7, 2019
db68ae0
Added a test
badrishc Oct 7, 2019
5caea66
Improved sample, changed GetMemory to use byte[] instead of Span<byte>
badrishc Oct 7, 2019
0f33d4a
Update next address of iterator if GetNext fails early.
badrishc Oct 8, 2019
2e59b43
Added random read functionality (ReadAsync) for FasterLog. Moved GetM…
badrishc Oct 9, 2019
aa4fef3
Ensure begin addresses commit if needed, even when tail addresses do …
badrishc Oct 9, 2019
dfd683f
changed test project target
badrishc Oct 9, 2019
8dbba0a
reverting test nuget version
badrishc Oct 9, 2019
4d1c9ea
Updated random read example
badrishc Oct 10, 2019
66ee5d3
Merge branch 'master' into fasterlog
badrishc Oct 10, 2019
fd15349
Use TrySetResult instead of SetResult, since log closure moves the ta…
badrishc Oct 10, 2019
f120778
Merge branch 'fasterlog' of https://github.com/Microsoft/FASTER into …
badrishc Oct 10, 2019
15c418b
Added simple version/checksum to commit info.
badrishc Oct 15, 2019
4751080
Added opt-in support for per-entry 8-byte checksum (xor) in header of…
badrishc Oct 16, 2019
70b4c72
Fixing issue with async enqueue.
badrishc Oct 21, 2019
20a7536
Fixed testcase since thread abort not supported on some platforms.
badrishc Oct 21, 2019
64bbe14
Fixing concurrency issue with contiguous partial flush requests. Remo…
badrishc Oct 26, 2019
9435033
Fasterlog exceptions (#189)
badrishc Oct 30, 2019
e940a0a
Added async iterator support
badrishc Oct 30, 2019
78fd56b
Merging
badrishc Oct 30, 2019
8e175e0
Added support for persistent/recoverable named iterators.
badrishc Oct 30, 2019
147006c
Merge branch 'master' into fasterlog
badrishc Oct 30, 2019
53ad95a
Merge branch 'master' into fasterlog
badrishc Oct 30, 2019
819cee0
Merge branch 'master' into fasterlog
badrishc Oct 30, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B1
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -156,6 +158,12 @@ Global
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|Any CPU.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.Build.0 = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -176,6 +184,7 @@ Global
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
6 changes: 6 additions & 0 deletions cs/playground/FasterLogSample/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6" />
</startup>
</configuration>
38 changes: 38 additions & 0 deletions cs/playground/FasterLogSample/FasterLogSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>StructSample</RootNamespace>
<ErrorReport>prompt</ErrorReport>
<RestoreProjectStyle>PackageReference</RestoreProjectStyle>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Debug'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)' == 'Release'">
<DefineConstants>TRACE</DefineConstants>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\x64\Release\</OutputPath>
</PropertyGroup>

<ItemGroup>
<None Include="App.config" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>
</Project>
250 changes: 250 additions & 0 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

namespace FasterLogSample
{
public class Program
{
// Entry length can be between 1 and ((1 << FasterLogSettings.PageSizeBits) - 4)
const int entryLength = 1 << 10;
static readonly byte[] staticEntry = new byte[entryLength];
static readonly ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(10);
static FasterLog log;
static FasterLogScanIterator iter;

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
long lastIterValue = log.BeginAddress;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Append Throughput: {0} MB/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), nowValue);
lastValue = nowValue;

if (iter != null)
{
var nowIterValue = iter.CurrentAddress;
Console.WriteLine("Scan Throughput: {0} MB/sec, Iter pos: {1}",
(nowIterValue - lastIterValue) / (1000 * (nowTime - lastTime)), nowIterValue);
lastIterValue = nowIterValue;
}

lastTime = nowTime;
}
}

static void CommitThread()
{
while (true)
{
Thread.Sleep(5);
log.FlushAndCommit(true);

// Async version
// await Task.Delay(5);
// await log.FlushAndCommitAsync();
}
}

static void AppendThread()
{
while (true)
{
// TryAppend - can be used with throttling/back-off
// Accepts byte[] and ReadOnlySpan<byte>
while (!log.TryAppend(staticEntry, out _)) ;

// Synchronous blocking append
// Accepts byte[] and ReadOnlySpan<byte>
// log.Append(entry);

// Batched append - batch must fit on one page
// while (!log.TryAppend(spanBatch, out _)) ;
}
}

static void ScanThread()
{
Random r = new Random();

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
{
entry[i] = (byte)i;
}

var entrySpan = new Span<byte>(entry);

long lastAddress = 0;
Span<byte> result;
using (iter = log.Scan(log.BeginAddress, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result, out int length))
{
iter.WaitAsync().GetAwaiter().GetResult();
badrishc marked this conversation as resolved.
Show resolved Hide resolved
}

// Memory pool variant:
// iter.GetNext(pool, out IMemoryOwner<byte> resultMem, out int length))

if (!result.SequenceEqual(entrySpan))
{
if (result.Length != entrySpan.Length)
throw new Exception("Invalid entry found, expected length " + entrySpan.Length + ", actual length " + result.Length);
else
throw new Exception("Invalid entry found at offset " + FindDiff(result, entrySpan));
}

// Re-insert entry with small probability
if (r.Next(100) < 10)
{
log.Append(result);
}

if (iter.CurrentAddress - lastAddress > 500_000_000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
}
}
}
}

private static int FindDiff(Span<byte> b1, Span<byte> b2)
{
for (int i = 0; i < b1.Length; i++)
{
if (b1[i] != b2[i])
{
return i;
}
}
return 0;
}

/// <summary>
/// Main program entry point
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
bool sync = true;
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device });

// Populate entry being inserted
for (int i = 0; i < entryLength; i++)
{
staticEntry[i] = (byte)i;
}

if (sync)
{
// Append thread: create as many as needed
new Thread(new ThreadStart(AppendThread)).Start();

// Threads for scan, reporting, commit
var t1 = new Thread(new ThreadStart(ScanThread));
var t2 = new Thread(new ThreadStart(ReportThread));
var t3 = new Thread(new ThreadStart(CommitThread));
t1.Start(); t2.Start(); t3.Start();
t1.Join(); t2.Join(); t3.Join();
}
else
{
// Async version of demo: expect lower performance
// particularly for small payload sizes

const int NumParallelTasks = 10_000;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
int local = i;
tasks[i] = Task.Run(() => AppendAsync(local));
}

// Threads for scan, reporting, commit
var t1 = new Thread(new ThreadStart(ScanThread));
var t2 = new Thread(new ThreadStart(ReportThread));
var t3 = new Thread(new ThreadStart(CommitThread));
t1.Start(); t2.Start(); t3.Start();
t1.Join(); t2.Join(); t3.Join();

Task.WaitAll(tasks);
}
}

static async Task AppendAsync(int id)
{
bool batched = false;

await Task.Yield();

if (!batched)
{
// Unbatched version - append each item with commit
// Needs high parallelism (NumParallelTasks) for perf
while (true)
{
try
{
await log.AppendAsync(staticEntry);
}
catch (Exception ex)
{
Console.WriteLine($"{nameof(AppendAsync)}({id}): {ex}");
}
}
}
else
{
// Batched version - we append many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
{
await log.AppendToMemoryAsync(staticEntry);
if (count++ % 100 == 0)
{
await log.WaitForCommitAsync();
}
}
}
}

private struct ReadOnlySpanBatch : IReadOnlySpanBatch
{
private readonly int batchSize;
public ReadOnlySpanBatch(int batchSize) => this.batchSize = batchSize;
public ReadOnlySpan<byte> Get(int index) => staticEntry;
public int TotalEntries() => batchSize;
}

}
}
22 changes: 22 additions & 0 deletions cs/playground/FasterLogSample/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyDescription("")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
Loading