Skip to content

Commit

Permalink
Merge branch 'master' into async-support
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc authored Dec 3, 2019
2 parents 72ecc4f + a90aa54 commit a1b87a5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cc/CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ project(googletest-download NONE)
include(ExternalProject)
ExternalProject_Add(googletest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG master
GIT_TAG release-1.8.1
SOURCE_DIR "${CMAKE_BINARY_DIR}/googletest-src"
BINARY_DIR "${CMAKE_BINARY_DIR}/googletest-build"
CONFIGURE_COMMAND ""
Expand Down
4 changes: 2 additions & 2 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ static void ScanThread()
while (!iter.GetNext(out result, out _, out _))
{
// For finite end address, check if iteration ended
// if (iter.CurrentAddress >= endAddress) return;
// if (currentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
}

Expand All @@ -186,7 +186,7 @@ static void ScanThread()

static async Task AsyncScan()
{
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
await foreach ((byte[] result, int length, long currentAddress) in iter.GetAsyncEnumerable())
{
if (Different(result, staticEntry))
throw new Exception("Invalid entry found");
Expand Down
18 changes: 10 additions & 8 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,42 +97,44 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
/// <summary>
/// Async enumerable for iterator
/// </summary>
/// <returns>Entry and entry length</returns>
public async IAsyncEnumerable<(byte[], int)> GetAsyncEnumerable([EnumeratorCancellation] CancellationToken token = default)
/// <returns>Entry, entry length, entry address</returns>
public async IAsyncEnumerable<(byte[], int, long)> GetAsyncEnumerable([EnumeratorCancellation] CancellationToken token = default)
{
while (!disposed)
{
byte[] result;
int length;
while (!GetNext(out result, out length, out long currentAddress))
long currentAddress;
while (!GetNext(out result, out length, out currentAddress))
{
if (currentAddress >= endAddress)
yield break;
if (!await WaitAsync(token))
yield break;
}
yield return (result, length);
yield return (result, length, currentAddress);
}
}

/// <summary>
/// Async enumerable for iterator (memory pool based version)
/// </summary>
/// <returns>Entry and entry length</returns>
public async IAsyncEnumerable<(IMemoryOwner<byte>, int)> GetAsyncEnumerable(MemoryPool<byte> pool, [EnumeratorCancellation] CancellationToken token = default)
/// <returns>Entry, entry length, entry address</returns>
public async IAsyncEnumerable<(IMemoryOwner<byte>, int, long)> GetAsyncEnumerable(MemoryPool<byte> pool, [EnumeratorCancellation] CancellationToken token = default)
{
while (!disposed)
{
IMemoryOwner<byte> result;
int length;
while (!GetNext(pool, out result, out length, out long currentAddress))
long currentAddress;
while (!GetNext(pool, out result, out length, out currentAddress))
{
if (currentAddress >= endAddress)
yield break;
if (!await WaitAsync(token))
yield break;
}
yield return (result, length);
yield return (result, length, currentAddress);
}
}
#endif
Expand Down
46 changes: 33 additions & 13 deletions docs/cs/FasterLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ segments corresponding to files on disk. Each segment consists of a fixed number
and pages sizes are configurable during construction of FasterLog. By default,
FasterLog commits at page boundaries. You can also force-commit the log as frequently as you need, e.g., every
5ms. The typical use cases of FasterLog are captured in our extensively commented sample [here](https://github.com/microsoft/FASTER/blob/master/cs/playground/FasterLogSample/Program.cs). FasterLog
works with .NET Standard 2.0, and can be used on a broad range of machines and devices.
works with .NET Standard 2.0, and can be used on a broad range of machines and devices. We have tested
it on both Windows and Linux-based machines.

## Creating the Log

Expand Down Expand Up @@ -105,7 +106,7 @@ Scan using `IAsyncEnumerable`:

```cs
using (iter = log.Scan(log.BeginAddress, 100_000_000))
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
await foreach ((byte[] result, int length, long currentAddress) in iter.GetAsyncEnumerable())
{
// Process record
}
Expand All @@ -118,9 +119,9 @@ end of iteration, or because we are waiting for a page read or commit to complet
using (var iter = log.Scan(0, 100_000_000))
while (true)
{
while (!iter.GetNext(out Span<byte> result))
while (!iter.GetNext(out byte[] result, out int entryLength, out long currentAddress))
{
if (iter.CurrentAddress >= 100_000_000) return;
if (currentAddress >= 100_000_000) return;
await iter.WaitAsync();
}
// Process record
Expand All @@ -130,9 +131,18 @@ end of iteration, or because we are waiting for a page read or commit to complet
For tailing iteration, you simply specify `long.MaxValue` as the end address for the scan. You are guaranteed
to see only committed entries during a tailing iteration.

You can persist iterators as part of commit by simply naming them during their creation. On recovery, if an
iterator with the specified name exists, it will be initialized with the corresponding current iterator
pointer.
An iterator is associated with a `CompletedUntilAddress`, which indicates the address up to which iteration has been
completed. Users update the `CompletedUntilAddress` as follows:

```cs
iter.CompleteUntil(long address);
```

The specified address needs to be a valid log address, similar to the `TruncateUntil` call on the log (described below).

You can persist iterators (or more precisely, their `CompletedUntilAddress`) as part of a commit by simply naming
them during their creation. On recovery, if an iterator with the specified name exists, it will be initialized with
its last-committed `CompletedUntilAddress` as the current iterator pointer.

```cs
using (var iter = log.Scan(0, long.MaxValue, name: "foo"))
Expand All @@ -149,10 +159,18 @@ using (var iter = log.Scan(0, long.MaxValue, name: "foo", recover: false))

FasterLog supports log head truncation, up to any prefix of the log. Truncation updates the log begin address and
deletes truncated parts of the log from disk, if applicable. A truncation is persisted via commit, similar
to tail address commit. Here is an example, where we truncate the log to limit it to the last 100GB:
to tail address commit.

There are two variants: `TruncateUntilPageStart` truncates until the start of the page corresponding to the
specified address. This is safe to invoke with any address, as the page start is always a valid pointer to the
log. The second variant, called `TruncateUntil`, truncates exactly until the specified address. Here, the user
needs to be careful and provide a valid address that points to a log entry. For example, any address returned by
an iterator is a valid location that one could truncate until.

Here is an example, where we truncate the log to limit it to the last 100GB:

```cs
log.TruncateUntil(log.CommittedUntilAddress - 100_000_000_000L);
log.TruncateUntilPageStart(log.CommittedUntilAddress - 100_000_000_000L);
```

### Random Read
Expand Down Expand Up @@ -257,6 +275,7 @@ async ValueTask<long> EnqueueAndWaitForCommitAsync(IReadOnlySpanBatch readOnlySp

// Truncate log (from head)
void TruncateUntilPageStart(long untilAddress)
void TruncateUntil(long untilAddress)

// Scan interface
Expand All @@ -265,13 +284,14 @@ FasterLogScanIterator Scan(long beginAddress, long endAddress, string name = nul

// FasterLogScanIterator interface
bool GetNext(out byte[] entry, out int entryLength)
bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry, out int entryLength)
void CompleteUntil(long address)
bool GetNext(out byte[] entry, out int entryLength, out long currentAddress)
bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry, out int entryLength, out long currentAddress)
async ValueTask WaitAsync()

// IAsyncEnumerable interface to FasterLogScanIterator
async IAsyncEnumerable<(byte[], int)> GetAsyncEnumerable()
async IAsyncEnumerable<(IMemoryOwner<byte>, int)> GetAsyncEnumerable(MemoryPool<byte> pool)
async IAsyncEnumerable<(byte[], int, long)> GetAsyncEnumerable()
async IAsyncEnumerable<(IMemoryOwner<byte>, int, long)> GetAsyncEnumerable(MemoryPool<byte> pool)

// Random read
Expand Down

0 comments on commit a1b87a5

Please sign in to comment.