Skip to content

Commit

Permalink
CSHARP-3906: Switch ConnectionPool background task to a dedicated thr…
Browse files Browse the repository at this point in the history
…ead. (#739)

CSHARP-3906: Switch ConnectionPool background task to a dedicated thread.
  • Loading branch information
DmitryLukyanov authored Mar 4, 2022
1 parent 3b7d210 commit 8c53008
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ public void ThrowIfNotInitialized()
internal sealed class MaintenanceHelper : IDisposable
{
private CancellationTokenSource _cancellationTokenSource = null;
private Func<CancellationToken, Task> _maintenanceTaskCreator;
private Task _maintenanceTask;
private readonly Action<CancellationToken> _maintenanceAction;
private Thread _maintenanceThread;
private readonly TimeSpan _interval;

public MaintenanceHelper(Func<CancellationToken, Task> maintenanceTaskCreator, TimeSpan interval)
public MaintenanceHelper(Action<CancellationToken> maintenanceAction, TimeSpan interval)
{
_interval = interval;
_maintenanceTaskCreator = Ensure.IsNotNull(maintenanceTaskCreator, nameof(maintenanceTaskCreator));
_maintenanceAction = Ensure.IsNotNull(maintenanceAction, nameof(maintenanceAction));
}

public bool IsRunning => _maintenanceTask != null;
public bool IsRunning => _maintenanceThread != null;

public void Cancel()
{
Expand All @@ -185,7 +185,8 @@ public void Cancel()

CancelAndDispose();
_cancellationTokenSource = null;
_maintenanceTask = null;
_maintenanceThread = null;
// the previous _maintenanceThread might not be stopped yet, but it will be soon
}

public void Start()
Expand All @@ -199,8 +200,13 @@ public void Start()
_cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = _cancellationTokenSource.Token;

_maintenanceTask = Task.Run(() => _maintenanceTaskCreator(cancellationToken), cancellationToken);
_maintenanceTask.ConfigureAwait(false);
_maintenanceThread = new Thread(new ParameterizedThreadStart(ThreadStart)) { IsBackground = true };
_maintenanceThread.Start(cancellationToken);

void ThreadStart(object cancellationToken)
{
_maintenanceAction((CancellationToken)cancellationToken);
}
}

public void Dispose()
Expand Down Expand Up @@ -867,12 +873,12 @@ public ConnectionCreator(ExclusiveConnectionPool pool, TimeSpan connectingTimeou
_stopwatch = null;
}

public async Task<PooledConnection> CreateOpenedAsync(CancellationToken cancellationToken)
public PooledConnection CreateOpened(CancellationToken cancellationToken)
{
try
{
var stopwatch = Stopwatch.StartNew();
_connectingWaitStatus = await _pool._maxConnectingQueue.WaitAsync(_connectingTimeout, cancellationToken).ConfigureAwait(false);
_connectingWaitStatus = _pool._maxConnectingQueue.Wait(_connectingTimeout, cancellationToken);
stopwatch.Stop();

_pool._poolState.ThrowIfNotReady();
Expand All @@ -882,7 +888,7 @@ public async Task<PooledConnection> CreateOpenedAsync(CancellationToken cancella
_pool.CreateTimeoutException(stopwatch, $"Timed out waiting for in connecting queue after {stopwatch.ElapsedMilliseconds}ms.");
}

var connection = await CreateOpenedInternalAsync(cancellationToken).ConfigureAwait(false);
var connection = CreateOpenedInternal(cancellationToken);
return connection;
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ExclusiveConnectionPool(
_connectionExceptionHandler = Ensure.IsNotNull(connectionExceptionHandler, nameof(connectionExceptionHandler));
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));

_maintenanceHelper = new MaintenanceHelper(token => MaintainSizeAsync(token), _settings.MaintenanceInterval);
_maintenanceHelper = new MaintenanceHelper(MaintainSize, _settings.MaintenanceInterval);
_poolState = new PoolState(EndPointHelper.ToString(_endPoint));
_checkOutReasonCounter = new CheckOutReasonCounter();

Expand Down Expand Up @@ -291,7 +291,7 @@ public int GetGeneration(ObjectId? serviceId)
}

// private methods
private async Task MaintainSizeAsync(CancellationToken cancellationToken)
private void MaintainSize(CancellationToken cancellationToken)
{
try
{
Expand All @@ -300,13 +300,13 @@ private async Task MaintainSizeAsync(CancellationToken cancellationToken)
try
{
_connectionHolder.Prune(cancellationToken);
await EnsureMinSizeAsync(cancellationToken).ConfigureAwait(false);
EnsureMinSize(cancellationToken);
}
catch
{
// ignore exceptions
}
await Task.Delay(_settings.MaintenanceInterval, cancellationToken).ConfigureAwait(false);
ThreadHelper.Sleep(_settings.MaintenanceInterval, cancellationToken);
}
}
catch
Expand All @@ -315,23 +315,23 @@ private async Task MaintainSizeAsync(CancellationToken cancellationToken)
}
}

private async Task EnsureMinSizeAsync(CancellationToken cancellationToken)
private void EnsureMinSize(CancellationToken cancellationToken)
{
var minTimeout = TimeSpan.FromMilliseconds(20);

while (CreatedCount < _settings.MinConnections && !cancellationToken.IsCancellationRequested)
{
using (var poolAwaiter = _maxConnectionsQueue.CreateAwaiter())
{
var entered = await poolAwaiter.WaitSignaledAsync(minTimeout, cancellationToken).ConfigureAwait(false);
var entered = poolAwaiter.WaitSignaled(minTimeout, cancellationToken);
if (!entered)
{
return;
}

using (var connectionCreator = new ConnectionCreator(this, minTimeout))
{
var connection = await connectionCreator.CreateOpenedAsync(cancellationToken).ConfigureAwait(false);
var connection = connectionCreator.CreateOpened(cancellationToken);
_connectionHolder.Return(connection);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/MongoDB.Driver.Core/Core/Misc/SemaphoreSlimSignalable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public SemaphoreSlimSignalableAwaiter(SemaphoreSlimSignalable semaphoreSlimSigna
_enteredSemaphore = false;
}

public async Task<bool> WaitSignaledAsync(TimeSpan timeout, CancellationToken cancellationToken)
public bool WaitSignaled(TimeSpan timeout, CancellationToken cancellationToken)
{
var waitResult = await _semaphoreSlimSignalable.WaitSignaledAsync(timeout, cancellationToken).ConfigureAwait(false);
var waitResult = _semaphoreSlimSignalable.WaitSignaled(timeout, cancellationToken);
_enteredSemaphore = waitResult == SemaphoreWaitResult.Entered;
return _enteredSemaphore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,8 @@ public void Maintenance_should_call_connection_dispose_when_connection_authentic
var authenticationException = new MongoAuthenticationException(new ConnectionId(_serverId), "test message");
var authenticationFailedConnection = new Mock<IConnection>();
authenticationFailedConnection
.Setup(c => c.OpenAsync(It.IsAny<CancellationToken>())) // an authentication exception is thrown from _connectionInitializer.InitializeConnection
// that in turn is called from OpenAsync
.Setup(c => c.Open(It.IsAny<CancellationToken>())) // an authentication exception is thrown from _connectionInitializer.InitializeConnection
// that in turn is called from OpenAsync
.Throws(authenticationException);

_mockConnectionExceptionHandler
Expand All @@ -1144,32 +1144,6 @@ public void Maintenance_should_call_connection_dispose_when_connection_authentic
}
}

[Fact]
public void MaintainSizeAsync_should_not_try_new_attempt_after_failing_without_delay()
{
var settings = _settings.With(maintenanceInterval: TimeSpan.FromSeconds(10));

using (var subject = CreateSubject(settings))
{
var tokenSource = new CancellationTokenSource();
_mockConnectionFactory
.SetupSequence(f => f.CreateConnection(_serverId, _endPoint))
.Throws<Exception>() // failed attempt
.Returns(() => // successful attempt which should be delayed
{
// break the loop. With this line the MaintainSizeAsync will contain only 2 iterations
tokenSource.Cancel();
return new MockConnection(_serverId);
});

var testResult = Task.WaitAny(
subject.MaintainSizeAsync(tokenSource.Token), // if this task is completed first, it will mean that there was no delay (10 sec)
Task.Delay(TimeSpan.FromSeconds(1))); // time to be sure that delay is happening,
// if the method is running more than 1 second, then delay is happening
testResult.Should().Be(1);
}
}

[Theory]
[ParameterAttributeData]
public void Maintenance_should_run_with_finite_maintenanceInterval(
Expand Down Expand Up @@ -1655,11 +1629,6 @@ private void InitializeAndWait(ExclusiveConnectionPool pool = null, ConnectionPo

internal static class ExclusiveConnectionPoolReflector
{
public static Task MaintainSizeAsync(this ExclusiveConnectionPool obj, CancellationToken cancellationToken)
{
return (Task)Reflector.Invoke(obj, nameof(MaintainSizeAsync), cancellationToken);
}

public static int _waitQueueFreeSlots(this ExclusiveConnectionPool obj)
{
return (int)Reflector.GetFieldValue(obj, nameof(_waitQueueFreeSlots));
Expand Down

0 comments on commit 8c53008

Please sign in to comment.