From e1704634a4a1bc8d2aae096c8d26ac8c2a318fc3 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 12 Sep 2019 13:39:22 -0700 Subject: [PATCH] updates --- cs/src/core/ClientSession/FASTERAsync.cs | 19 ++++++--------- cs/src/core/Index/FASTER/FASTER.cs | 10 +++----- cs/src/core/Index/FASTER/FASTERThread.cs | 30 ++++++++---------------- 3 files changed, 20 insertions(+), 39 deletions(-) diff --git a/cs/src/core/ClientSession/FASTERAsync.cs b/cs/src/core/ClientSession/FASTERAsync.cs index e75aa6fca..3335e4e6b 100644 --- a/cs/src/core/ClientSession/FASTERAsync.cs +++ b/cs/src/core/ClientSession/FASTERAsync.cs @@ -25,9 +25,9 @@ internal async ValueTask CompletePendingAsync(ClientSession _recoveredSessions; - private readonly FastThreadLocal prevThreadCtx; private readonly FastThreadLocal threadCtx; @@ -95,7 +94,6 @@ private enum CheckpointType public FasterKV(long size, Functions functions, LogSettings logSettings, CheckpointSettings checkpointSettings = null, SerializerSettings serializerSettings = null, IFasterEqualityComparer comparer = null, VariableLengthStructSettings variableLengthStructSettings = null) { threadCtx = new FastThreadLocal(); - prevThreadCtx = new FastThreadLocal(); if (comparer != null) this.comparer = comparer; @@ -294,7 +292,6 @@ public Guid StartSession() return InternalAcquire(); } - /// /// Continue session with FASTER /// @@ -344,10 +341,10 @@ public bool CompletePending(bool wait = false) public IEnumerable GetPendingRequests() { - foreach (var kvp in prevThreadCtx.Value.ioPendingRequests) + foreach (var kvp in threadCtx.Value.prevCtx?.ioPendingRequests) yield return kvp.Value.serialNum; - foreach (var val in prevThreadCtx.Value.retryRequests) + foreach (var val in threadCtx.Value.prevCtx?.retryRequests) yield return val.serialNum; foreach (var kvp in threadCtx.Value.ioPendingRequests) @@ -548,8 +545,7 @@ public bool GrowIndex() public void Dispose() { base.Free(); - threadCtx.Dispose(); - prevThreadCtx.Dispose(); + threadCtx?.Dispose(); hlog.Dispose(); readcache?.Dispose(); } diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 722b0d8f4..6612ec390 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -21,7 +21,6 @@ internal Guid InternalAcquire() { epoch.Acquire(); threadCtx.InitializeThread(); - prevThreadCtx.InitializeThread(); Phase phase = _systemState.phase; if (phase != Phase.REST) { @@ -30,10 +29,10 @@ internal Guid InternalAcquire() Guid guid = Guid.NewGuid(); threadCtx.Value = new FasterExecutionContext(); InitContext(threadCtx.Value, guid); - prevThreadCtx.Value = new FasterExecutionContext(); - InitContext(prevThreadCtx.Value, guid); - prevThreadCtx.Value.version--; - threadCtx.Value.prevCtx = prevThreadCtx.Value; + + threadCtx.Value.prevCtx = new FasterExecutionContext(); + InitContext(threadCtx.Value.prevCtx, guid); + threadCtx.Value.prevCtx.version--; InternalRefresh(threadCtx.Value); return threadCtx.Value.guid; } @@ -183,33 +182,24 @@ internal bool InternalCompletePending(FasterExecutionContext ctx, bool wait = fa { bool done = true; + #region Previous pending requests if (!RelaxedCPR) { - #region Previous pending requests - if (ctx.phase == Phase.IN_PROGRESS - || - ctx.phase == Phase.WAIT_PENDING) + if (ctx.phase == Phase.IN_PROGRESS || ctx.phase == Phase.WAIT_PENDING) { CompleteIOPendingRequests(ctx.prevCtx, ctx); - epoch.ProtectAndDrain(); // incorrect? - // InternalRefresh(); CompleteRetryRequests(ctx.prevCtx, ctx); + InternalRefresh(ctx); done &= (ctx.prevCtx.ioPendingRequests.Count == 0); done &= (ctx.prevCtx.retryRequests.Count == 0); } - #endregion } + #endregion - if (!(ctx.phase == Phase.IN_PROGRESS - || - ctx.phase == Phase.WAIT_PENDING)) - { - CompleteIOPendingRequests(ctx, ctx); - } - epoch.ProtectAndDrain(); // incorrect? - // InternalRefresh(); + CompleteIOPendingRequests(ctx, ctx); CompleteRetryRequests(ctx, ctx); + InternalRefresh(ctx); done &= (ctx.ioPendingRequests.Count == 0); done &= (ctx.retryRequests.Count == 0);