From 030a3b39c789f701c84c4149337f7bf26ca24d9d Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Fri, 25 Feb 2022 09:50:04 -0800 Subject: [PATCH 1/3] Move CompletePending(WithOutputs) into IFasterContext --- cs/src/core/ClientSession/ClientSession.cs | 21 ++++--------------- cs/src/core/ClientSession/IFasterContext.cs | 19 +++++++++++++++++ .../ClientSession/LockableUnsafeContext.cs | 19 +++-------------- cs/src/core/ClientSession/UnsafeContext.cs | 17 ++------------- 4 files changed, 28 insertions(+), 48 deletions(-) diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index a2cdbe73c..05a0946b5 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -173,7 +173,7 @@ public LockableUnsafeContext GetL return this.luContext; } - #region IFasterOperations + #region IFasterContext /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) @@ -462,7 +462,7 @@ public void Refresh() UnsafeSuspendThread(); } - #endregion IFasterOperations + #endregion IFasterContext #region Pending Operations @@ -485,24 +485,11 @@ public IEnumerable GetPendingRequests() yield return val.serialNum; } - /// - /// Synchronously complete outstanding pending synchronous operations. - /// Async operations must be completed individually. - /// - /// Wait for all pending operations on session to complete - /// Spin-wait until ongoing commit/checkpoint, if any, completes - /// True if all pending operations have completed, false otherwise + /// public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) => CompletePending(false, wait, spinWaitForCommit); - /// - /// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations. - /// Async operations must be completed individually. - /// - /// Outputs completed by this operation - /// Wait for all pending operations on session to complete - /// Spin-wait until ongoing commit/checkpoint, if any, completes - /// True if all pending operations have completed, false otherwise + /// public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) { InitializeCompletedOutputs(); diff --git a/cs/src/core/ClientSession/IFasterContext.cs b/cs/src/core/ClientSession/IFasterContext.cs index 6cef9f9d0..dc2b6ed7e 100644 --- a/cs/src/core/ClientSession/IFasterContext.cs +++ b/cs/src/core/ClientSession/IFasterContext.cs @@ -11,6 +11,25 @@ namespace FASTER.core /// public interface IFasterContext { + /// + /// Synchronously complete outstanding pending synchronous operations. + /// Async operations must be completed individually. + /// + /// Wait for all pending operations on session to complete + /// Spin-wait until ongoing commit/checkpoint, if any, completes + /// True if all pending operations have completed, false otherwise + bool CompletePending(bool wait = false, bool spinWaitForCommit = false); + + /// + /// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations. + /// Async operations must be completed individually. + /// + /// Outputs completed by this operation + /// Wait for all pending operations on session to complete + /// Spin-wait until ongoing commit/checkpoint, if any, completes + /// True if all pending operations have completed, false otherwise + bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false); + /// /// Read operation /// diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index 292dad5fb..116c5d4d1 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -72,27 +72,14 @@ public void SuspendThread() /// public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch; - /// - /// Synchronously complete outstanding pending synchronous operations. - /// Async operations must be completed individually. - /// - /// Wait for all pending operations on session to complete - /// Spin-wait until ongoing commit/checkpoint, if any, completes - /// True if all pending operations have completed, false otherwise + /// public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); - } + } - /// - /// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations. - /// Assumes epoch protection is managed by user. Async operations must be completed individually. - /// - /// Outputs completed by this operation - /// Wait for all pending operations on session to complete - /// Spin-wait until ongoing commit/checkpoint, if any, completes - /// True if all pending operations have completed, false otherwise + /// public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); diff --git a/cs/src/core/ClientSession/UnsafeContext.cs b/cs/src/core/ClientSession/UnsafeContext.cs index e8b15d887..6f0b7b01f 100644 --- a/cs/src/core/ClientSession/UnsafeContext.cs +++ b/cs/src/core/ClientSession/UnsafeContext.cs @@ -68,27 +68,14 @@ public void SuspendThread() /// public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch; - /// - /// Synchronously complete outstanding pending synchronous operations. - /// Async operations must be completed individually. - /// - /// Wait for all pending operations on session to complete - /// Spin-wait until ongoing commit/checkpoint, if any, completes - /// True if all pending operations have completed, false otherwise + /// public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); } - /// - /// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations. - /// Assumes epoch protection is managed by user. Async operations must be completed individually. - /// - /// Outputs completed by this operation - /// Wait for all pending operations on session to complete - /// Spin-wait until ongoing commit/checkpoint, if any, completes - /// True if all pending operations have completed, false otherwise + /// public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); From ed5d92d0759f83a05a58a730490ca1792c1c8125 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Sun, 27 Feb 2022 15:07:22 -0800 Subject: [PATCH 2/3] Skip LockEvictionObserver entirely if no LUCs active --- cs/src/core/Allocator/AllocatorBase.cs | 14 +++++++++++--- cs/src/core/Allocator/LockEvictionObserver.cs | 9 ++------- cs/src/core/Index/FASTER/FASTER.cs | 6 +++--- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 5a5a5d9c5..fc6a0bb70 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -113,7 +113,11 @@ public abstract partial class AllocatorBase : IDisposable /// HeadOFfset lag address /// internal long HeadOffsetLagAddress; - + + /// + /// Number of instances active. + /// + internal long NumActiveLockingSessions = 0; /// /// Log mutable fraction @@ -1276,8 +1280,12 @@ private void OnPagesClosed(long newSafeHeadAddress) long start = oldSafeHeadAddress > closePageAddress ? oldSafeHeadAddress : closePageAddress; long end = newSafeHeadAddress < closePageAddress + PageSize ? newSafeHeadAddress : closePageAddress + PageSize; - if (OnLockEvictionObserver != null) MemoryPageScan(start, end, OnLockEvictionObserver); - if (OnEvictionObserver != null) MemoryPageScan(start, end, OnEvictionObserver); + // If there are no active locking sessions, there should be no locks in the log. + if (this.NumActiveLockingSessions > 0 && OnLockEvictionObserver is not null) + MemoryPageScan(start, end, OnLockEvictionObserver); + + if (OnEvictionObserver is not null) + MemoryPageScan(start, end, OnEvictionObserver); if (newSafeHeadAddress < closePageAddress + PageSize) { diff --git a/cs/src/core/Allocator/LockEvictionObserver.cs b/cs/src/core/Allocator/LockEvictionObserver.cs index 05cdce36e..c3b60e1c5 100644 --- a/cs/src/core/Allocator/LockEvictionObserver.cs +++ b/cs/src/core/Allocator/LockEvictionObserver.cs @@ -1,20 +1,19 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. -using FASTER.core; using System; namespace FASTER.core { /// - /// Cache size tracker + /// Observer for page-lock evictions /// public class LockEvictionObserver : IObserver> { readonly FasterKV store; /// - /// Class to track and update cache size + /// Class to manage lock eviction transfers to LockTable /// /// FASTER store instance public LockEvictionObserver(FasterKV store) => this.store = store; @@ -25,10 +24,6 @@ public class LockEvictionObserver : IObserver public void OnNext(IFasterScanIterator iter) { - // If there are no active locking sessions, there should be no locks in the log. - if (this.store.NumActiveLockingSessions == 0) - return; - while (iter.GetNext(out RecordInfo info, out Key key, out Value value)) { // If it is not Invalid, we must Seal it so there is no possibility it will be missed while we're in the process diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index f5f681de8..a566bc876 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -65,14 +65,14 @@ public partial class FasterKV : FasterBase, internal readonly bool DisableLocking; internal readonly LockTable LockTable; - internal long NumActiveLockingSessions = 0; + internal long NumActiveLockingSessions => this.hlog.NumActiveLockingSessions; internal void IncrementNumLockingSessions() { _hybridLogCheckpoint.info.manualLockingActive = true; - Interlocked.Increment(ref this.NumActiveLockingSessions); + Interlocked.Increment(ref this.hlog.NumActiveLockingSessions); } - internal void DecrementNumLockingSessions() => --this.NumActiveLockingSessions; + internal void DecrementNumLockingSessions() => --this.hlog.NumActiveLockingSessions; /// /// Create FasterKV instance From 1fe16e270e147908ab94384575ca726805797eef Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Sun, 27 Feb 2022 15:57:28 -0800 Subject: [PATCH 3/3] Make Decrement interlocked; inline one accessor; revert CMakeLists.txt --- cc/CMakeLists.txt | 4 ++-- cs/src/core/Index/FASTER/FASTER.cs | 3 +-- cs/src/core/Index/Recovery/Checkpoint.cs | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cc/CMakeLists.txt b/cc/CMakeLists.txt index 4f92318b6..15d165af5 100644 --- a/cc/CMakeLists.txt +++ b/cc/CMakeLists.txt @@ -20,8 +20,8 @@ if (MSVC) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /Od /RTC1 /MDd") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2 /Oi /Gy- /MD") - set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO") - set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:NOICF") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:NOICF") else() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index a566bc876..0e38e845d 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -65,14 +65,13 @@ public partial class FasterKV : FasterBase, internal readonly bool DisableLocking; internal readonly LockTable LockTable; - internal long NumActiveLockingSessions => this.hlog.NumActiveLockingSessions; internal void IncrementNumLockingSessions() { _hybridLogCheckpoint.info.manualLockingActive = true; Interlocked.Increment(ref this.hlog.NumActiveLockingSessions); } - internal void DecrementNumLockingSessions() => --this.hlog.NumActiveLockingSessions; + internal void DecrementNumLockingSessions() => Interlocked.Decrement(ref this.hlog.NumActiveLockingSessions); /// /// Create FasterKV instance diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index f2b2e94bf..8acfef667 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -84,7 +84,7 @@ internal void InitializeIndexCheckpoint(Guid indexToken) internal void InitializeHybridLogCheckpoint(Guid hybridLogToken, long version) { _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager); - _hybridLogCheckpoint.info.manualLockingActive = this.NumActiveLockingSessions > 0; + _hybridLogCheckpoint.info.manualLockingActive = this.hlog.NumActiveLockingSessions > 0; } internal long Compact(IFunctions functions, CompactionFunctions compactionFunctions, long untilAddress, CompactionType compactionType, SessionVariableLengthStructSettings sessionVariableLengthStructSettings) where CompactionFunctions : ICompactionFunctions