Skip to content

Commit

Permalink
dispose task
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Apr 28, 2022
1 parent 6b55db9 commit f993c4b
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int requestCount, Can
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
oldestRequest.Tcs.TrySetResult(FailedLease);
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
_queueCount += oldestRequest.Count;
}
}
while (_options.QueueLimit - _queueCount < requestCount);
}
Expand Down Expand Up @@ -326,7 +329,7 @@ protected override void Dispose(bool disposing)
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
next.Tcs.SetResult(FailedLease);
next.Tcs.TrySetResult(FailedLease);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : Partition
// TODO: Look at ConcurrentDictionary to try and avoid a global lock
private Dictionary<TKey, Lazy<RateLimiter>> _limiters;
private bool _disposed;
private TaskCompletionSource<object?> _disposeComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);

// Used by the Timer to call TryRelenish on ReplenishingRateLimiters
// We use a separate list to avoid running TryReplenish (which might be user code) inside our lock
Expand Down Expand Up @@ -123,6 +124,7 @@ protected override void Dispose(bool disposing)

if (alreadyDisposed)
{
_disposeComplete.Task.GetAwaiter().GetResult();
return;
}

Expand All @@ -133,6 +135,7 @@ protected override void Dispose(bool disposing)
limiter.Value.Value.Dispose();
}
_limiters.Clear();
_disposeComplete.TrySetResult(null);
}

protected override async ValueTask DisposeAsyncCore()
Expand All @@ -144,6 +147,7 @@ protected override async ValueTask DisposeAsyncCore()

if (alreadyDisposed)
{
await _disposeComplete.Task.ConfigureAwait(false);
return;
}

Expand All @@ -152,6 +156,7 @@ protected override async ValueTask DisposeAsyncCore()
await limiter.Value.Value.DisposeAsync().ConfigureAwait(false);
}
_limiters.Clear();
_disposeComplete.TrySetResult(null);
}

// This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int requestCount, Can
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
oldestRequest.Tcs.TrySetResult(FailedLease);
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
_queueCount += oldestRequest.Count;
}
}
while (_options.QueueLimit - _queueCount < requestCount);
}
Expand Down Expand Up @@ -325,7 +328,7 @@ protected override void Dispose(bool disposing)
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
next.Tcs.SetResult(FailedLease);
next.Tcs.TrySetResult(FailedLease);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,5 +690,57 @@ public void ReplenishingRateLimiterPropertiesHaveCorrectValues()
Assert.False(limiter2.IsAutoReplenishing);
Assert.Equal(replenishPeriod, limiter2.ReplenishmentPeriod);
}

[Fact]
public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
{
var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
TimeSpan.Zero, autoReplenishment: false));
var lease = limiter.Acquire(2);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.WaitAsync(1, cts.Token);

// Add another item to queue, will be completed as failed later when we queue another item
var wait2 = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

cts.Cancel();
var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

lease.Dispose();

var wait3 = limiter.WaitAsync(2);
Assert.False(wait3.IsCompleted);

// will be kicked by wait3 because we're using NewestFirst
lease = await wait2;
Assert.False(lease.IsAcquired);

limiter.TryReplenish();
lease = await wait3;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task CanDisposeAfterCancelingQueuedRequest()
{
var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
TimeSpan.Zero, autoReplenishment: false));
var lease = limiter.Acquire(1);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.WaitAsync(1, cts.Token);

cts.Cancel();
var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

// Make sure dispose doesn't have any side-effects when dealing with a canceled queued item
limiter.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -713,5 +713,58 @@ public void ReplenishingRateLimiterPropertiesHaveCorrectValues()
Assert.False(limiter2.IsAutoReplenishing);
Assert.Equal(replenishPeriod, limiter2.ReplenishmentPeriod);
}

[Fact]
public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
{
var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
TimeSpan.Zero, 2, autoReplenishment: false));
var lease = limiter.Acquire(2);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.WaitAsync(1, cts.Token);

// Add another item to queue, will be completed as failed later when we queue another item
var wait2 = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

cts.Cancel();
var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

lease.Dispose();
limiter.TryReplenish();

var wait3 = limiter.WaitAsync(2);
Assert.False(wait3.IsCompleted);

// will be kicked by wait3 because we're using NewestFirst
lease = await wait2;
Assert.False(lease.IsAcquired);

limiter.TryReplenish();
lease = await wait3;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task CanDisposeAfterCancelingQueuedRequest()
{
var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
TimeSpan.Zero, 2, autoReplenishment: false));
var lease = limiter.Acquire(1);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.WaitAsync(1, cts.Token);

cts.Cancel();
var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

// Make sure dispose doesn't have any side-effects when dealing with a canceled queued item
limiter.Dispose();
}
}
}

0 comments on commit f993c4b

Please sign in to comment.