Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C#] Add CompletePending[WithOutputs] to IFasterContext #664

Merged
merged 5 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ if (MSVC)
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /Od /RTC1 /MDd")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2 /Oi /Gy- /MD")

set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:NOICF")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:NOICF")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")

Expand Down
14 changes: 11 additions & 3 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ public abstract partial class AllocatorBase<Key, Value> : IDisposable
/// HeadOFfset lag address
/// </summary>
internal long HeadOffsetLagAddress;


/// <summary>
/// Number of <see cref="LockableUnsafeContext{Key, Value, Input, Output, Context, Functions}"/> instances active.
/// </summary>
internal long NumActiveLockingSessions = 0;

/// <summary>
/// Log mutable fraction
Expand Down Expand Up @@ -1276,8 +1280,12 @@ private void OnPagesClosed(long newSafeHeadAddress)
long start = oldSafeHeadAddress > closePageAddress ? oldSafeHeadAddress : closePageAddress;
long end = newSafeHeadAddress < closePageAddress + PageSize ? newSafeHeadAddress : closePageAddress + PageSize;

if (OnLockEvictionObserver != null) MemoryPageScan(start, end, OnLockEvictionObserver);
if (OnEvictionObserver != null) MemoryPageScan(start, end, OnEvictionObserver);
// If there are no active locking sessions, there should be no locks in the log.
if (this.NumActiveLockingSessions > 0 && OnLockEvictionObserver is not null)
MemoryPageScan(start, end, OnLockEvictionObserver);

if (OnEvictionObserver is not null)
MemoryPageScan(start, end, OnEvictionObserver);

if (newSafeHeadAddress < closePageAddress + PageSize)
{
Expand Down
9 changes: 2 additions & 7 deletions cs/src/core/Allocator/LockEvictionObserver.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System;

namespace FASTER.core
{
/// <summary>
/// Cache size tracker
/// Observer for page-lock evictions
/// </summary>
public class LockEvictionObserver<Key, Value> : IObserver<IFasterScanIterator<Key, Value>>
{
readonly FasterKV<Key, Value> store;

/// <summary>
/// Class to track and update cache size
/// Class to manage lock eviction transfers to LockTable
/// </summary>
/// <param name="store">FASTER store instance</param>
public LockEvictionObserver(FasterKV<Key, Value> store) => this.store = store;
Expand All @@ -25,10 +24,6 @@ public class LockEvictionObserver<Key, Value> : IObserver<IFasterScanIterator<Ke
/// <param name="iter"></param>
public void OnNext(IFasterScanIterator<Key, Value> iter)
{
// If there are no active locking sessions, there should be no locks in the log.
if (this.store.NumActiveLockingSessions == 0)
return;

while (iter.GetNext(out RecordInfo info, out Key key, out Value value))
{
// If it is not Invalid, we must Seal it so there is no possibility it will be missed while we're in the process
Expand Down
21 changes: 4 additions & 17 deletions cs/src/core/ClientSession/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public LockableUnsafeContext<Key, Value, Input, Output, Context, Functions> GetL
return this.luContext;
}

#region IFasterOperations
#region IFasterContext
/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0)
Expand Down Expand Up @@ -462,7 +462,7 @@ public void Refresh()
UnsafeSuspendThread();
}

#endregion IFasterOperations
#endregion IFasterContext

#region Pending Operations

Expand All @@ -485,24 +485,11 @@ public IEnumerable<long> GetPendingRequests()
yield return val.serialNum;
}

/// <summary>
/// Synchronously complete outstanding pending synchronous operations.
/// Async operations must be completed individually.
/// </summary>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
=> CompletePending(false, wait, spinWaitForCommit);

/// <summary>
/// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations.
/// Async operations must be completed individually.
/// </summary>
/// <param name="completedOutputs">Outputs completed by this operation</param>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
InitializeCompletedOutputs();
Expand Down
19 changes: 19 additions & 0 deletions cs/src/core/ClientSession/IFasterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@ namespace FASTER.core
/// </summary>
public interface IFasterContext<Key, Value, Input, Output, Context>
{
/// <summary>
/// Synchronously complete outstanding pending synchronous operations.
/// Async operations must be completed individually.
/// </summary>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
bool CompletePending(bool wait = false, bool spinWaitForCommit = false);

/// <summary>
/// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations.
/// Async operations must be completed individually.
/// </summary>
/// <param name="completedOutputs">Outputs completed by this operation</param>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false);

/// <summary>
/// Read operation
/// </summary>
Expand Down
19 changes: 3 additions & 16 deletions cs/src/core/ClientSession/LockableUnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,14 @@ public void SuspendThread()
/// </summary>
public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch;

/// <summary>
/// Synchronously complete outstanding pending synchronous operations.
/// Async operations must be completed individually.
/// </summary>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
}
}

/// <summary>
/// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations.
/// Assumes epoch protection is managed by user. Async operations must be completed individually.
/// </summary>
/// <param name="completedOutputs">Outputs completed by this operation</param>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
Expand Down
17 changes: 2 additions & 15 deletions cs/src/core/ClientSession/UnsafeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,14 @@ public void SuspendThread()
/// </summary>
public int LocalCurrentEpoch => clientSession.fht.epoch.LocalCurrentEpoch;

/// <summary>
/// Synchronously complete outstanding pending synchronous operations.
/// Async operations must be completed individually.
/// </summary>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
/// <inheritdoc/>
public bool CompletePending(bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit);
}

/// <summary>
/// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations.
/// Assumes epoch protection is managed by user. Async operations must be completed individually.
/// </summary>
/// <param name="completedOutputs">Outputs completed by this operation</param>
/// <param name="wait">Wait for all pending operations on session to complete</param>
/// <param name="spinWaitForCommit">Spin-wait until ongoing commit/checkpoint, if any, completes</param>
/// <returns>True if all pending operations have completed, false otherwise</returns>
/// <inheritdoc/>
public bool CompletePendingWithOutputs(out CompletedOutputIterator<Key, Value, Input, Output, Context> completedOutputs, bool wait = false, bool spinWaitForCommit = false)
{
Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected());
Expand Down
5 changes: 2 additions & 3 deletions cs/src/core/Index/FASTER/FASTER.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,13 @@ public partial class FasterKV<Key, Value> : FasterBase,

internal readonly bool DisableLocking;
internal readonly LockTable<Key> LockTable;
internal long NumActiveLockingSessions = 0;

internal void IncrementNumLockingSessions()
{
_hybridLogCheckpoint.info.manualLockingActive = true;
Interlocked.Increment(ref this.NumActiveLockingSessions);
Interlocked.Increment(ref this.hlog.NumActiveLockingSessions);
}
internal void DecrementNumLockingSessions() => --this.NumActiveLockingSessions;
internal void DecrementNumLockingSessions() => Interlocked.Decrement(ref this.hlog.NumActiveLockingSessions);

/// <summary>
/// Create FasterKV instance
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Index/Recovery/Checkpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal void InitializeIndexCheckpoint(Guid indexToken)
internal void InitializeHybridLogCheckpoint(Guid hybridLogToken, long version)
{
_hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager);
_hybridLogCheckpoint.info.manualLockingActive = this.NumActiveLockingSessions > 0;
_hybridLogCheckpoint.info.manualLockingActive = this.hlog.NumActiveLockingSessions > 0;
}

internal long Compact<T1, T2, T3, T4, CompactionFunctions>(IFunctions<Key, Value, object, object, object> functions, CompactionFunctions compactionFunctions, long untilAddress, CompactionType compactionType, SessionVariableLengthStructSettings<Value, object> sessionVariableLengthStructSettings) where CompactionFunctions : ICompactionFunctions<Key, Value>
Expand Down