Skip to content

Commit

Permalink
Fixing thread local init, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Sep 5, 2019
1 parent 62cecc8 commit 6cab32a
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 63 deletions.
37 changes: 18 additions & 19 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,23 @@ internal ClientSession(
/// </summary>
public void Dispose()
{
ResumeThread();
fht.CompletePending(true);
fht.StopSession();
TryCompletePending(true);
fht.DisposeClientSession(ID);
}

/// <summary>
/// Resume session on current thread
/// Call SuspendThread before any async op
/// </summary>
public void ResumeThread()
public void UnsafeResumeThread()
{
fht.ResumeSession(prevCtx, ctx);
}

/// <summary>
/// Suspend session on current thread
/// </summary>
public void SuspendThread()
public void UnsafeSuspendThread()
{
fht.SuspendSession();
}
Expand All @@ -79,9 +78,9 @@ public void SuspendThread()
/// <returns></returns>
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext, long monotonicSerialNum)
{
ResumeThread();
UnsafeResumeThread();
var status = fht.Read(ref key, ref input, ref output, userContext, monotonicSerialNum);
SuspendThread();
UnsafeSuspendThread();
return status;
}

Expand All @@ -95,9 +94,9 @@ public Status Read(ref Key key, ref Input input, ref Output output, Context user
/// <returns></returns>
public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, long monotonicSerialNum)
{
ResumeThread();
UnsafeResumeThread();
var status = fht.Upsert(ref key, ref desiredValue, userContext, monotonicSerialNum);
SuspendThread();
UnsafeSuspendThread();
return status;
}

Expand All @@ -111,9 +110,9 @@ public Status Upsert(ref Key key, ref Value desiredValue, Context userContext, l
/// <returns></returns>
public Status RMW(ref Key key, ref Input input, Context userContext, long monotonicSerialNum)
{
ResumeThread();
UnsafeResumeThread();
var status = fht.RMW(ref key, ref input, userContext, monotonicSerialNum);
SuspendThread();
UnsafeSuspendThread();
return status;
}

Expand All @@ -126,9 +125,9 @@ public Status RMW(ref Key key, ref Input input, Context userContext, long monoto
/// <returns></returns>
public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
{
ResumeThread();
UnsafeResumeThread();
var status = fht.Delete(ref key, userContext, monotonicSerialNum);
SuspendThread();
UnsafeSuspendThread();
return status;
}

Expand All @@ -138,11 +137,11 @@ public Status Delete(ref Key key, Context userContext, long monotonicSerialNum)
/// </summary>
/// <param name="spinWait"></param>
/// <returns></returns>
public bool CompletePending(bool spinWait = false)
public bool TryCompletePending(bool spinWait = false)
{
ResumeThread();
UnsafeResumeThread();
var result = fht.CompletePending(spinWait);
SuspendThread();
UnsafeSuspendThread();
return result;
}

Expand All @@ -160,11 +159,11 @@ public async ValueTask CompletePendingAsync()
/// </summary>
/// <param name="spinWait"></param>
/// <returns></returns>
public bool CompleteCheckpoint(bool spinWait = false)
public bool TryCompleteCheckpoint(bool spinWait = false)
{
ResumeThread();
UnsafeResumeThread();
var result = fht.CompleteCheckpoint(spinWait);
SuspendThread();
UnsafeSuspendThread();
return result;
}

Expand Down
20 changes: 10 additions & 10 deletions cs/src/core/ClientSession/FASTERAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ internal async ValueTask CompletePendingAsync(ClientSession<Key, Value, Input, O

if (clientSession.prevCtx.retryRequests.Count > 0)
{
clientSession.ResumeThread();
clientSession.UnsafeResumeThread();
CompleteRetryRequests(clientSession.prevCtx, clientSession);
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
}

done &= (clientSession.prevCtx.ioPendingRequests.Count == 0);
Expand Down Expand Up @@ -128,7 +128,7 @@ private bool AtomicSwitch(FasterExecutionContext fromCtx, FasterExecutionContext
private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession, bool async = true)
{
if (async)
clientSession.ResumeThread();
clientSession.UnsafeResumeThread();

var previousState = SystemState.Make(threadCtx.Value.phase, threadCtx.Value.version);
var finalState = SystemState.Copy(ref _systemState);
Expand Down Expand Up @@ -183,9 +183,9 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,

if (async && !IsIndexFuzzyCheckpointCompleted())
{
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
await IsIndexFuzzyCheckpointCompletedAsync();
clientSession.ResumeThread();
clientSession.UnsafeResumeThread();
}
GlobalMoveToNextCheckpointState(currentState);

Expand Down Expand Up @@ -287,9 +287,9 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
if (async && !notify)
{
Debug.Assert(_hybridLogCheckpoint.flushedSemaphore != null);
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
await _hybridLogCheckpoint.flushedSemaphore.WaitAsync();
clientSession.ResumeThread();
clientSession.UnsafeResumeThread();

_hybridLogCheckpoint.flushedSemaphore.Release();

Expand All @@ -302,9 +302,9 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,

if (async && !notify)
{
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
await IsIndexFuzzyCheckpointCompletedAsync();
clientSession.ResumeThread();
clientSession.UnsafeResumeThread();

notify = true;
}
Expand Down Expand Up @@ -366,7 +366,7 @@ private async ValueTask HandleCheckpointingPhasesAsync(ClientSession<Key, Value,
} while (previousState.word != finalState.word);

if (async)
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
}
}
}
7 changes: 5 additions & 2 deletions cs/src/core/ClientSession/FASTERClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ internal void ResumeSession(FasterExecutionContext prevThreadCtx, FasterExecutio
epoch.Resume();

// Copy contexts to thread-local
this.prevThreadCtx.InitializeThread();
this.threadCtx.InitializeThread();
if (!this.prevThreadCtx.IsInitializedForThread)
this.prevThreadCtx.InitializeThread();

if (!this.threadCtx.IsInitializedForThread)
this.threadCtx.InitializeThread();

this.prevThreadCtx.Value = prevThreadCtx;
this.threadCtx.Value = threadCtx;
Expand Down
30 changes: 16 additions & 14 deletions cs/src/core/Epochs/FastThreadLocal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ internal class FastThreadLocal<T>
private const int kMaxInstances = 128;

[ThreadStatic]
private static T[] values;
private static T[] tl_values;
[ThreadStatic]
private static int t_iid;
private static int[] tl_iid;

private readonly int id;
private readonly int offset;
private readonly int iid;

private static readonly int[] instances = new int[kMaxInstances];
Expand All @@ -35,7 +35,7 @@ public FastThreadLocal()
{
if (0 == Interlocked.CompareExchange(ref instances[i], iid, 0))
{
id = i;
offset = i;
return;
}
}
Expand All @@ -44,36 +44,38 @@ public FastThreadLocal()

public void InitializeThread()
{
if (values == null)
if (tl_values == null)
{
values = new T[kMaxInstances];
tl_values = new T[kMaxInstances];
tl_iid = new int[kMaxInstances];
}
if (t_iid != iid)
if (tl_iid[offset] != iid)
{
t_iid = iid;
values[id] = default(T);
tl_iid[offset] = iid;
tl_values[offset] = default(T);
}
}

public void DisposeThread()
{
values[id] = default(T);
tl_values[offset] = default(T);
tl_iid[offset] = 0;
}

/// <summary>
/// Dispose instance for all threads
/// </summary>
public void Dispose()
{
instances[id] = 0;
instances[offset] = 0;
}

public T Value
{
get => values[id];
set => values[id] = value;
get => tl_values[offset];
set => tl_values[offset] = value;
}

public bool IsInitializedForThread => (values != null) && (iid == t_iid);
public bool IsInitializedForThread => (tl_values != null) && (iid == tl_iid[offset]);
}
}
1 change: 0 additions & 1 deletion cs/src/core/Epochs/LightEpoch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public void Resume()
{
Acquire();
}
ProtectAndDrain();
}

internal FastThreadLocal<int> ThreadEntry => threadEntryIndex;
Expand Down
12 changes: 6 additions & 6 deletions cs/src/core/Index/FASTER/FASTERThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ internal void CompleteRetryRequests(FasterExecutionContext context)

if (count == 0) return;

clientSession.ResumeThread();
clientSession.UnsafeResumeThread();
for (int i = 0; i < count; i++)
{
var pendingContext = context.retryRequests.Dequeue();
InternalRetryRequestAndCallback(context, pendingContext);
}
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
}

internal void CompleteIOPendingRequests(FasterExecutionContext context)
Expand All @@ -279,21 +279,21 @@ internal void CompleteIOPendingRequests(FasterExecutionContext context)

if (context.readyResponses.Count > 0)
{
clientSession.ResumeThread();
clientSession.UnsafeResumeThread();
while (context.readyResponses.Count > 0)
{
context.readyResponses.TryDequeue(out request);
InternalContinuePendingRequestAndCallback(context, request);
}
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
}
else
{
request = await context.readyResponses.DequeueAsync(token);

clientSession.ResumeThread();
clientSession.UnsafeResumeThread();
InternalContinuePendingRequestAndCallback(context, request);
clientSession.SuspendThread();
clientSession.UnsafeSuspendThread();
}

}
Expand Down
4 changes: 2 additions & 2 deletions cs/test/AsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async Task AsyncRecoveryTest1(CheckpointType checkpointType)

s2.Dispose();
s1.Dispose(); // should receive persistence callback
s0.ResumeThread(); // should receive persistence callback
s0.UnsafeResumeThread(); // should receive persistence callback
s0.Dispose();
fht1.Dispose();

Expand All @@ -93,7 +93,7 @@ public async Task AsyncRecoveryTest1(CheckpointType checkpointType)
var status = s3.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0);

if (status == Status.PENDING)
s3.CompletePending(true);
s3.TryCompletePending(true);
else
{
Assert.IsTrue(output.value.numClicks == key);
Expand Down
Loading

0 comments on commit 6cab32a

Please sign in to comment.