Skip to content

Commit

Permalink
Merge branch 'fasterlog' into async-support
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 18, 2019
2 parents 29c64e1 + 19d5d82 commit 4ef03bb
Show file tree
Hide file tree
Showing 16 changed files with 823 additions and 64 deletions.
9 changes: 9 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B1
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -156,6 +158,12 @@ Global
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|Any CPU.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.ActiveCfg = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Debug|x64.Build.0 = Debug|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -176,6 +184,7 @@ Global
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
6 changes: 6 additions & 0 deletions cs/playground/FasterLogSample/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6" />
</startup>
</configuration>
38 changes: 38 additions & 0 deletions cs/playground/FasterLogSample/FasterLogSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net46</TargetFramework>
<Platforms>x64</Platforms>
<RuntimeIdentifier>win7-x64</RuntimeIdentifier>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>StructSample</RootNamespace>
<ErrorReport>prompt</ErrorReport>
<RestoreProjectStyle>PackageReference</RestoreProjectStyle>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Debug'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)' == 'Release'">
<DefineConstants>TRACE</DefineConstants>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\x64\Release\</OutputPath>
</PropertyGroup>

<ItemGroup>
<None Include="App.config" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>
</Project>
97 changes: 97 additions & 0 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading;
using FASTER.core;

namespace FasterLogSample
{
public class Program
{
const int entryLength = 96;
static FasterLog log;

static void ReportThread()
{
long lastTime = 0;
long lastValue = log.TailAddress;
Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);
var nowTime = sw.ElapsedMilliseconds;
var nowValue = log.TailAddress;

Console.WriteLine("Throughput: {0} MB/sec",
(nowValue - lastValue) / (1000*(nowTime - lastTime)));
lastTime = nowTime;
lastValue = nowValue;
}
}

static void AppendThread()
{
byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
entry[i] = (byte)i;

while (true)
{
log.Append(entry);
}
}

static void ScanThread()
{
Random r = new Random();

Thread.Sleep(5000);

byte[] entry = new byte[entryLength];
for (int i = 0; i < entryLength; i++)
entry[i] = (byte)i;
var entrySpan = new Span<byte>(entry);


long lastAddress = 0;
Span<byte> result;
using (var iter = log.Scan(0, long.MaxValue))
{
while (true)
{
while (!iter.GetNext(out result))
Thread.Sleep(1000);
if (!result.SequenceEqual(entrySpan))
{
throw new Exception("Invalid entry found");
}

if (r.Next(100) < 10)
log.Append(result);

if (iter.CurrentAddress - lastAddress > 500000000)
{
log.TruncateUntil(iter.CurrentAddress);
lastAddress = iter.CurrentAddress;
}
}
}
}

static void Main(string[] args)
{
var device = Devices.CreateLogDevice("D:\\logs\\hlog.log");
log = new FasterLog(new FasterLogSettings { LogDevice = device });

new Thread(new ThreadStart(AppendThread)).Start();
new Thread(new ThreadStart(ScanThread)).Start();
new Thread(new ThreadStart(ReportThread)).Start();

Thread.Sleep(500*1000);
}
}
}
22 changes: 22 additions & 0 deletions cs/playground/FasterLogSample/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyDescription("")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
9 changes: 5 additions & 4 deletions cs/src/core/Device/Devices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ public static class Devices
/// <param name="logPath">Path to file that will store the log (empty for null device)</param>
/// <param name="preallocateFile">Whether we try to preallocate the file on creation</param>
/// <param name="deleteOnClose">Delete files on close</param>
/// <param name="capacity"></param>
/// <param name="capacity">The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit</param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
/// <returns>Device instance</returns>
public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED)
public static IDevice CreateLogDevice(string logPath, bool preallocateFile = true, bool deleteOnClose = false, long capacity = CAPACITY_UNSPECIFIED, bool recoverDevice = false)
{
if (string.IsNullOrWhiteSpace(logPath))
return new NullDevice();
Expand All @@ -38,12 +39,12 @@ public static IDevice CreateLogDevice(string logPath, bool preallocateFile = tru
#if DOTNETCORE
if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity);
logDevice = new ManagedLocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity, recoverDevice);
}
else
#endif
{
logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, capacity: capacity);
logDevice = new LocalStorageDevice(logPath, preallocateFile, deleteOnClose, true, capacity, recoverDevice);
}
return logDevice;
}
Expand Down
21 changes: 15 additions & 6 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Win32.SafeHandles;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
Expand All @@ -24,16 +25,18 @@ public class LocalStorageDevice : StorageDeviceBase
/// <summary>
/// Constructor
/// </summary>
/// <param name="filename"></param>
/// <param name="filename">File name (or prefix) with path</param>
/// <param name="preallocateFile"></param>
/// <param name="deleteOnClose"></param>
/// <param name="disableFileBuffering"></param>
/// <param name="capacity">The maximum number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
public LocalStorageDevice(string filename,
bool preallocateFile = false,
bool deleteOnClose = false,
bool disableFileBuffering = true,
long capacity = Devices.CAPACITY_UNSPECIFIED)
long capacity = Devices.CAPACITY_UNSPECIFIED,
bool recoverDevice = false)
: base(filename, GetSectorSize(filename), capacity)

{
Expand All @@ -42,7 +45,8 @@ public LocalStorageDevice(string filename,
this.deleteOnClose = deleteOnClose;
this.disableFileBuffering = disableFileBuffering;
logHandles = new SafeConcurrentDictionary<int, SafeFileHandle>();
RecoverFiles();
if (recoverDevice)
RecoverFiles();
}

private void RecoverFiles()
Expand All @@ -53,14 +57,19 @@ private void RecoverFiles()

string bareName = fi.Name;

int prevSegmentId = -1;
List<int> segids = new List<int>();
foreach (FileInfo item in di.GetFiles(bareName + "*"))
{
int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", ""));
segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")));
}
segids.Sort();

int prevSegmentId = -1;
foreach (int segmentId in segids)
{
if (segmentId != prevSegmentId + 1)
{
startSegment = segmentId;

}
else
{
Expand Down
23 changes: 16 additions & 7 deletions cs/src/core/Device/ManagedLocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Win32.SafeHandles;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
Expand All @@ -24,19 +25,21 @@ public class ManagedLocalStorageDevice : StorageDeviceBase
/// <summary>
///
/// </summary>
/// <param name="filename"></param>
/// <param name="filename">File name (or prefix) with path</param>
/// <param name="preallocateFile"></param>
/// <param name="deleteOnClose"></param>
/// <param name="capacity">The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED)
/// <param name="capacity">The maximal number of bytes this storage device can accommondate, or CAPACITY_UNSPECIFIED if there is no such limit</param>
/// <param name="recoverDevice">Whether to recover device metadata from existing files</param>
public ManagedLocalStorageDevice(string filename, bool preallocateFile = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED, bool recoverDevice = false)
: base(filename, GetSectorSize(filename), capacity)
{
pool = new SectorAlignedBufferPool(1, 1);

this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
logHandles = new ConcurrentDictionary<int, Stream>();
RecoverFiles();
if (recoverDevice)
RecoverFiles();
}


Expand All @@ -48,14 +51,19 @@ private void RecoverFiles()

string bareName = fi.Name;

int prevSegmentId = -1;
List<int> segids = new List<int>();
foreach (FileInfo item in di.GetFiles(bareName + "*"))
{
int segmentId = Int32.Parse(item.Name.Replace(bareName, "").Replace(".", ""));
segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")));
}
segids.Sort();

int prevSegmentId = -1;
foreach (int segmentId in segids)
{
if (segmentId != prevSegmentId + 1)
{
startSegment = segmentId;

}
else
{
Expand All @@ -68,6 +76,7 @@ private void RecoverFiles()




class ReadCallbackWrapper
{
readonly IOCompletionCallback callback;
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Device/StorageDeviceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void TruncateUntilSegment(int toSegment)
public virtual void TruncateUntilAddressAsync(long toAddress, AsyncCallback callback, IAsyncResult result)
{
// Truncate only up to segment boundary if address is not aligned
TruncateUntilSegmentAsync((int)toAddress >> segmentSizeBits, callback, result);
TruncateUntilSegmentAsync((int)(toAddress >> segmentSizeBits), callback, result);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Epochs/FastThreadLocal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ public T Value

public bool IsInitializedForThread => (tl_values != null) && (iid == tl_iid[offset]);
}
}
}
Loading

0 comments on commit 4ef03bb

Please sign in to comment.