Skip to content

Commit

Permalink
Updated IAsyncEnumerable to also return entry address
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Nov 26, 2019
1 parent db9bf45 commit bf65763
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ static void ScanThread()
while (!iter.GetNext(out result, out _, out _))
{
// For finite end address, check if iteration ended
// if (iter.CurrentAddress >= endAddress) return;
// if (currentAddress >= endAddress) return;
iter.WaitAsync().GetAwaiter().GetResult();
}

Expand All @@ -186,7 +186,7 @@ static void ScanThread()

static async Task AsyncScan()
{
await foreach ((byte[] result, int length) in iter.GetAsyncEnumerable())
await foreach ((byte[] result, int length, long currentAddress) in iter.GetAsyncEnumerable())
{
if (Different(result, staticEntry))
throw new Exception("Invalid entry found");
Expand Down
18 changes: 10 additions & 8 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,42 +97,44 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
/// <summary>
/// Async enumerable for iterator
/// </summary>
/// <returns>Entry and entry length</returns>
public async IAsyncEnumerable<(byte[], int)> GetAsyncEnumerable([EnumeratorCancellation] CancellationToken token = default)
/// <returns>Entry, entry length, entry address</returns>
public async IAsyncEnumerable<(byte[], int, long)> GetAsyncEnumerable([EnumeratorCancellation] CancellationToken token = default)
{
while (!disposed)
{
byte[] result;
int length;
while (!GetNext(out result, out length, out long currentAddress))
long currentAddress;
while (!GetNext(out result, out length, out currentAddress))
{
if (currentAddress >= endAddress)
yield break;
if (!await WaitAsync(token))
yield break;
}
yield return (result, length);
yield return (result, length, currentAddress);
}
}

/// <summary>
/// Async enumerable for iterator (memory pool based version)
/// </summary>
/// <returns>Entry and entry length</returns>
public async IAsyncEnumerable<(IMemoryOwner<byte>, int)> GetAsyncEnumerable(MemoryPool<byte> pool, [EnumeratorCancellation] CancellationToken token = default)
/// <returns>Entry, entry length, entry address</returns>
public async IAsyncEnumerable<(IMemoryOwner<byte>, int, long)> GetAsyncEnumerable(MemoryPool<byte> pool, [EnumeratorCancellation] CancellationToken token = default)
{
while (!disposed)
{
IMemoryOwner<byte> result;
int length;
while (!GetNext(pool, out result, out length, out long currentAddress))
long currentAddress;
while (!GetNext(pool, out result, out length, out currentAddress))
{
if (currentAddress >= endAddress)
yield break;
if (!await WaitAsync(token))
yield break;
}
yield return (result, length);
yield return (result, length, currentAddress);
}
}
#endif
Expand Down

0 comments on commit bf65763

Please sign in to comment.