From 9c37a3b3eb6d955d54865a2f9edf557620ab7fa8 Mon Sep 17 00:00:00 2001 From: Brennan Date: Fri, 29 Apr 2022 12:25:08 -0700 Subject: [PATCH] Add Fixed and Sliding Window RateLimiters + PartitionedRateLimiter.Create (#68695) * Add RL non-generic fixed window, sliding window implementations (#68087) * Add initial impl of PartitionedRateLimiter.Create (#67677) Co-authored-by: Shreya Verma --- .../ref/System.Threading.RateLimiting.cs | 64 ++ .../src/System.Threading.RateLimiting.csproj | 12 +- .../RateLimiting/ConcurrencyLimiter.cs | 8 +- .../RateLimiting/FixedWindowRateLimiter.cs | 426 ++++++++++ .../FixedWindowRateLimiterOptions.cs | 76 ++ .../Threading/RateLimiting/NoopLimiter.cs | 39 + .../RateLimiting/PartitionedRateLimiter.cs | 230 ++++++ .../RateLimiting/RateLimitPartition.T.cs | 30 + .../RateLimiting/RateLimitPartition.cs | 78 ++ .../RateLimiting/SlidingWindowRateLimiter.cs | 425 ++++++++++ .../SlidingWindowRateLimiterOptions.cs | 88 ++ .../Threading/RateLimiting/TimerAwaitable.cs | 136 ++++ .../RateLimiting/TokenBucketRateLimiter.cs | 8 +- .../tests/BaseRateLimiterTests.cs | 6 + .../tests/ConcurrencyLimiterTests.cs | 49 ++ .../tests/FixedWindowRateLimiterTests.cs | 746 +++++++++++++++++ .../tests/PartitionedRateLimiterTests.cs | 466 ++++++++++- .../tests/RateLimiterPartitionTests.cs | 83 ++ .../tests/SlidingWindowRateLimiterTests.cs | 770 ++++++++++++++++++ ...System.Threading.RateLimiting.Tests.csproj | 6 + .../tests/TokenBucketRateLimiterTests.cs | 52 ++ 21 files changed, 3792 insertions(+), 6 deletions(-) create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs create mode 100644 src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs create mode 100644 src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs create mode 100644 src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs diff --git a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs index 13e779c61ca79..9b3419ab6a226 100644 --- a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs +++ b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs @@ -40,6 +40,10 @@ public MetadataName(string name) { } public static bool operator !=(System.Threading.RateLimiting.MetadataName left, System.Threading.RateLimiting.MetadataName right) { throw null; } public override string ToString() { throw null; } } + public static partial class PartitionedRateLimiter + { + public static System.Threading.RateLimiting.PartitionedRateLimiter Create(System.Func> partitioner, System.Collections.Generic.IEqualityComparer? equalityComparer = null) where TPartitionKey : notnull { throw null; } + } public abstract partial class PartitionedRateLimiter : System.IAsyncDisposable, System.IDisposable { protected PartitionedRateLimiter() { } @@ -83,6 +87,21 @@ protected virtual void Dispose(bool disposing) { } public abstract bool TryGetMetadata(string metadataName, out object? metadata); public bool TryGetMetadata(System.Threading.RateLimiting.MetadataName metadataName, [System.Diagnostics.CodeAnalysis.MaybeNullAttribute] out T metadata) { throw null; } } + public static partial class RateLimitPartition + { + public static System.Threading.RateLimiting.RateLimitPartition CreateConcurrencyLimiter(TKey partitionKey, System.Func factory) { throw null; } + public static System.Threading.RateLimiting.RateLimitPartition CreateNoLimiter(TKey partitionKey) { throw null; } + public static System.Threading.RateLimiting.RateLimitPartition CreateTokenBucketLimiter(TKey partitionKey, System.Func factory) { throw null; } + public static System.Threading.RateLimiting.RateLimitPartition Create(TKey partitionKey, System.Func factory) { throw null; } + } + public partial struct RateLimitPartition + { + private readonly TKey _PartitionKey_k__BackingField; + private object _dummy; + private int _dummyPrimitive; + public RateLimitPartition(TKey partitionKey, System.Func factory) { throw null; } + public readonly TKey PartitionKey { get { throw null; } } + } public abstract partial class ReplenishingRateLimiter : System.Threading.RateLimiting.RateLimiter { protected ReplenishingRateLimiter() { } @@ -113,4 +132,49 @@ public TokenBucketRateLimiterOptions(int tokenLimit, System.Threading.RateLimiti public int TokenLimit { get { throw null; } } public int TokensPerPeriod { get { throw null; } } } + public sealed partial class SlidingWindowRateLimiter : System.Threading.RateLimiting.ReplenishingRateLimiter + { + public SlidingWindowRateLimiter(System.Threading.RateLimiting.SlidingWindowRateLimiterOptions options) { } + public override System.TimeSpan? IdleDuration { get { throw null; } } + public override bool IsAutoReplenishing { get { throw null; } } + public override System.TimeSpan ReplenishmentPeriod { get { throw null; } } + protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int requestCount) { throw null; } + protected override void Dispose(bool disposing) { } + protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public override int GetAvailablePermits() { throw null; } + public override bool TryReplenish() { throw null; } + protected override System.Threading.Tasks.ValueTask WaitAsyncCore(int requestCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } + public sealed partial class SlidingWindowRateLimiterOptions + { + public SlidingWindowRateLimiterOptions(int permitLimit, System.Threading.RateLimiting.QueueProcessingOrder queueProcessingOrder, int queueLimit, System.TimeSpan window, int segmentsPerWindow, bool autoReplenishment = true) { } + public bool AutoReplenishment { get { throw null; } } + public int QueueLimit { get { throw null; } } + public System.Threading.RateLimiting.QueueProcessingOrder QueueProcessingOrder { get { throw null; } } + public System.TimeSpan Window { get { throw null; } } + public int PermitLimit { get { throw null; } } + public int SegmentsPerWindow { get { throw null; } } + } + public sealed partial class FixedWindowRateLimiter : System.Threading.RateLimiting.ReplenishingRateLimiter + { + public FixedWindowRateLimiter(System.Threading.RateLimiting.FixedWindowRateLimiterOptions options) { } + public override System.TimeSpan? IdleDuration { get { throw null; } } + public override bool IsAutoReplenishing { get { throw null; } } + public override System.TimeSpan ReplenishmentPeriod { get { throw null; } } + protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int requestCount) { throw null; } + protected override void Dispose(bool disposing) { } + protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public override int GetAvailablePermits() { throw null; } + public override bool TryReplenish() { throw null; } + protected override System.Threading.Tasks.ValueTask WaitAsyncCore(int requestCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } + public sealed partial class FixedWindowRateLimiterOptions + { + public FixedWindowRateLimiterOptions(int permitLimit, System.Threading.RateLimiting.QueueProcessingOrder queueProcessingOrder, int queueLimit, System.TimeSpan window, bool autoReplenishment = true) { } + public bool AutoReplenishment { get { throw null; } } + public int QueueLimit { get { throw null; } } + public System.Threading.RateLimiting.QueueProcessingOrder QueueProcessingOrder { get { throw null; } } + public System.TimeSpan Window { get { throw null; } } + public int PermitLimit { get { throw null; } } + } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj index a87e55873f761..c3f39a246b69f 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj +++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj @@ -1,4 +1,4 @@ - + $(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) enable @@ -16,13 +16,22 @@ System.Threading.RateLimiting.RateLimitLease + + + + + + + + + @@ -30,6 +39,7 @@ System.Threading.RateLimiting.RateLimitLease + diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index a733ea04e30b4..b50db111afd68 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -112,7 +112,11 @@ protected override ValueTask WaitAsyncCore(int permitCount, Canc RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - oldestRequest.Tcs.TrySetResult(FailedLease); + if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + { + // Updating queue count is handled by the cancellation code + _queueCount += oldestRequest.Count; + } } while (_options.QueueLimit - _queueCount < permitCount); } @@ -249,7 +253,7 @@ protected override void Dispose(bool disposing) ? _queue.DequeueHead() : _queue.DequeueTail(); next.CancellationTokenRegistration.Dispose(); - next.Tcs.SetResult(FailedLease); + next.Tcs.TrySetResult(FailedLease); } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs new file mode 100644 index 0000000000000..dcef507ea9455 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -0,0 +1,426 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// implementation that refreshes allowed permits in a window periodically. + /// + public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter + { + private int _requestCount; + private int _queueCount; + private long _lastReplenishmentTick; + private long? _idleSince; + private bool _disposed; + + private readonly Timer? _renewTimer; + private readonly FixedWindowRateLimiterOptions _options; + private readonly Deque _queue = new Deque(); + + private object Lock => _queue; + + private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null); + private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null); + private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency; + + /// + public override TimeSpan? IdleDuration => _idleSince is null ? null : new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency)); + + /// + public override bool IsAutoReplenishing => _options.AutoReplenishment; + + /// + public override TimeSpan ReplenishmentPeriod => _options.Window; + + /// + /// Initializes the . + /// + /// Options to specify the behavior of the . + public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _requestCount = options.PermitLimit; + + _idleSince = _lastReplenishmentTick = Stopwatch.GetTimestamp(); + + if (_options.AutoReplenishment) + { + _renewTimer = new Timer(Replenish, this, _options.Window, _options.Window); + } + } + + /// + public override int GetAvailablePermits() => _requestCount; + + /// + protected override RateLimitLease AcquireCore(int requestCount) + { + // These amounts of resources can never be acquired + // Raises a PermitLimitExceeded ArgumentOutOFRangeException + if (requestCount > _options.PermitLimit) + { + throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit)); + } + + // Return SuccessfulLease or FailedLease depending to indicate limiter state + if (requestCount == 0 && !_disposed) + { + // Check if the requests are permitted in a window + // Requests will be allowed if the total served request is less than the max allowed requests (permit limit). + if (_requestCount > 0) + { + return SuccessfulLease; + } + + return CreateFailedWindowLease(requestCount); + } + + lock (Lock) + { + if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease)) + { + return lease; + } + + return CreateFailedWindowLease(requestCount); + } + } + + /// + protected override ValueTask WaitAsyncCore(int requestCount, CancellationToken cancellationToken = default) + { + // These amounts of resources can never be acquired + if (requestCount > _options.PermitLimit) + { + throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit)); + } + + ThrowIfDisposed(); + + // Return SuccessfulAcquisition if requestCount is 0 and resources are available + if (requestCount == 0 && _requestCount > 0) + { + return new ValueTask(SuccessfulLease); + } + + lock (Lock) + { + if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease)) + { + return new ValueTask(lease); + } + + // Avoid integer overflow by using subtraction instead of addition + Debug.Assert(_options.QueueLimit >= _queueCount); + if (_options.QueueLimit - _queueCount < requestCount) + { + if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && requestCount <= _options.QueueLimit) + { + // remove oldest items from queue until there is space for the newest acquisition request + do + { + RequestRegistration oldestRequest = _queue.DequeueHead(); + _queueCount -= oldestRequest.Count; + Debug.Assert(_queueCount >= 0); + if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + { + _queueCount += oldestRequest.Count; + } + } + while (_options.QueueLimit - _queueCount < requestCount); + } + else + { + // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst + return new ValueTask(CreateFailedWindowLease(requestCount)); + } + } + + CancelQueueState tcs = new CancelQueueState(requestCount, this, cancellationToken); + CancellationTokenRegistration ctr = default; + if (cancellationToken.CanBeCanceled) + { + ctr = cancellationToken.Register(static obj => + { + ((CancelQueueState)obj!).TrySetCanceled(); + }, tcs); + } + + RequestRegistration registration = new RequestRegistration(requestCount, tcs, ctr); + _queue.EnqueueTail(registration); + _queueCount += requestCount; + Debug.Assert(_queueCount <= _options.QueueLimit); + + return new ValueTask(registration.Tcs.Task); + } + } + + private RateLimitLease CreateFailedWindowLease(int requestCount) + { + int replenishAmount = requestCount - _requestCount + _queueCount; + // can't have 0 replenish window, that would mean it should be a successful lease + int replenishWindow = Math.Max(replenishAmount / _options.PermitLimit, 1); + + return new FixedWindowLease(false, TimeSpan.FromTicks(_options.Window.Ticks * replenishWindow)); + } + + private bool TryLeaseUnsynchronized(int requestCount, [NotNullWhen(true)] out RateLimitLease? lease) + { + ThrowIfDisposed(); + + // if permitCount is 0 we want to queue it if there are no available permits + if (_requestCount >= requestCount && _requestCount != 0) + { + if (requestCount == 0) + { + // Edge case where the check before the lock showed 0 available permit counters but when we got the lock, some permits were now available + lease = SuccessfulLease; + return true; + } + + // a. If there are no items queued we can lease + // b. If there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest + if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst)) + { + _idleSince = null; + _requestCount -= requestCount; + Debug.Assert(_requestCount >= 0); + lease = SuccessfulLease; + return true; + } + } + + lease = null; + return false; + } + + /// + /// Attempts to replenish request counters in the window. + /// + /// + /// False if is enabled, otherwise true. + /// Does not reflect if counters were replenished. + /// + public override bool TryReplenish() + { + if (_options.AutoReplenishment) + { + return false; + } + Replenish(this); + return true; + } + + private static void Replenish(object? state) + { + FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!; + Debug.Assert(limiter is not null); + + // Use Stopwatch instead of DateTime.UtcNow to avoid issues on systems where the clock can change + long nowTicks = Stopwatch.GetTimestamp(); + limiter!.ReplenishInternal(nowTicks); + } + + // Used in tests that test behavior with specific time intervals + private void ReplenishInternal(long nowTicks) + { + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes + lock (Lock) + { + if (_disposed) + { + return; + } + + if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks) + { + return; + } + + _lastReplenishmentTick = nowTicks; + + int availableRequestCounters = _requestCount; + int maxPermits = _options.PermitLimit; + int resourcesToAdd; + + if (availableRequestCounters < maxPermits) + { + resourcesToAdd = maxPermits - availableRequestCounters; + } + else + { + // All counters available, nothing to do + return; + } + + _requestCount += resourcesToAdd; + Debug.Assert(_requestCount == _options.PermitLimit); + + // Process queued requests + while (_queue.Count > 0) + { + RequestRegistration nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.PeekHead() + : _queue.PeekTail(); + + if (_requestCount >= nextPendingRequest.Count) + { + // Request can be fulfilled + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + + _queueCount -= nextPendingRequest.Count; + _requestCount -= nextPendingRequest.Count; + Debug.Assert(_requestCount >= 0); + + if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + { + // Queued item was canceled so add count back + _requestCount += nextPendingRequest.Count; + // Updating queue count is handled by the cancellation code + _queueCount += nextPendingRequest.Count; + } + nextPendingRequest.CancellationTokenRegistration.Dispose(); + Debug.Assert(_queueCount >= 0); + } + else + { + // Request cannot be fulfilled + break; + } + } + + if (_requestCount == _options.PermitLimit) + { + Debug.Assert(_idleSince is null); + Debug.Assert(_queueCount == 0); + _idleSince = Stopwatch.GetTimestamp(); + } + } + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + lock (Lock) + { + if (_disposed) + { + return; + } + _disposed = true; + _renewTimer?.Dispose(); + while (_queue.Count > 0) + { + RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + next.CancellationTokenRegistration.Dispose(); + next.Tcs.TrySetResult(FailedLease); + } + } + } + + protected override ValueTask DisposeAsyncCore() + { + Dispose(true); + + return default; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(FixedWindowRateLimiter)); + } + } + + private sealed class FixedWindowLease : RateLimitLease + { + private static readonly string[] s_allMetadataNames = new[] { MetadataName.RetryAfter.Name }; + + private readonly TimeSpan? _retryAfter; + + public FixedWindowLease(bool isAcquired, TimeSpan? retryAfter) + { + IsAcquired = isAcquired; + _retryAfter = retryAfter; + } + + public override bool IsAcquired { get; } + + public override IEnumerable MetadataNames => s_allMetadataNames; + + public override bool TryGetMetadata(string metadataName, out object? metadata) + { + if (metadataName == MetadataName.RetryAfter.Name && _retryAfter.HasValue) + { + metadata = _retryAfter.Value; + return true; + } + + metadata = default; + return false; + } + } + + private readonly struct RequestRegistration + { + public RequestRegistration(int requestCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + { + Count = requestCount; + // Use VoidAsyncOperationWithData instead + Tcs = tcs; + CancellationTokenRegistration = cancellationTokenRegistration; + } + + public int Count { get; } + + public TaskCompletionSource Tcs { get; } + + public CancellationTokenRegistration CancellationTokenRegistration { get; } + } + + private sealed class CancelQueueState : TaskCompletionSource + { + private readonly int _requestCount; + private readonly FixedWindowRateLimiter _limiter; + private readonly CancellationToken _cancellationToken; + + public CancelQueueState(int requestCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(TaskCreationOptions.RunContinuationsAsynchronously) + { + _requestCount = requestCount; + _limiter = limiter; + _cancellationToken = cancellationToken; + } + + public new bool TrySetCanceled() + { + if (TrySetCanceled(_cancellationToken)) + { + lock (_limiter.Lock) + { + _limiter._queueCount -= _requestCount; + } + return true; + } + return false; + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs new file mode 100644 index 0000000000000..0b8693b479230 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs @@ -0,0 +1,76 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Options to specify the behavior of a . + /// + public sealed class FixedWindowRateLimiterOptions + { + /// + /// Initializes the . + /// + /// Maximum number of requests that can be served in the window. + /// + /// Maximum number of unprocessed request counters waiting via . + /// + /// Specifies how often request counters can be replenished. Replenishing is triggered either by an internal timer if is true, or by calling . + /// + /// + /// Specifies whether request replenishment will be handled by the or by another party via . + /// + /// When or are less than 0. + public FixedWindowRateLimiterOptions( + int permitLimit, + QueueProcessingOrder queueProcessingOrder, + int queueLimit, + TimeSpan window, + bool autoReplenishment = true) + { + if (permitLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(permitLimit)); + } + if (queueLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(queueLimit)); + } + + PermitLimit = permitLimit; + QueueProcessingOrder = queueProcessingOrder; + QueueLimit = queueLimit; + Window = window; + AutoReplenishment = autoReplenishment; + } + + /// + /// Specifies the time window that takes in the requests. + /// + public TimeSpan Window { get; } + + /// + /// Specified whether the is automatically refresh counters or if someone else + /// will be calling to refresh counters. + /// + public bool AutoReplenishment { get; } + + /// + /// Maximum number of permit counters that can be allowed in a window. + /// + public int PermitLimit { get; } + + /// + /// Determines the behaviour of when not enough resources can be leased. + /// + /// + /// by default. + /// + public QueueProcessingOrder QueueProcessingOrder { get; } + + /// + /// Maximum cumulative permit count of queued acquisition requests. + /// + public int QueueLimit { get; } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs new file mode 100644 index 0000000000000..064e2211c8910 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs @@ -0,0 +1,39 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + internal sealed class NoopLimiter : RateLimiter + { + private static readonly RateLimitLease _lease = new NoopLease(); + + private NoopLimiter() { } + + public static NoopLimiter Instance { get; } = new NoopLimiter(); + + public override TimeSpan? IdleDuration => null; + + public override int GetAvailablePermits() => int.MaxValue; + + protected override RateLimitLease AcquireCore(int permitCount) => _lease; + + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) + => new ValueTask(_lease); + + private sealed class NoopLease : RateLimitLease + { + public override bool IsAcquired => true; + + public override IEnumerable MetadataNames => Array.Empty(); + + public override bool TryGetMetadata(string metadataName, out object? metadata) + { + metadata = null; + return false; + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs new file mode 100644 index 0000000000000..9fb634ddaddc3 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs @@ -0,0 +1,230 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// Contains methods to assist with creating a . + /// + public static class PartitionedRateLimiter + { + /// + /// Method used to create a default implementation of . + /// + /// The resource type that is being rate limited. + /// The type to distinguish partitions with. + /// Method called every time an Acquire or WaitAsync call is made to figure out what rate limiter to apply to the request. + /// If the matches a cached entry then the rate limiter previously used for that key is used. Otherwise, the factory is called to get a new rate limiter. + /// Optional to customize the comparison logic for . + /// + public static PartitionedRateLimiter Create( + Func> partitioner, + IEqualityComparer? equalityComparer = null) where TPartitionKey : notnull + { + return new DefaultPartitionedRateLimiter(partitioner, equalityComparer); + } + } + + internal sealed class DefaultPartitionedRateLimiter : PartitionedRateLimiter where TKey : notnull + { + private readonly Func> _partitioner; + + // TODO: Look at ConcurrentDictionary to try and avoid a global lock + private Dictionary> _limiters; + private bool _disposed; + private TaskCompletionSource _disposeComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); + + // Used by the Timer to call TryRelenish on ReplenishingRateLimiters + // We use a separate list to avoid running TryReplenish (which might be user code) inside our lock + // And we cache the list to amortize the allocation cost to as close to 0 as we can get + private List> _cachedLimiters = new(); + private bool _cacheInvalid; + private TimerAwaitable _timer; + private Task _timerTask; + + // Use the Dictionary as the lock field so we don't need to allocate another object for a lock and have another field in the object + private object Lock => _limiters; + + public DefaultPartitionedRateLimiter(Func> partitioner, + IEqualityComparer? equalityComparer = null) + { + _limiters = new Dictionary>(equalityComparer); + _partitioner = partitioner; + + // TODO: Figure out what interval we should use + _timer = new TimerAwaitable(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100)); + _timerTask = RunTimer(); + } + + private async Task RunTimer() + { + _timer.Start(); + while (await _timer) + { + try + { + Replenish(this); + } + // TODO: Can we log to EventSource or somewhere? Maybe dispatch throwing the exception so it is at least an unhandled exception? + catch { } + } + _timer.Dispose(); + } + + public override int GetAvailablePermits(TResource resourceID) + { + return GetRateLimiter(resourceID).GetAvailablePermits(); + } + + protected override RateLimitLease AcquireCore(TResource resourceID, int permitCount) + { + return GetRateLimiter(resourceID).Acquire(permitCount); + } + + protected override ValueTask WaitAsyncCore(TResource resourceID, int permitCount, CancellationToken cancellationToken) + { + return GetRateLimiter(resourceID).WaitAsync(permitCount, cancellationToken); + } + + private RateLimiter GetRateLimiter(TResource resourceID) + { + RateLimitPartition partition = _partitioner(resourceID); + Lazy? limiter; + lock (Lock) + { + ThrowIfDisposed(); + if (!_limiters.TryGetValue(partition.PartitionKey, out limiter)) + { + // Using Lazy avoids calling user code (partition.Factory) inside the lock + limiter = new Lazy(() => partition.Factory(partition.PartitionKey)); + _limiters.Add(partition.PartitionKey, limiter); + // Cache is invalid now + _cacheInvalid = true; + } + } + return limiter.Value; + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + bool alreadyDisposed = CommonDispose(); + + _timerTask.GetAwaiter().GetResult(); + _cachedLimiters.Clear(); + + if (alreadyDisposed) + { + _disposeComplete.Task.GetAwaiter().GetResult(); + return; + } + + // Safe to access _limiters outside the lock + // The timer is no longer running and _disposed is set so anyone trying to access fields will be checking that first + foreach (KeyValuePair> limiter in _limiters) + { + limiter.Value.Value.Dispose(); + } + _limiters.Clear(); + _disposeComplete.TrySetResult(null); + } + + protected override async ValueTask DisposeAsyncCore() + { + bool alreadyDisposed = CommonDispose(); + + await _timerTask.ConfigureAwait(false); + _cachedLimiters.Clear(); + + if (alreadyDisposed) + { + await _disposeComplete.Task.ConfigureAwait(false); + return; + } + + foreach (KeyValuePair> limiter in _limiters) + { + await limiter.Value.Value.DisposeAsync().ConfigureAwait(false); + } + _limiters.Clear(); + _disposeComplete.TrySetResult(null); + } + + // This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call + private bool CommonDispose() + { + lock (Lock) + { + if (_disposed) + { + return true; + } + _disposed = true; + _timer.Stop(); + } + return false; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(PartitionedRateLimiter)); + } + } + + private static void Replenish(DefaultPartitionedRateLimiter limiter) + { + lock (limiter.Lock) + { + if (limiter._disposed) + { + return; + } + + // If the cache has been invalidated we need to recreate it + if (limiter._cacheInvalid) + { + limiter._cachedLimiters.Clear(); + bool cacheStillInvalid = false; + foreach (KeyValuePair> kvp in limiter._limiters) + { + if (kvp.Value.IsValueCreated) + { + if (kvp.Value.Value is ReplenishingRateLimiter) + { + limiter._cachedLimiters.Add(kvp.Value); + } + } + else + { + // In rare cases the RateLimiter will be added to the storage but not be initialized yet + // keep cache invalid if there was a non-initialized RateLimiter + // the next time we run the timer the cache will be updated + // with the initialized RateLimiter + cacheStillInvalid = true; + } + } + limiter._cacheInvalid = cacheStillInvalid; + } + } + + // cachedLimiters is safe to use outside the lock because it is only updated by the Timer + // and the Timer avoids re-entrancy issues via the _executingTimer field + foreach (Lazy rateLimiter in limiter._cachedLimiters) + { + Debug.Assert(rateLimiter.IsValueCreated && rateLimiter.Value is ReplenishingRateLimiter); + ((ReplenishingRateLimiter)rateLimiter.Value).TryReplenish(); + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs new file mode 100644 index 0000000000000..bbec73cc98c61 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs @@ -0,0 +1,30 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Type returned by methods to be used by to know what partitions are configured. + /// + /// The type to distinguish partitions with. + public struct RateLimitPartition + { + /// + /// Constructs the for use in . + /// + /// The specific key for this partition. + /// The function called when a rate limiter for the given is needed. + public RateLimitPartition(TKey partitionKey, Func factory) + { + PartitionKey = partitionKey; + Factory = factory; + } + + /// + /// The specific key for this partition. + /// + public TKey PartitionKey { get; } + + internal readonly Func Factory; + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs new file mode 100644 index 0000000000000..134794940cf58 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs @@ -0,0 +1,78 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Contains methods used in to assist in the creation of partitions for your rate limiter. + /// + public static class RateLimitPartition + { + /// + /// Defines a partition with the given rate limiter factory. + /// + /// The type to distinguish partitions with. + /// The specific key for this partition. This will be used to check for an existing cached limiter before calling the . + /// The function called when a rate limiter for the given is needed. This should be a new instance of a rate limiter every time it is called. + /// + public static RateLimitPartition Create( + TKey partitionKey, + Func factory) + { + return new RateLimitPartition(partitionKey, factory); + } + + /// + /// Defines a partition with a with the given . + /// + /// The type to distinguish partitions with. + /// The specific key for this partition. This will be used to check for an existing cached limiter before calling the . + /// The function called when a rate limiter for the given is needed. This can return the same instance of across different calls. + /// + public static RateLimitPartition CreateConcurrencyLimiter( + TKey partitionKey, + Func factory) + { + return Create(partitionKey, key => new ConcurrencyLimiter(factory(key))); + } + + /// + /// Defines a partition that will not have a rate limiter. + /// This means any calls to or will always succeed for the given . + /// + /// The type to distinguish partitions with. + /// The specific key for this partition. + /// + public static RateLimitPartition CreateNoLimiter(TKey partitionKey) + { + return Create(partitionKey, _ => NoopLimiter.Instance); + } + + /// + /// Defines a partition with a with the given . + /// + /// + /// Set to to save an allocation. This method will create a new options type and set to otherwise. + /// + /// The type to distinguish partitions with. + /// The specific key for this partition. + /// The function called when a rate limiter for the given is needed. This can return the same instance of across different calls. + /// + public static RateLimitPartition CreateTokenBucketLimiter( + TKey partitionKey, + Func factory) + { + return Create(partitionKey, key => + { + TokenBucketRateLimiterOptions options = factory(key); + // We don't want individual TokenBucketRateLimiters to have timers. We will instead have our own internal Timer handling all of them + if (options.AutoReplenishment is true) + { + options = new TokenBucketRateLimiterOptions(options.TokenLimit, options.QueueProcessingOrder, options.QueueLimit, + options.ReplenishmentPeriod, options.TokensPerPeriod, autoReplenishment: false); + } + return new TokenBucketRateLimiter(options); + }); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs new file mode 100644 index 0000000000000..df7c57f775782 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -0,0 +1,425 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// implementation that replenishes permit counters periodically instead of via a release mechanism. + /// + public sealed class SlidingWindowRateLimiter : ReplenishingRateLimiter + { + private int _requestCount; + private int _queueCount; + private int[] _requestsPerSegment; + private int _currentSegmentIndex; + private long _lastReplenishmentTick; + private long? _idleSince; + private bool _disposed; + + private readonly Timer? _renewTimer; + private readonly SlidingWindowRateLimiterOptions _options; + private readonly Deque _queue = new Deque(); + + // Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object + private object Lock => _queue; + + private static readonly RateLimitLease SuccessfulLease = new SlidingWindowLease(true, null); + private static readonly RateLimitLease FailedLease = new SlidingWindowLease(false, null); + private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency; + + /// + public override TimeSpan? IdleDuration => _idleSince is null ? null : new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency)); + + /// + public override bool IsAutoReplenishing => _options.AutoReplenishment; + + /// + public override TimeSpan ReplenishmentPeriod => new TimeSpan(_options.Window.Ticks / _options.SegmentsPerWindow); + + /// + /// Initializes the . + /// + /// Options to specify the behavior of the . + public SlidingWindowRateLimiter(SlidingWindowRateLimiterOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _requestCount = options.PermitLimit; + + // _requestsPerSegment holds the no. of acquired requests in each window segment + _requestsPerSegment = new int[options.SegmentsPerWindow]; + _currentSegmentIndex = 0; + + _idleSince = _lastReplenishmentTick = Stopwatch.GetTimestamp(); + + if (_options.AutoReplenishment) + { + _renewTimer = new Timer(Replenish, this, ReplenishmentPeriod, ReplenishmentPeriod); + } + } + + /// + public override int GetAvailablePermits() => _requestCount; + + /// + protected override RateLimitLease AcquireCore(int requestCount) + { + // These amounts of resources can never be acquired + if (requestCount > _options.PermitLimit) + { + throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit)); + } + + // Return SuccessfulLease or FailedLease depending to indicate limiter state + if (requestCount == 0 && !_disposed) + { + if (_requestCount > 0) + { + return SuccessfulLease; + } + + return FailedLease; + } + + lock (Lock) + { + if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease)) + { + return lease; + } + + // TODO: Acquire additional metadata during a failed lease decision + return FailedLease; + } + } + + /// + protected override ValueTask WaitAsyncCore(int requestCount, CancellationToken cancellationToken = default) + { + // These amounts of resources can never be acquired + if (requestCount > _options.PermitLimit) + { + throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit)); + } + + ThrowIfDisposed(); + + // Return SuccessfulAcquisition if resources are available + if (requestCount == 0 && _requestCount > 0) + { + return new ValueTask(SuccessfulLease); + } + + lock (Lock) + { + if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease)) + { + return new ValueTask(lease); + } + + // Avoid integer overflow by using subtraction instead of addition + Debug.Assert(_options.QueueLimit >= _queueCount); + if (_options.QueueLimit - _queueCount < requestCount) + { + if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && requestCount <= _options.QueueLimit) + { + // Remove oldest items from queue until there is space for the newest acquisition request + do + { + RequestRegistration oldestRequest = _queue.DequeueHead(); + _queueCount -= oldestRequest.Count; + Debug.Assert(_queueCount >= 0); + if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + { + _queueCount += oldestRequest.Count; + } + } + while (_options.QueueLimit - _queueCount < requestCount); + } + else + { + // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst + return new ValueTask(FailedLease); + } + } + + CancelQueueState tcs = new CancelQueueState(requestCount, this, cancellationToken); + CancellationTokenRegistration ctr = default; + if (cancellationToken.CanBeCanceled) + { + ctr = cancellationToken.Register(static obj => + { + ((CancelQueueState)obj!).TrySetCanceled(); + }, tcs); + } + + RequestRegistration registration = new RequestRegistration(requestCount, tcs, ctr); + _queue.EnqueueTail(registration); + _queueCount += requestCount; + Debug.Assert(_queueCount <= _options.QueueLimit); + + return new ValueTask(registration.Tcs.Task); + } + } + + private bool TryLeaseUnsynchronized(int requestCount, [NotNullWhen(true)] out RateLimitLease? lease) + { + ThrowIfDisposed(); + + // if requestCount is 0 we want to queue it if there are no available permits + if (_requestCount >= requestCount && _requestCount != 0) + { + if (requestCount == 0) + { + // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available + lease = SuccessfulLease; + return true; + } + + // a. If there are no items queued we can lease + // b. If there are items queued but the processing order is NewestFirst, then we can lease the incoming request since it is the newest + if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst)) + { + _idleSince = null; + _requestsPerSegment[_currentSegmentIndex] += requestCount; + _requestCount -= requestCount; + Debug.Assert(_requestCount >= 0); + lease = SuccessfulLease; + return true; + } + } + + lease = null; + return false; + } + + /// + /// Attempts to replenish request counters in a window. + /// + /// + /// False if is enabled, otherwise true. + /// Does not reflect if permits were replenished. + /// + public override bool TryReplenish() + { + if (_options.AutoReplenishment) + { + return false; + } + + // Replenish call will slide the window one segment at a time + Replenish(this); + return true; + } + + private static void Replenish(object? state) + { + SlidingWindowRateLimiter limiter = (state as SlidingWindowRateLimiter)!; + Debug.Assert(limiter is not null); + + // Use Stopwatch instead of DateTime.UtcNow to avoid issues on systems where the clock can change + long nowTicks = Stopwatch.GetTimestamp(); + limiter!.ReplenishInternal(nowTicks); + } + + // Used in tests that test behavior with specific time intervals + private void ReplenishInternal(long nowTicks) + { + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes + lock (Lock) + { + if (_disposed) + { + return; + } + + if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < ReplenishmentPeriod.Ticks) + { + return; + } + + _lastReplenishmentTick = nowTicks; + + // Increment the current segment index while move the window + // We need to know the no. of requests that were acquired in a segment previously to ensure that we don't acquire more than the permit limit. + _currentSegmentIndex = (_currentSegmentIndex + 1) % _options.SegmentsPerWindow; + int oldSegmentRequestCount = _requestsPerSegment[_currentSegmentIndex]; + _requestsPerSegment[_currentSegmentIndex] = 0; + + if (oldSegmentRequestCount == 0) + { + return; + } + + _requestCount += oldSegmentRequestCount; + Debug.Assert(_requestCount <= _options.PermitLimit); + + // Process queued requests + while (_queue.Count > 0) + { + RequestRegistration nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.PeekHead() + : _queue.PeekTail(); + + // If we have enough permits after replenishing to serve the queued requests + if (_requestCount >= nextPendingRequest.Count) + { + // Request can be fulfilled + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + + _queueCount -= nextPendingRequest.Count; + _requestCount -= nextPendingRequest.Count; + _requestsPerSegment[_currentSegmentIndex] += nextPendingRequest.Count; + Debug.Assert(_requestCount >= 0); + + if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + { + // Queued item was canceled so add count back + _requestCount += nextPendingRequest.Count; + _requestsPerSegment[_currentSegmentIndex] -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation code + _queueCount += nextPendingRequest.Count; + } + nextPendingRequest.CancellationTokenRegistration.Dispose(); + Debug.Assert(_queueCount >= 0); + } + else + { + // Request cannot be fulfilled + break; + } + } + + if (_requestCount == _options.PermitLimit) + { + Debug.Assert(_idleSince is null); + Debug.Assert(_queueCount == 0); + _idleSince = Stopwatch.GetTimestamp(); + } + } + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + lock (Lock) + { + if (_disposed) + { + return; + } + _disposed = true; + _renewTimer?.Dispose(); + while (_queue.Count > 0) + { + RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + next.CancellationTokenRegistration.Dispose(); + next.Tcs.TrySetResult(FailedLease); + } + } + } + + protected override ValueTask DisposeAsyncCore() + { + Dispose(true); + + return default; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(SlidingWindowRateLimiter)); + } + } + + private sealed class SlidingWindowLease : RateLimitLease + { + private static readonly string[] s_allMetadataNames = new[] { MetadataName.RetryAfter.Name }; + + private readonly TimeSpan? _retryAfter; + + public SlidingWindowLease(bool isAcquired, TimeSpan? retryAfter) + { + IsAcquired = isAcquired; + _retryAfter = retryAfter; + } + + public override bool IsAcquired { get; } + + public override IEnumerable MetadataNames => s_allMetadataNames; + + public override bool TryGetMetadata(string metadataName, out object? metadata) + { + if (metadataName == MetadataName.RetryAfter.Name && _retryAfter.HasValue) + { + metadata = _retryAfter.Value; + return true; + } + + metadata = default; + return false; + } + } + + private readonly struct RequestRegistration + { + public RequestRegistration(int requestCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + { + Count = requestCount; + // Use VoidAsyncOperationWithData instead + Tcs = tcs; + CancellationTokenRegistration = cancellationTokenRegistration; + } + + public int Count { get; } + + public TaskCompletionSource Tcs { get; } + + public CancellationTokenRegistration CancellationTokenRegistration { get; } + } + + private sealed class CancelQueueState : TaskCompletionSource + { + private readonly int _requestCount; + private readonly SlidingWindowRateLimiter _limiter; + private readonly CancellationToken _cancellationToken; + + public CancelQueueState(int requestCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(TaskCreationOptions.RunContinuationsAsynchronously) + { + _requestCount = requestCount; + _limiter = limiter; + _cancellationToken = cancellationToken; + } + + public new bool TrySetCanceled() + { + if (TrySetCanceled(_cancellationToken)) + { + lock (_limiter.Lock) + { + _limiter._queueCount -= _requestCount; + } + return true; + } + return false; + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs new file mode 100644 index 0000000000000..159b4338f07d2 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs @@ -0,0 +1,88 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Options to specify the behavior of a . + /// + public sealed class SlidingWindowRateLimiterOptions + { + /// + /// Initializes the . + /// + /// Maximum number of request counters that can be served in a window. + /// + /// Maximum number of unprocessed request counters waiting via . + /// + /// Specifies how often requests can be replenished. Replenishing is triggered either by an internal timer if is true, or by calling . + /// + /// Specified how many segments a window can be divided into. The total requests a segment can serve cannot exceed the max limit.. + /// + /// Specifies whether request replenishment will be handled by the or by another party via . + /// + /// When , , or are less than 0. + public SlidingWindowRateLimiterOptions( + int permitLimit, + QueueProcessingOrder queueProcessingOrder, + int queueLimit, + TimeSpan window, + int segmentsPerWindow, + bool autoReplenishment = true) + { + if (permitLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(permitLimit)); + } + if (queueLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(queueLimit)); + } + if (segmentsPerWindow <= 0) + { + throw new ArgumentOutOfRangeException(nameof(segmentsPerWindow)); + } + + PermitLimit = permitLimit; + QueueProcessingOrder = queueProcessingOrder; + QueueLimit = queueLimit; + Window = window; + SegmentsPerWindow = segmentsPerWindow; + AutoReplenishment = autoReplenishment; + } + + /// + /// Specifies the minimum period between replenishments. + /// + public TimeSpan Window { get; } + + /// + /// Specifies the maximum number of segments a window is divided into. + /// + public int SegmentsPerWindow { get; } + + /// + /// Specified whether the is automatically replenishing request counters or if someone else + /// will be calling to replenish tokens. + /// + public bool AutoReplenishment { get; } + + /// + /// Maximum number of requests that can be served in a window. + /// + public int PermitLimit { get; } + + /// + /// Determines the behaviour of when not enough resources can be leased. + /// + /// + /// by default. + /// + public QueueProcessingOrder QueueProcessingOrder { get; } + + /// + /// Maximum cumulative permit count of queued acquisition requests. + /// + public int QueueLimit { get; } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs new file mode 100644 index 0000000000000..0b680201cc88c --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs @@ -0,0 +1,136 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + internal sealed class TimerAwaitable : IDisposable, ICriticalNotifyCompletion + { + private Timer? _timer; + private Action? _callback; + private static readonly Action _callbackCompleted = () => { }; + + private readonly TimeSpan _period; + + private readonly TimeSpan _dueTime; + private readonly object _lockObj = new object(); + private bool _disposed; + private bool _running = true; + + public TimerAwaitable(TimeSpan dueTime, TimeSpan period) + { + _dueTime = dueTime; + _period = period; + } + + public void Start() + { + if (_timer == null) + { + lock (_lockObj) + { + if (_disposed) + { + return; + } + + if (_timer == null) + { + // Don't capture the current ExecutionContext and its AsyncLocals onto the timer + bool restoreFlow = false; + try + { + if (!ExecutionContext.IsFlowSuppressed()) + { + ExecutionContext.SuppressFlow(); + restoreFlow = true; + } + + _timer = new Timer(static state => + { + var thisRef = (TimerAwaitable)state!; + thisRef.Tick(); + }, + state: this, + dueTime: _dueTime, + period: _period); + } + finally + { + // Restore the current ExecutionContext + if (restoreFlow) + { + ExecutionContext.RestoreFlow(); + } + } + } + } + } + } + + public TimerAwaitable GetAwaiter() => this; + public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); + + public bool GetResult() + { + _callback = null; + + return _running; + } + + private void Tick() + { + var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); + continuation?.Invoke(); + } + + public void OnCompleted(Action continuation) + { + if (ReferenceEquals(_callback, _callbackCompleted) || + ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) + { + Task.Run(continuation); + } + } + + public void UnsafeOnCompleted(Action continuation) + { + OnCompleted(continuation); + } + + public void Stop() + { + lock (_lockObj) + { + // Stop should be used to trigger the call to end the loop which disposes + if (_disposed) + { + throw new ObjectDisposedException(GetType().FullName); + } + + _running = false; + } + + // Call tick here to make sure that we yield the callback, + // if it's currently waiting, we don't need to wait for the next period + Tick(); + } + + public void Dispose() + { + lock (_lockObj) + { + _disposed = true; + + _timer?.Dispose(); + + _timer = null; + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index 1dfbe7fde9eb9..6eace9e71f7e2 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -126,7 +126,11 @@ protected override ValueTask WaitAsyncCore(int tokenCount, Cance RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - oldestRequest.Tcs.TrySetResult(FailedLease); + if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + { + // Updating queue count is handled by the cancellation code + _queueCount += oldestRequest.Count; + } } while (_options.QueueLimit - _queueCount < tokenCount); } @@ -328,7 +332,7 @@ protected override void Dispose(bool disposing) ? _queue.DequeueHead() : _queue.DequeueTail(); next.CancellationTokenRegistration.Dispose(); - next.Tcs.SetResult(FailedLease); + next.Tcs.TrySetResult(FailedLease); } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs index c0212c72f3c00..60947795900d0 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs @@ -80,6 +80,12 @@ public abstract class BaseRateLimiterTests [Fact] public abstract Task CanCancelWaitAsyncBeforeQueuing(); + [Fact] + public abstract Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest(); + + [Fact] + public abstract Task CanDisposeAfterCancelingQueuedRequest(); + [Fact] public abstract Task CancelUpdatesQueueLimit(); diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs index 6d83445285122..6131c17acabf2 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs @@ -449,6 +449,55 @@ public override async Task CancelUpdatesQueueLimit() Assert.True(lease.IsAcquired); } + [Fact] + public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + // Add another item to queue, will be completed as failed later when we queue another item + var wait2 = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + var wait3 = limiter.WaitAsync(2); + Assert.False(wait3.IsCompleted); + + // will be kicked by wait3 because we're using NewestFirst + var lease2 = await wait2; + Assert.False(lease2.IsAcquired); + + lease.Dispose(); + + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanDisposeAfterCancelingQueuedRequest() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item + limiter.Dispose(); + } + [Fact] public override void NoMetadataOnAcquiredLease() { diff --git a/src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs new file mode 100644 index 0000000000000..2b3f3e992652c --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs @@ -0,0 +1,746 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Test +{ + public class FixedWindowRateLimiterTests : BaseRateLimiterTests + { + [Fact] + public override void CanAcquireResource() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(); + + Assert.True(lease.IsAcquired); + Assert.False(limiter.Acquire().IsAcquired); + + lease.Dispose(); + Assert.False(limiter.Acquire().IsAcquired); + Assert.True(limiter.TryReplenish()); + + Assert.True(limiter.Acquire().IsAcquired); + } + + [Fact] + public override void InvalidOptionsThrows() + { + Assert.Throws( + () => new FixedWindowRateLimiterOptions(-1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), autoReplenishment: false)); + Assert.Throws( + () => new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, -1, TimeSpan.FromMinutes(2), autoReplenishment: false)); + } + + [Fact] + public override async Task CanAcquireResourceAsync() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(); + Assert.False(wait.IsCompleted); + + Assert.True(limiter.TryReplenish()); + + Assert.True((await wait).IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsOldest() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, autoReplenishment: false)); + var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait1 = limiter.WaitAsync(); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + lease = await wait1; + Assert.True(lease.IsAcquired); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.Equal(0, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3, + TimeSpan.FromMinutes(0), autoReplenishment: false)); + + var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + // second queued item completes first with NewestFirst + lease = await wait2; + Assert.True(lease.IsAcquired); + Assert.False(wait1.IsCompleted); + + lease.Dispose(); + Assert.Equal(1, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + + lease = await wait1; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var timeSpan)); + Assert.Equal(TimeSpan.Zero, timeSpan); + } + + [Fact] + public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + Assert.False(wait2.IsCompleted); + + var wait3 = limiter.WaitAsync(2); + var lease1 = await wait; + var lease2 = await wait2; + Assert.False(lease1.IsAcquired); + Assert.False(lease2.IsAcquired); + Assert.False(wait3.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var lease1 = await limiter.WaitAsync(2); + Assert.False(lease1.IsAcquired); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + + limiter.TryReplenish(); + lease = await wait; + Assert.True(lease.IsAcquired); + + wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(int.MaxValue); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(3); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(int.MaxValue); + Assert.False(wait2.IsCompleted); + + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + + limiter.TryReplenish(); + var lease2 = await wait2; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override void ThrowsWhenAcquiringMoreThanLimit() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + Assert.Throws(() => limiter.Acquire(2)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForMoreThanLimit() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(2)); + } + + [Fact] + public override void ThrowsWhenAcquiringLessThanZero() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + Assert.Throws(() => limiter.Acquire(-1)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForLessThanZero() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(-1)); + } + + [Fact] + public override void AcquireZero_WithAvailability() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + + using var lease = limiter.Acquire(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void AcquireZero_WithoutAvailability() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var lease2 = limiter.Acquire(0); + Assert.False(lease2.IsAcquired); + lease2.Dispose(); + } + + [Fact] + public override async Task WaitAsyncZero_WithAvailability() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(0); + Assert.False(wait.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + using var lease2 = await wait; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanDequeueMultipleResourcesAtOnce() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, autoReplenishment: false)); + using var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + var lease1 = await wait1; + var lease2 = await wait2; + Assert.True(lease1.IsAcquired); + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanCancelWaitAsyncAfterQueuing() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CanCancelWaitAsyncBeforeQueuing() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + var ex = await Assert.ThrowsAsync(() => limiter.WaitAsync(1, cts.Token).AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CancelUpdatesQueueLimit() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void NoMetadataOnAcquiredLease() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.False(lease.TryGetMetadata(MetadataName.RetryAfter, out _)); + } + + [Fact] + public override void MetadataNamesContainsAllMetadata() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.RetryAfter.Name)); + } + + [Fact] + public override async Task DisposeReleasesQueuedAcquires() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + limiter.Dispose(); + + lease = await wait1; + Assert.False(lease.IsAcquired); + lease = await wait2; + Assert.False(lease.IsAcquired); + lease = await wait3; + Assert.False(lease.IsAcquired); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public override async Task DisposeAsyncReleasesQueuedAcquires() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + await limiter.DisposeAsync(); + + lease = await wait1; + Assert.False(lease.IsAcquired); + lease = await wait2; + Assert.False(lease.IsAcquired); + lease = await wait3; + Assert.False(lease.IsAcquired); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public async Task RetryMetadataOnFailedWaitAsync() + { + var options = new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), autoReplenishment: false); + var limiter = new FixedWindowRateLimiter(options); + + using var lease = limiter.Acquire(2); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter.Name, out var metadata)); + var metaDataTime = Assert.IsType(metadata); + Assert.Equal(options.Window.Ticks, metaDataTime.Ticks); + + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.Window.Ticks, typedMetadata.Ticks); + Assert.Collection(failedLease.MetadataNames, item => item.Equals(MetadataName.RetryAfter.Name)); + } + + [Fact] + public async Task CorrectRetryMetadataWithQueuedItem() + { + var options = new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), autoReplenishment: false); + var limiter = new FixedWindowRateLimiter(options); + + using var lease = limiter.Acquire(2); + // Queue item which changes the retry after time for failed items + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.Window.Ticks, typedMetadata.Ticks); + } + + + [Fact] + public async Task CorrectRetryMetadataWithNonZeroAvailableItems() + { + var options = new FixedWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), autoReplenishment: false); + var limiter = new FixedWindowRateLimiter(options); + + using var lease = limiter.Acquire(2); + + var failedLease = await limiter.WaitAsync(3); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.Window.Ticks, typedMetadata.Ticks); + } + + [Fact] + public void TryReplenishWithAutoReplenish_ReturnsFalse() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(1), autoReplenishment: true)); + Assert.Equal(2, limiter.GetAvailablePermits()); + Assert.False(limiter.TryReplenish()); + Assert.Equal(2, limiter.GetAvailablePermits()); + } + + [Fact] + public async Task AutoReplenish_ReplenishesCounters() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromMilliseconds(1000), autoReplenishment: true)); + Assert.Equal(2, limiter.GetAvailablePermits()); + limiter.Acquire(2); + + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + Assert.Equal(1, limiter.GetAvailablePermits()); + lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3, + TimeSpan.Zero, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + lease = limiter.Acquire(1); + Assert.False(lease.IsAcquired); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void NullIdleDurationWhenActive() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.FromMilliseconds(2), autoReplenishment: false)); + limiter.Acquire(1); + Assert.Null(limiter.IdleDuration); + } + + [Fact] + public override async Task IdleDurationUpdatesWhenIdle() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.FromMilliseconds(2), autoReplenishment: false)); + Assert.NotNull(limiter.IdleDuration); + var previousDuration = limiter.IdleDuration; + await Task.Delay(15); + Assert.True(previousDuration < limiter.IdleDuration); + } + + [Fact] + public override void IdleDurationUpdatesWhenChangingFromActive() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, autoReplenishment: false)); + limiter.Acquire(1); + limiter.TryReplenish(); + Assert.NotNull(limiter.IdleDuration); + } + + [Fact] + public void ReplenishingRateLimiterPropertiesHaveCorrectValues() + { + var replenishPeriod = TimeSpan.FromMinutes(1); + using ReplenishingRateLimiter limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + replenishPeriod, autoReplenishment: true)); + Assert.True(limiter.IsAutoReplenishing); + Assert.Equal(replenishPeriod, limiter.ReplenishmentPeriod); + + replenishPeriod = TimeSpan.FromSeconds(2); + using ReplenishingRateLimiter limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + replenishPeriod, autoReplenishment: false)); + Assert.False(limiter2.IsAutoReplenishing); + Assert.Equal(replenishPeriod, limiter2.ReplenishmentPeriod); + } + + [Fact] + public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + // Add another item to queue, will be completed as failed later when we queue another item + var wait2 = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + + var wait3 = limiter.WaitAsync(2); + Assert.False(wait3.IsCompleted); + + // will be kicked by wait3 because we're using NewestFirst + lease = await wait2; + Assert.False(lease.IsAcquired); + + limiter.TryReplenish(); + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanDisposeAfterCancelingQueuedRequest() + { + var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item + limiter.Dispose(); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs index a47c078d96ef9..47ccdc5c376c5 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs @@ -1,6 +1,8 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Xunit; @@ -30,11 +32,473 @@ await Assert.ThrowsAsync( async () => await limiter.WaitAsync(string.Empty, 1, new CancellationToken(true))); } - internal class NotImplementedPartitionedRateLimiter : PartitionedRateLimiter + // Create + + [Fact] + public void Create_AcquireCallsUnderlyingPartitionsLimiter() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + limiter.Acquire(""); + Assert.Equal(1, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + } + + [Fact] + public async Task Create_WaitAsyncCallsUnderlyingPartitionsLimiter() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + await limiter.WaitAsync(""); + Assert.Equal(1, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount); + } + + [Fact] + public void Create_GetAvailablePermitsCallsUnderlyingPartitionsLimiter() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + limiter.GetAvailablePermits(""); + Assert.Equal(1, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.GetAvailablePermitsCallCount); + } + + [Fact] + public async Task Create_PartitionIsCached() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + limiter.Acquire(""); + await limiter.WaitAsync(""); + limiter.Acquire(""); + await limiter.WaitAsync(""); + Assert.Equal(1, limiterFactory.Limiters.Count); + Assert.Equal(2, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(2, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount); + } + + [Fact] + public void Create_MultiplePartitionsWork() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + else + { + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + } + }); + + limiter.Acquire("1"); + limiter.Acquire("2"); + limiter.Acquire("1"); + limiter.Acquire("2"); + + Assert.Equal(2, limiterFactory.Limiters.Count); + + Assert.Equal(2, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + + Assert.Equal(2, limiterFactory.Limiters[1].Limiter.AcquireCallCount); + Assert.Equal(2, limiterFactory.Limiters[1].Key); + } + + [Fact] + public async Task Create_BlockingWaitDoesNotBlockOtherPartitions() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.CreateConcurrencyLimiter(2, + _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2)); + }); + + var lease = await limiter.WaitAsync("2"); + var wait = limiter.WaitAsync("2"); + Assert.False(wait.IsCompleted); + + // Different partition, should not be blocked by the wait in the other partition + await limiter.WaitAsync("1"); + + lease.Dispose(); + await wait; + + Assert.Equal(1, limiterFactory.Limiters.Count); + Assert.Equal(0, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount); + } + + // Uses Task.Wait in a Task.Run to purposefully test a blocking scenario, this doesn't work on WASM currently + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + public async Task Create_BlockingFactoryDoesNotBlockOtherPartitions() + { + var limiterFactory = new TrackingRateLimiterFactory(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var startedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => + { + startedTcs.SetResult(null); + // block the factory method + Assert.True(tcs.Task.Wait(TimeSpan.FromSeconds(10))); + return limiterFactory.GetLimiter(key); + }); + } + return RateLimitPartition.Create(2, + key => limiterFactory.GetLimiter(key)); + }); + + var lease = await limiter.WaitAsync("2"); + + var blockedTask = Task.Run(async () => + { + await limiter.WaitAsync("1"); + }); + await startedTcs.Task; + + // Other partitions aren't blocked + await limiter.WaitAsync("2"); + + // Try to acquire from the blocking limiter, this should wait until the blocking limiter has been resolved and not create a new one + var blockedTask2 = Task.Run(async () => + { + await limiter.WaitAsync("1"); + }); + + // unblock limiter factory + tcs.SetResult(null); + await blockedTask; + await blockedTask2; + + // Only 2 limiters should have been created + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(2, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount); + Assert.Equal(2, limiterFactory.Limiters[1].Limiter.WaitAsyncCallCount); + } + + [Fact] + public void Create_PassedInEqualityComparerIsUsed() + { + var limiterFactory = new TrackingRateLimiterFactory(); + var equality = new TestEquality(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }, equality); + + limiter.Acquire("1"); + // GetHashCode to add item to dictionary (skips TryGet for empty dictionary) + Assert.Equal(0, equality.EqualsCallCount); + Assert.Equal(1, equality.GetHashCodeCallCount); + limiter.Acquire("1"); + // GetHashCode and Equal from TryGet to see if item is in dictionary + Assert.Equal(1, equality.EqualsCallCount); + Assert.Equal(2, equality.GetHashCodeCallCount); + limiter.Acquire("2"); + // GetHashCode from TryGet (fails check) and second GetHashCode to add item to dictionary + Assert.Equal(1, equality.EqualsCallCount); + Assert.Equal(4, equality.GetHashCodeCallCount); + + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(2, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount); + } + + [Fact] + public void Create_DisposeWithoutLimitersNoops() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + limiter.Dispose(); + + Assert.Equal(0, limiterFactory.Limiters.Count); + } + + [Fact] + public void Create_DisposeDisposesAllLimiters() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }); + + limiter.Acquire("1"); + limiter.Acquire("2"); + + limiter.Dispose(); + + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.DisposeCallCount); + + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeCallCount); + } + + [Fact] + public void Create_DisposeThrowsForFutureMethodCalls() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + limiter.Dispose(); + + Assert.Throws(() => limiter.Acquire("1")); + + Assert.Equal(0, limiterFactory.Limiters.Count); + } + + [Fact] + public async Task Create_DisposeAsyncDisposesAllLimiters() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }); + + limiter.Acquire("1"); + limiter.Acquire("2"); + + await limiter.DisposeAsync(); + + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.DisposeCallCount); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.DisposeAsyncCallCount); + + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount); + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeCallCount); + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeAsyncCallCount); + } + + [Fact] + public async Task Create_WithTokenBucketReplenishesAutomatically() + { + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateTokenBucketLimiter(1, + _ => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false)); + }); + + var lease = limiter.Acquire(""); + Assert.True(lease.IsAcquired); + + lease = await limiter.WaitAsync(""); + Assert.True(lease.IsAcquired); + } + + [Fact] + public async Task Create_WithReplenishingLimiterReplenishesAutomatically() + { + using var limiter = PartitionedRateLimiter.Create(resource => + { + // Use the non-specific Create method to make sure ReplenishingRateLimiters are still handled properly + return RateLimitPartition.Create(1, + _ => new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false))); + }); + + var lease = limiter.Acquire(""); + Assert.True(lease.IsAcquired); + + lease = await limiter.WaitAsync(""); + Assert.True(lease.IsAcquired); + } + + [Fact] + public async Task Create_MultipleReplenishingLimitersReplenishAutomatically() + { + using var limiter = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.CreateTokenBucketLimiter(1, + _ => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false)); + } + return RateLimitPartition.CreateTokenBucketLimiter(2, + _ => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false)); + }); + + var lease = limiter.Acquire("1"); + Assert.True(lease.IsAcquired); + + lease = await limiter.WaitAsync("1"); + Assert.True(lease.IsAcquired); + + // Creates the second Replenishing limiter + // Indirectly tests that the cached list of limiters used by the timer is probably updated by making sure a limiter already made use of it before we create a second replenishing one + lease = limiter.Acquire("2"); + Assert.True(lease.IsAcquired); + + lease = await limiter.WaitAsync("1"); + Assert.True(lease.IsAcquired); + lease = await limiter.WaitAsync("2"); + Assert.True(lease.IsAcquired); + } + + [Fact] + public async Task Create_CancellationTokenPassedToUnderlyingLimiter() + { + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, + _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + }); + + var lease = limiter.Acquire(""); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var waitTask = limiter.WaitAsync("", 1, cts.Token); + Assert.False(waitTask.IsCompleted); + cts.Cancel(); + await Assert.ThrowsAsync(async () => await waitTask); + } + + internal sealed class NotImplementedPartitionedRateLimiter : PartitionedRateLimiter { public override int GetAvailablePermits(T resourceID) => throw new NotImplementedException(); protected override RateLimitLease AcquireCore(T resourceID, int permitCount) => throw new NotImplementedException(); protected override ValueTask WaitAsyncCore(T resourceID, int permitCount, CancellationToken cancellationToken) => throw new NotImplementedException(); } + + internal sealed class TrackingRateLimiter : RateLimiter + { + private int _getAvailablePermitsCallCount; + private int _acquireCallCount; + private int _waitAsyncCallCount; + private int _disposeCallCount; + private int _disposeAsyncCallCount; + + public int GetAvailablePermitsCallCount => _getAvailablePermitsCallCount; + public int AcquireCallCount => _acquireCallCount; + public int WaitAsyncCallCount => _waitAsyncCallCount; + public int DisposeCallCount => _disposeCallCount; + public int DisposeAsyncCallCount => _disposeAsyncCallCount; + + public override TimeSpan? IdleDuration => throw new NotImplementedException(); + + public override int GetAvailablePermits() + { + Interlocked.Increment(ref _getAvailablePermitsCallCount); + return 1; + } + + protected override RateLimitLease AcquireCore(int permitCount) + { + Interlocked.Increment(ref _acquireCallCount); + return new Lease(); + } + + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _waitAsyncCallCount); + return new ValueTask(new Lease()); + } + + protected override void Dispose(bool disposing) + { + Interlocked.Increment(ref _disposeCallCount); + } + + protected override ValueTask DisposeAsyncCore() + { + Interlocked.Increment(ref _disposeAsyncCallCount); + return new ValueTask(); + } + + private sealed class Lease : RateLimitLease + { + public override bool IsAcquired => throw new NotImplementedException(); + + public override IEnumerable MetadataNames => throw new NotImplementedException(); + + public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); + } + } + + internal sealed class TrackingRateLimiterFactory + { + public List<(TKey Key, TrackingRateLimiter Limiter)> Limiters { get; } = new(); + + public RateLimiter GetLimiter(TKey key) + { + TrackingRateLimiter limiter; + lock (Limiters) + { + limiter = new TrackingRateLimiter(); + Limiters.Add((key, limiter)); + } + return limiter; + } + } + + internal sealed class TestEquality : IEqualityComparer + { + private int _equalsCallCount; + private int _getHashCodeCallCount; + + public int EqualsCallCount => _equalsCallCount; + public int GetHashCodeCallCount => _getHashCodeCallCount; + + public bool Equals(int x, int y) + { + Interlocked.Increment(ref _equalsCallCount); + return x == y; + } + public int GetHashCode([DisallowNull] int obj) + { + Interlocked.Increment(ref _getHashCodeCallCount); + return obj.GetHashCode(); + } + } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs b/src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs new file mode 100644 index 0000000000000..7e2bfe1728570 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs @@ -0,0 +1,83 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Tests +{ + public class RateLimiterPartitionTests + { + [Fact] + public void Create_Concurrency() + { + var options = new ConcurrencyLimiterOptions(10, QueueProcessingOrder.OldestFirst, 10); + var partition = RateLimitPartition.CreateConcurrencyLimiter(1, key => options); + + var factoryProperty = typeof(RateLimitPartition).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!; + var factory = (Func)factoryProperty.GetValue(partition); + var limiter = factory(1); + var concurrencyLimiter = Assert.IsType(limiter); + Assert.Equal(options.PermitLimit, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public void Create_TokenBucket() + { + var options = new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 10, TimeSpan.FromMinutes(1), 1, true); + var partition = RateLimitPartition.CreateTokenBucketLimiter(1, key => options); + + var factoryProperty = typeof(RateLimitPartition).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!; + var factory = (Func)factoryProperty.GetValue(partition); + var limiter = factory(1); + var tokenBucketLimiter = Assert.IsType(limiter); + Assert.Equal(options.TokenLimit, tokenBucketLimiter.GetAvailablePermits()); + // TODO: Check other properties when ReplenshingRateLimiter is merged + // TODO: Check that autoReplenishment: true got changed to false + } + + [Fact] + public async Task Create_NoLimiter() + { + var partition = RateLimitPartition.CreateNoLimiter(1); + + var factoryProperty = typeof(RateLimitPartition).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!; + var factory = (Func)factoryProperty.GetValue(partition); + var limiter = factory(1); + + // How do we test an internal implementation of a limiter that doesn't limit? Just try some stuff that normal limiters would probably block on and see if it works. + var available = limiter.GetAvailablePermits(); + var lease = limiter.Acquire(int.MaxValue); + Assert.True(lease.IsAcquired); + Assert.Equal(available, limiter.GetAvailablePermits()); + + lease = limiter.Acquire(int.MaxValue); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(int.MaxValue); + Assert.True(wait.IsCompletedSuccessfully); + lease = await wait; + Assert.True(lease.IsAcquired); + + lease.Dispose(); + } + + [Fact] + public void Create_AnyLimiter() + { + var partition = RateLimitPartition.Create(1, key => new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 10))); + + var factoryProperty = typeof(RateLimitPartition).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!; + var factory = (Func)factoryProperty.GetValue(partition); + var limiter = factory(1); + var concurrencyLimiter = Assert.IsType(limiter); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + + var partition2 = RateLimitPartition.Create(1, key => new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 10, TimeSpan.FromMilliseconds(100), 1, autoReplenishment: false))); + factory = (Func)factoryProperty.GetValue(partition2); + limiter = factory(1); + var tokenBucketLimiter = Assert.IsType(limiter); + Assert.Equal(1, tokenBucketLimiter.GetAvailablePermits()); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs new file mode 100644 index 0000000000000..46cf35c5a85ec --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs @@ -0,0 +1,770 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Test +{ + public class SlidingWindowRateLimiterTests : BaseRateLimiterTests + { + [Fact] + public override void CanAcquireResource() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(); + + Assert.True(lease.IsAcquired); + Assert.False(limiter.Acquire().IsAcquired); + + lease.Dispose(); + Assert.False(limiter.Acquire().IsAcquired); + Assert.True(limiter.TryReplenish()); + Assert.True(limiter.TryReplenish()); + + Assert.True(limiter.Acquire().IsAcquired); + } + + [Fact] + public override void InvalidOptionsThrows() + { + Assert.Throws( + () => new SlidingWindowRateLimiterOptions(-1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), 1, autoReplenishment: false)); + Assert.Throws( + () => new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, -1, TimeSpan.FromMinutes(2), 1, autoReplenishment: false)); + Assert.Throws( + () => new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), -1, autoReplenishment: false)); + } + + [Fact] + public override async Task CanAcquireResourceAsync() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 4, + TimeSpan.Zero, 2, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + Assert.True(limiter.TryReplenish()); + + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(2); + Assert.False(wait2.IsCompleted); + + Assert.True(limiter.TryReplenish()); + + Assert.True((await wait2).IsAcquired); + } + + [Fact] + public async Task CanAcquireMultipleRequestsAsync() + { + // This test verifies the following behavior + // 1. when we have available permits after replenish to serve the queued requests + // 2. when the oldest item from queue is remove to accomodate new requests (QueueProcessingOrder: NewestFirst) + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(4, QueueProcessingOrder.NewestFirst, 4, + TimeSpan.Zero, 3, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(2); + + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(3); + Assert.False(wait.IsCompleted); + + Assert.True(limiter.TryReplenish()); + + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(2); + Assert.True(wait2.IsCompleted); + + Assert.True(limiter.TryReplenish()); + + var wait3 = limiter.WaitAsync(2); + Assert.False(wait3.IsCompleted); + + Assert.True(limiter.TryReplenish()); + Assert.True((await wait3).IsAcquired); + + Assert.False((await wait).IsAcquired); + Assert.Equal(0, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsOldest() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.FromMinutes(0), 2, autoReplenishment: false)); + var lease = await limiter.WaitAsync(2); + + Assert.True(lease.IsAcquired); + var wait1 = limiter.WaitAsync(); + var wait2 = limiter.WaitAsync(2); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.False(wait1.IsCompleted); + Assert.True(limiter.TryReplenish()); + + lease = await wait1; + Assert.True(lease.IsAcquired); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.Equal(1, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + Assert.True(limiter.TryReplenish()); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3, + TimeSpan.FromMinutes(0), 2, autoReplenishment: false)); + + var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + Assert.False(wait2.IsCompleted); + + Assert.True(limiter.TryReplenish()); + // second queued item completes first with NewestFirst + lease = await wait2; + Assert.True(lease.IsAcquired); + Assert.False(wait1.IsCompleted); + + lease.Dispose(); + Assert.Equal(1, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + Assert.True(limiter.TryReplenish()); + + lease = await wait1; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + } + + [Fact] + public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + Assert.False(wait2.IsCompleted); + + var wait3 = limiter.WaitAsync(2); + var lease1 = await wait; + var lease2 = await wait2; + Assert.False(lease1.IsAcquired); + Assert.False(lease2.IsAcquired); + Assert.False(wait3.IsCompleted); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var lease1 = await limiter.WaitAsync(2); + Assert.False(lease1.IsAcquired); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, 3, autoReplenishment: false)); + var lease = limiter.Acquire(2); + var wait = limiter.WaitAsync(2); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + + limiter.TryReplenish(); + limiter.TryReplenish(); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + lease = await wait; + Assert.True(lease.IsAcquired); + + wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(int.MaxValue); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(3); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(int.MaxValue); + Assert.False(wait2.IsCompleted); + + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + + limiter.TryReplenish(); + limiter.TryReplenish(); + var lease2 = await wait2; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override void ThrowsWhenAcquiringMoreThanLimit() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + Assert.Throws(() => limiter.Acquire(2)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForMoreThanLimit() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(2)); + } + + [Fact] + public override void ThrowsWhenAcquiringLessThanZero() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + Assert.Throws(() => limiter.Acquire(-1)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForLessThanZero() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(-1)); + } + + [Fact] + public override void AcquireZero_WithAvailability() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + + using var lease = limiter.Acquire(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void AcquireZero_WithoutAvailability() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var lease2 = limiter.Acquire(0); + Assert.False(lease2.IsAcquired); + lease2.Dispose(); + } + + [Fact] + public override async Task WaitAsyncZero_WithAvailability() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(0); + Assert.False(wait.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + Assert.True(limiter.TryReplenish()); + using var lease2 = await wait; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanDequeueMultipleResourcesAtOnce() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 4, + TimeSpan.Zero, 2, autoReplenishment: false)); + using var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + Assert.True(limiter.TryReplenish()); + + var lease1 = await wait1; + var lease2 = await wait2; + Assert.True(lease1.IsAcquired); + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanCancelWaitAsyncAfterQueuing() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.Equal(0, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CanCancelWaitAsyncBeforeQueuing() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + var ex = await Assert.ThrowsAsync(() => limiter.WaitAsync(1, cts.Token).AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.Equal(0, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CancelUpdatesQueueLimit() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override void NoMetadataOnAcquiredLease() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.False(lease.TryGetMetadata(MetadataName.RetryAfter, out _)); + } + + [Fact] + public override void MetadataNamesContainsAllMetadata() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.RetryAfter.Name)); + } + + [Fact] + public override async Task DisposeReleasesQueuedAcquires() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + limiter.Dispose(); + + lease = await wait1; + Assert.False(lease.IsAcquired); + lease = await wait2; + Assert.False(lease.IsAcquired); + lease = await wait3; + Assert.False(lease.IsAcquired); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public override async Task DisposeAsyncReleasesQueuedAcquires() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + await limiter.DisposeAsync(); + + lease = await wait1; + Assert.False(lease.IsAcquired); + lease = await wait2; + Assert.False(lease.IsAcquired); + lease = await wait3; + Assert.False(lease.IsAcquired); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public void TryReplenishWithAutoReplenish_ReturnsFalse() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(1), 1, autoReplenishment: true)); + Assert.Equal(2, limiter.GetAvailablePermits()); + Assert.False(limiter.TryReplenish()); + Assert.Equal(2, limiter.GetAvailablePermits()); + } + + [Fact] + public async Task AutoReplenish_ReplenishesCounters() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromMilliseconds(1000), 2, autoReplenishment: true)); + Assert.Equal(2, limiter.GetAvailablePermits()); + limiter.Acquire(2); + + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, 3, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + Assert.Equal(1, limiter.GetAvailablePermits()); + lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + Assert.True(limiter.TryReplenish()); + + Assert.False(wait.IsCompleted); + + Assert.True(limiter.TryReplenish()); + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 5, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(3); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + Assert.False(wait.IsCompleted); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + lease = limiter.Acquire(1); + Assert.False(lease.IsAcquired); + + limiter.TryReplenish(); + Assert.True(limiter.TryReplenish()); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void NullIdleDurationWhenActive() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.FromMilliseconds(2), 1, autoReplenishment: false)); + limiter.Acquire(1); + Assert.Null(limiter.IdleDuration); + } + + [Fact] + public override async Task IdleDurationUpdatesWhenIdle() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.FromMilliseconds(2), 2, autoReplenishment: false)); + Assert.NotNull(limiter.IdleDuration); + var previousDuration = limiter.IdleDuration; + await Task.Delay(15); + Assert.True(previousDuration < limiter.IdleDuration); + } + + [Fact] + public override void IdleDurationUpdatesWhenChangingFromActive() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, 2, autoReplenishment: false)); + limiter.Acquire(1); + limiter.TryReplenish(); + limiter.TryReplenish(); + Assert.NotNull(limiter.IdleDuration); + } + + [Fact] + public void ReplenishingRateLimiterPropertiesHaveCorrectValues() + { + var replenishPeriod = TimeSpan.FromMinutes(1); + using ReplenishingRateLimiter limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + replenishPeriod, 1, autoReplenishment: true)); + Assert.True(limiter.IsAutoReplenishing); + Assert.Equal(replenishPeriod, limiter.ReplenishmentPeriod); + + replenishPeriod = TimeSpan.FromSeconds(2); + using ReplenishingRateLimiter limiter2 = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + replenishPeriod, 1, autoReplenishment: false)); + Assert.False(limiter2.IsAutoReplenishing); + Assert.Equal(replenishPeriod, limiter2.ReplenishmentPeriod); + } + + [Fact] + public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + // Add another item to queue, will be completed as failed later when we queue another item + var wait2 = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + limiter.TryReplenish(); + + var wait3 = limiter.WaitAsync(2); + Assert.False(wait3.IsCompleted); + + // will be kicked by wait3 because we're using NewestFirst + lease = await wait2; + Assert.False(lease.IsAcquired); + + limiter.TryReplenish(); + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanDisposeAfterCancelingQueuedRequest() + { + var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item + limiter.Dispose(); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj index 4bb1f2a339264..ca8bf22f743f6 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj +++ b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj @@ -6,9 +6,15 @@ + + + + + + diff --git a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs index f0bcea61a6d4f..1fb831ab4d58a 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs @@ -366,6 +366,58 @@ public override async Task CanCancelWaitAsyncAfterQueuing() Assert.Equal(1, limiter.GetAvailablePermits()); } + [Fact] + public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, 2, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + // Add another item to queue, will be completed as failed later when we queue another item + var wait2 = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + lease.Dispose(); + + var wait3 = limiter.WaitAsync(2); + Assert.False(wait3.IsCompleted); + + // will be kicked by wait3 because we're using NewestFirst + lease = await wait2; + Assert.False(lease.IsAcquired); + + limiter.TryReplenish(); + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanDisposeAfterCancelingQueuedRequest() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + var ex = await Assert.ThrowsAsync(() => wait.AsTask()); + Assert.Equal(cts.Token, ex.CancellationToken); + + // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item + limiter.Dispose(); + } + [Fact] public override async Task CanCancelWaitAsyncBeforeQueuing() {