Skip to content

Commit

Permalink
Improved async sessions interface, working now. Does not persist doma…
Browse files Browse the repository at this point in the history
…nt sessions yet. See SimpleRecoveryTest3 for sample.
  • Loading branch information
badrishc committed Aug 8, 2019
1 parent 0bd752e commit dbe13fd
Show file tree
Hide file tree
Showing 9 changed files with 484 additions and 474 deletions.
94 changes: 0 additions & 94 deletions cs/src/core/Async/AsyncFasterKv.cs

This file was deleted.

72 changes: 55 additions & 17 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma warning disable 0162

using System;
using System.Threading.Tasks;

namespace FASTER.core
{
Expand Down Expand Up @@ -45,23 +46,24 @@ internal ClientSession(
/// </summary>
public void Dispose()
{
Resume();
ResumeThread();
fht.CompletePending(true);
fht.StopSession();
}

/// <summary>
/// Resume session on current thread
/// Call SuspendThread before any async op
/// </summary>
public void Resume()
public void ResumeThread()
{
fht.ResumeSession(prevCtx, ctx);
}

/// <summary>
/// Suspend session on current thread
/// </summary>
public void Suspend()
public void SuspendThread()
{
fht.SuspendSession();
}
Expand All @@ -77,9 +79,9 @@ public void Suspend()
/// <returns></returns>
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long monotonicSerialNum)
{
Resume();
ResumeThread();
var status = fht.Read(ref key, ref input, ref output, userContext, monotonicSerialNum);
Suspend();
SuspendThread();
return status;
}

Expand All @@ -93,9 +95,9 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// <returns></returns>
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long monotonicSerialNum)
{
Resume();
ResumeThread();
var status = fht.Upsert(ref key, ref desiredValue, userContext, monotonicSerialNum);
Suspend();
SuspendThread();
return status;
}

Expand All @@ -109,9 +111,9 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
/// <returns></returns>
public Status RMW(ref Key key, ref Input input, Context userContext, long monotonicSerialNum)
{
Resume();
ResumeThread();
var status = fht.RMW(ref key, ref input, userContext, monotonicSerialNum);
Suspend();
SuspendThread();
return status;
}

Expand All @@ -124,23 +126,59 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
/// <returns></returns>
public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
{
Resume();
ResumeThread();
var status = fht.Delete(ref key, userContext, monotonicSerialNum);
Suspend();
SuspendThread();
return status;
}


/// <summary>
/// Sync complete outstanding pending operations
/// </summary>
/// <param name="spinWait"></param>
/// <returns></returns>
public bool CompletePending(bool spinWait = false)
{
ResumeThread();
var result = fht.CompletePending(spinWait);
SuspendThread();
return result;
}

/// <summary>
/// Async complete outstanding pending operations
/// </summary>
/// <returns></returns>
public async ValueTask CompletePendingAsync()
{
ResumeThread();
await fht.CompletePendingAsync();
SuspendThread();
}

/// <summary>
/// Complete outstanding pending operations
/// Complete the ongoing checkpoint (if any)
/// </summary>
/// <param name="wait"></param>
/// <param name="spinWait"></param>
/// <returns></returns>
public bool CompletePending(bool wait = false)
public bool CompleteCheckpoint(bool spinWait = false)
{
Resume();
var result = fht.CompletePending(wait);
Suspend();
ResumeThread();
var result = fht.CompleteCheckpoint(spinWait);
SuspendThread();
return result;
}

/// <summary>
/// Complete the ongoing checkpoint (if any)
/// </summary>
/// <returns></returns>
internal async ValueTask CompleteCheckpointAsync()
{
ResumeThread();
await fht.CompleteCheckpointAsync();
SuspendThread();
}
}
}
Loading

0 comments on commit dbe13fd

Please sign in to comment.