Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async/await session model #130

Merged
merged 118 commits into from
Dec 20, 2019
Merged
Changes from 1 commit
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
484c674
Initial checkin
badrishc May 16, 2019
fc89978
Merge branch 'master' into async-support
badrishc May 21, 2019
63d1276
Merge branch 'master' into async-support
badrishc May 21, 2019
0c7bddd
Merge branch 'master' into async-support
badrishc Jul 10, 2019
3d17a2d
Merge branch 'master' into async-support
badrishc Jul 10, 2019
2b7e24f
Rough sketch of session without touching currrent thread-local framew…
badrishc Jul 11, 2019
00dc474
Improved support, added session pooling option, added session testcases.
badrishc Jul 12, 2019
533652b
Updates to session support
badrishc Jul 13, 2019
2fff5cb
Merge branch 'master' into async-support
badrishc Jul 24, 2019
6c5420a
Merge branch 'master' into async-support
badrishc Jul 26, 2019
4412113
Merge branch 'master' of https://github.com/Microsoft/FASTER into asy…
badrishc Jul 26, 2019
366a8cc
Merge branch 'async-support' of https://github.com/Microsoft/FASTER i…
badrishc Jul 26, 2019
0353678
Support large number of sessions (not limited by epoch table size). S…
badrishc Aug 1, 2019
7a487e3
Update FASTER.core.csproj
badrishc Aug 1, 2019
571333d
Initial checkin of async CompletePending - no tests yet.
badrishc Aug 3, 2019
d63603c
Merge branch 'async-support' of https://github.com/Microsoft/FASTER i…
badrishc Aug 3, 2019
c41855e
Update azure-pipelines.yml
badrishc Aug 3, 2019
252ef54
Update azure-pipelines.yml
badrishc Aug 3, 2019
2a2d7ac
Update azure-pipelines.yml
badrishc Aug 3, 2019
c67d882
Update azure-pipelines.yml
badrishc Aug 3, 2019
bcb75e7
Update azure-pipelines.yml
badrishc Aug 3, 2019
2eafe9b
Support suspend/resume sessions
badrishc Aug 6, 2019
0bd752e
Support true async CompleteCheckpoint
badrishc Aug 7, 2019
dbe13fd
Improved async sessions interface, working now. Does not persist doma…
badrishc Aug 8, 2019
1754b68
Persist dormant sessions correctly.
badrishc Aug 8, 2019
a486834
Update testcase
badrishc Aug 8, 2019
d72c92a
Correct handling of thread switching with async
badrishc Aug 9, 2019
869a88e
Merge branch 'master' into async-support
badrishc Aug 9, 2019
e5a4859
Merge branch 'async-support' of https://github.com/Microsoft/FASTER i…
badrishc Aug 9, 2019
82a01a5
Update
badrishc Aug 9, 2019
26be7f6
update
badrishc Aug 9, 2019
733e4de
update
badrishc Aug 9, 2019
30adeca
Updates and bugfixes
badrishc Aug 12, 2019
9e84d63
Removing sample test code.
badrishc Aug 12, 2019
9c60144
Update
badrishc Aug 13, 2019
d7064e8
Updated fast threadlocal
badrishc Aug 13, 2019
ea64fdf
Revert "Update"
badrishc Aug 13, 2019
363508c
Merge remote-tracking branch 'origin/async-support-test2' into async-…
badrishc Aug 13, 2019
789d38e
update
badrishc Aug 13, 2019
572b8bd
revert max threads in pool
badrishc Aug 13, 2019
4b75494
test
badrishc Aug 14, 2019
026912c
test
badrishc Aug 14, 2019
03c4d8e
Update azure-pipelines.yml for Azure Pipelines
badrishc Aug 14, 2019
e925af3
Update azure-pipelines.yml for Azure Pipelines
badrishc Aug 14, 2019
1c33b49
Update azure-pipelines.yml
badrishc Aug 14, 2019
4dc6d27
Update azure-pipelines.yml
badrishc Aug 14, 2019
5cda2f9
more tests, phew
badrishc Aug 15, 2019
68df34d
trying to reduce async context - incomplete
badrishc Aug 17, 2019
7e80b07
Merge branch 'master' into async-support
badrishc Aug 20, 2019
81a4a82
Merging from master
badrishc Aug 29, 2019
75b9cb2
Update
badrishc Sep 3, 2019
b16212f
Updates
badrishc Sep 4, 2019
b2479ed
Merge branch 'async-support-test' of https://github.com/Microsoft/FAS…
badrishc Sep 4, 2019
d03bcea
Updates
badrishc Sep 4, 2019
00fadc5
Merge branch 'master' into async-support-test
badrishc Sep 4, 2019
27b677e
Fixes
badrishc Sep 4, 2019
94b901e
Merge branch 'master' into async-support-test
badrishc Sep 4, 2019
9afc5fb
Wired in excluded serial nos for commit points.
badrishc Sep 5, 2019
62cecc8
Cleaning up interface and comments.
badrishc Sep 5, 2019
6cab32a
Fixing thread local init, cleanup
badrishc Sep 5, 2019
990d789
Merging from work branch
badrishc Sep 5, 2019
b9404b6
Fixing merge
badrishc Sep 5, 2019
78c2649
Updates to support sessions natively without thread-local
badrishc Sep 12, 2019
e170463
updates
badrishc Sep 12, 2019
38a26ff
Added tests
badrishc Sep 13, 2019
0740f87
Merge branch 'async-support' into async-support-test
badrishc Sep 13, 2019
669881d
Cleanup of warnings
badrishc Sep 13, 2019
29c64e1
Merge branch 'master' into async-support-test
badrishc Sep 13, 2019
16edd83
Initial checkin
badrishc Sep 17, 2019
3077f52
Updates.
badrishc Sep 17, 2019
853b3ea
Updates
badrishc Sep 17, 2019
6315a14
Cleaned up epochs, improved fine grain scalability.
badrishc Sep 18, 2019
19d5d82
Fixing test change
badrishc Sep 18, 2019
4ef03bb
Merge branch 'fasterlog' into async-support
badrishc Sep 18, 2019
ce31917
Fixing break after merge.
badrishc Sep 18, 2019
88d7269
Added commit and recovery support.
badrishc Sep 19, 2019
ddcc338
Added TryAppend so users can implement log throttling.
badrishc Sep 19, 2019
2cd85e3
Fasterlog lowmem (#178)
badrishc Sep 26, 2019
ec2a3b5
Fasterlog TryAppend (#179)
badrishc Sep 30, 2019
bb4e357
minor fix
badrishc Sep 30, 2019
4504937
merge
badrishc Sep 30, 2019
b06d112
Fasterlog async (#180)
badrishc Oct 3, 2019
002b993
Merge branch 'master' into fasterlog
badrishc Oct 4, 2019
944504b
Added tailing iterator WaitAsync to wait for iteration to proceed.
badrishc Oct 4, 2019
540d1a5
Merge branch 'fasterlog' of https://github.com/Microsoft/FASTER into …
badrishc Oct 4, 2019
80a2aeb
Convert Span to ReadOnlySpan for appends
badrishc Oct 4, 2019
0050694
Added MemoryPool/IMemoryOwner variant of iterator
badrishc Oct 4, 2019
127e908
Updates
badrishc Oct 4, 2019
6dc7af6
Updated way to pin pooled memory
badrishc Oct 4, 2019
ff27448
Update azure-pipelines.yml
badrishc Oct 4, 2019
72cdfdb
Merging from FasterLog branch for epoch and allocator goodness.
badrishc Oct 5, 2019
0c0bf58
Updates to merge
badrishc Oct 5, 2019
7565a21
Merging
badrishc Oct 5, 2019
8e42a74
Support minimum buffer size of just 1 page!
badrishc Oct 5, 2019
915f01e
Merge branch 'fasterlog' into async-support
badrishc Oct 6, 2019
089d545
Cleanup and updates.
badrishc Oct 7, 2019
c55de3f
Actually checking in support for 1 page in memory, added initial draf…
badrishc Oct 7, 2019
db68ae0
Added a test
badrishc Oct 7, 2019
5caea66
Improved sample, changed GetMemory to use byte[] instead of Span<byte>
badrishc Oct 7, 2019
e0f745f
Merging from fasterlog
badrishc Oct 7, 2019
fc47dbd
Merging goodness from master.
badrishc Oct 31, 2019
c4fba23
Merge branch 'master' into async-support
badrishc Oct 31, 2019
ca15128
Merge branch 'master' into async-support
badrishc Nov 14, 2019
0f65fbb
Merge branch 'master' into async-support
badrishc Nov 20, 2019
72ecc4f
Fixed break
badrishc Nov 20, 2019
a1b87a5
Merge branch 'master' into async-support
badrishc Dec 3, 2019
6b03887
Merge branch 'master' into async-support
badrishc Dec 3, 2019
876c0d3
Merging from master
badrishc Dec 4, 2019
b3cccd0
Fix merge
badrishc Dec 4, 2019
1eca340
Major API cleanup and porting to session-based interface
badrishc Dec 9, 2019
0237a0c
Added async API to sessions, added first draft of async sample.
badrishc Dec 10, 2019
255c000
Added tests
badrishc Dec 12, 2019
a16c5eb
Update README.md
badrishc Dec 12, 2019
95c0e58
Merge branch 'master' into async-support
badrishc Dec 14, 2019
0624fd5
Updates based on review.
badrishc Dec 19, 2019
b31631b
Merge branch 'async-support' of https://github.com/microsoft/FASTER i…
badrishc Dec 19, 2019
ecbf696
Minor fixes.
badrishc Dec 19, 2019
49866bf
fixed call to newsession
badrishc Dec 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cs/playground/FasterLogSample/Program.cs
Original file line number Diff line number Diff line change
@@ -87,9 +87,9 @@ static void ScanThread()
{
while (true)
{
while (!iter.GetNext(out result))
while (!iter.GetNext(out result, out int length))
{
Thread.Sleep(1000);
iter.WaitAsync().GetAwaiter().GetResult();
}

if (!result.SequenceEqual(entrySpan))
200 changes: 123 additions & 77 deletions cs/src/core/Index/FasterLog/FasterLogIterator.cs
Original file line number Diff line number Diff line change
@@ -6,15 +6,16 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using System.Buffers;

namespace FASTER.core
{
/// <summary>
/// Delegate for getting memory from user
/// </summary>
/// <param name="length"></param>
/// <param name="minLength">Minimum length of returned span</param>
/// <returns></returns>
public delegate Span<byte> GetMemory(int length);
public delegate Span<byte> GetMemory(int minLength);

/// <summary>
/// Scan iterator for hybrid log
@@ -84,7 +85,7 @@ internal unsafe FasterLogScanIterator(FasterLog fasterLog, BlittableAllocator<Em
/// Wait for iteration to be ready to continue
/// </summary>
/// <returns></returns>
public async void WaitAsync()
public async ValueTask WaitAsync()
{
while (true)
{
@@ -99,100 +100,60 @@ public async void WaitAsync()
/// <summary>
/// Get next record in iterator
/// </summary>
/// <param name="entry"></param>
/// <param name="entry">Copy of entry, if found</param>
/// <param name="entryLength">Actual length of entry</param>
/// <returns></returns>
public unsafe bool GetNext(out Span<byte> entry)
public unsafe bool GetNext(out Span<byte> entry, out int entryLength)
{
currentAddress = nextAddress;
while (true)
if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken))
{
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
{
Debug.WriteLine("Iterator address is less than log BeginAddress " + allocator.BeginAddress + ", adjusting iterator address");
currentAddress = allocator.BeginAddress;
}

if ((currentAddress >= endAddress) || (currentAddress >= fasterLog.CommittedUntilAddress))
{
entry = default(Span<byte>);
return false;
}


if (frameSize == 0 && currentAddress < allocator.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> allocator.LogPageSizeBits;
var offset = currentAddress & allocator.PageSizeMask;

var headAddress = allocator.HeadAddress;
var physicalAddress = default(long);

if (currentAddress < headAddress)
{
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
}
else
{
epoch.Resume();
headAddress = allocator.HeadAddress;
if (currentAddress < headAddress) // rare case
{
epoch.Suspend();
continue;
}

physicalAddress = allocator.GetPhysicalAddress(currentAddress);
}

// Check if record fits on page, if not skip to next page
int length = *(int*)physicalAddress;
int recordSize = 4 + Align(length);

if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize)
throw new Exception();

if (length == 0) // we are at end of page, skip to next
{
// If record
if (currentAddress >= headAddress)
epoch.Suspend();
currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
continue;
}

if (getMemory != null)
{
// Use user delegate to allocate memory
entry = getMemory(length);
if (entry.Length != length)
entry = getMemory(entryLength);
if (entry.Length < entryLength)
throw new Exception("Span provided has invalid length");
}
else
{
// We allocate a byte array from heap
entry = new Span<byte>(new byte[length]);
entry = new Span<byte>(new byte[entryLength]);
}

fixed (byte* bp = &entry.GetPinnableReference())
Buffer.MemoryCopy((void*)(4 + physicalAddress), bp, length, length);
Buffer.MemoryCopy((void*)(4 + physicalAddress), bp, entryLength, entryLength);

if (currentAddress >= headAddress)
if (epochTaken)
epoch.Suspend();

Debug.Assert((currentAddress & allocator.PageSizeMask) + recordSize <= allocator.PageSize);
return true;
}
entry = default;
return false;
}

/// <summary>
/// GetNext supporting memory pools
/// </summary>
/// <param name="pool"></param>
/// <param name="entry"></param>
/// <param name="entryLength"></param>
/// <returns></returns>
public unsafe bool GetNext(MemoryPool<byte> pool, out IMemoryOwner<byte> entry, out int entryLength)
{
if (GetNextInternal(out long physicalAddress, out entryLength, out bool epochTaken))
{
entry = pool.Rent(entryLength);
Buffer.MemoryCopy((void*)(4 + physicalAddress), (void*)((byte*)entry.Memory.Pin().Pointer + 4), entryLength, entryLength);

if (epochTaken)
epoch.Suspend();

if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize)
nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
else
nextAddress = currentAddress + recordSize;

return true;
}
entry = default;
entryLength = default;
return false;
}

/// <summary>
@@ -260,6 +221,91 @@ private int Align(int length)
return (length + 3) & ~3;
}

/// <summary>
/// Retrieve physical address of next iterator value
/// (under epoch protection if it is from main page buffer)
/// </summary>
/// <param name="physicalAddress"></param>
/// <param name="entryLength"></param>
/// <param name="epochTaken"></param>
/// <returns></returns>
private unsafe bool GetNextInternal(out long physicalAddress, out int entryLength, out bool epochTaken)
{
physicalAddress = 0;
entryLength = 0;
epochTaken = false;

currentAddress = nextAddress;
while (true)
{
// Check for boundary conditions
if (currentAddress < allocator.BeginAddress)
{
Debug.WriteLine("Iterator address is less than log BeginAddress " + allocator.BeginAddress + ", adjusting iterator address");
currentAddress = allocator.BeginAddress;
}

if ((currentAddress >= endAddress) || (currentAddress >= fasterLog.CommittedUntilAddress))
{
return false;
}

if (frameSize == 0 && currentAddress < allocator.HeadAddress)
{
throw new Exception("Iterator address is less than log HeadAddress in memory-scan mode");
}

var currentPage = currentAddress >> allocator.LogPageSizeBits;
var offset = currentAddress & allocator.PageSizeMask;

var headAddress = allocator.HeadAddress;

if (currentAddress < headAddress)
{
BufferAndLoad(currentAddress, currentPage, currentPage % frameSize);
physicalAddress = frame.GetPhysicalAddress(currentPage % frameSize, offset);
}
else
{
epoch.Resume();
headAddress = allocator.HeadAddress;
if (currentAddress < headAddress) // rare case
{
epoch.Suspend();
continue;
}

physicalAddress = allocator.GetPhysicalAddress(currentAddress);
}

// Check if record fits on page, if not skip to next page
entryLength = *(int*)physicalAddress;
int recordSize = 4 + Align(entryLength);

if ((currentAddress & allocator.PageSizeMask) + recordSize > allocator.PageSize)
throw new Exception();

if (entryLength == 0) // we are at end of page, skip to next
{
// If record
if (currentAddress >= headAddress)
epoch.Suspend();
currentAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
continue;
}

Debug.Assert((currentAddress & allocator.PageSizeMask) + recordSize <= allocator.PageSize);

if ((currentAddress & allocator.PageSizeMask) + recordSize == allocator.PageSize)
nextAddress = (1 + (currentAddress >> allocator.LogPageSizeBits)) << allocator.LogPageSizeBits;
else
nextAddress = currentAddress + recordSize;

epochTaken = currentAddress >= headAddress;
return true;
}
}

}
}