Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/7.0] [RateLimiting] Handle Timer jitter #74971

Merged
merged 8 commits into from
Sep 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
{
throw new ArgumentException($"{nameof(options.QueueLimit)} must be set to a value greater than or equal to 0.", nameof(options));
}
if (options.Window < TimeSpan.Zero)
if (options.Window <= TimeSpan.Zero)
{
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than or equal to TimeSpan.Zero.", nameof(options));
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than TimeSpan.Zero.", nameof(options));
}

_options = new FixedWindowRateLimiterOptions
Expand Down Expand Up @@ -287,29 +287,22 @@ private void ReplenishInternal(long nowTicks)
return;
}

if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks)
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
{
return;
}

_lastReplenishmentTick = nowTicks;

int availableRequestCounters = _requestCount;
int maxPermits = _options.PermitLimit;
int resourcesToAdd;

if (availableRequestCounters < maxPermits)
{
resourcesToAdd = maxPermits - availableRequestCounters;
}
else
if (availableRequestCounters >= _options.PermitLimit)
{
// All counters available, nothing to do
return;
}

_requestCount += resourcesToAdd;
Debug.Assert(_requestCount == _options.PermitLimit);
_requestCount = _options.PermitLimit;

// Process queued requests
while (_queue.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class FixedWindowRateLimiterOptions
{
/// <summary>
/// Specifies the time window that takes in the requests.
/// Must be set to a value >= <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="FixedWindowRateLimiter"/>.
/// Must be set to a value greater than <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="FixedWindowRateLimiter"/>.
/// </summary>
public TimeSpan Window { get; set; } = TimeSpan.Zero;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public sealed class SlidingWindowRateLimiter : ReplenishingRateLimiter

private readonly Timer? _renewTimer;
private readonly SlidingWindowRateLimiterOptions _options;
private readonly TimeSpan _replenishmentPeriod;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();

// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
Expand All @@ -42,7 +43,7 @@ public sealed class SlidingWindowRateLimiter : ReplenishingRateLimiter
public override bool IsAutoReplenishing => _options.AutoReplenishment;

/// <inheritdoc />
public override TimeSpan ReplenishmentPeriod => new TimeSpan(_options.Window.Ticks / _options.SegmentsPerWindow);
public override TimeSpan ReplenishmentPeriod => _replenishmentPeriod;

/// <summary>
/// Initializes the <see cref="SlidingWindowRateLimiter"/>.
Expand All @@ -62,9 +63,9 @@ public SlidingWindowRateLimiter(SlidingWindowRateLimiterOptions options)
{
throw new ArgumentException($"{nameof(options.QueueLimit)} must be set to a value greater than or equal to 0.", nameof(options));
}
if (options.Window < TimeSpan.Zero)
if (options.Window <= TimeSpan.Zero)
{
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than or equal to TimeSpan.Zero.", nameof(options));
throw new ArgumentException($"{nameof(options.Window)} must be set to a value greater than TimeSpan.Zero.", nameof(options));
}

_options = new SlidingWindowRateLimiterOptions
Expand All @@ -78,6 +79,7 @@ public SlidingWindowRateLimiter(SlidingWindowRateLimiterOptions options)
};

_requestCount = options.PermitLimit;
_replenishmentPeriod = new TimeSpan(_options.Window.Ticks / _options.SegmentsPerWindow);

// _requestsPerSegment holds the no. of acquired requests in each window segment
_requestsPerSegment = new int[options.SegmentsPerWindow];
Expand Down Expand Up @@ -287,7 +289,7 @@ private void ReplenishInternal(long nowTicks)
return;
}

if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < ReplenishmentPeriod.Ticks)
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < ReplenishmentPeriod.Ticks && !_options.AutoReplenishment)
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class SlidingWindowRateLimiterOptions
{
/// <summary>
/// Specifies the minimum period between replenishments.
/// Must be set to a value >= <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="SlidingWindowRateLimiter"/>.
/// Must be set to a value greater than <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="SlidingWindowRateLimiter"/>.
/// </summary>
public TimeSpan Window { get; set; } = TimeSpan.Zero;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace System.Threading.RateLimiting
/// </summary>
public sealed class TokenBucketRateLimiter : ReplenishingRateLimiter
{
private int _tokenCount;
private double _tokenCount;
private int _queueCount;
private long _lastReplenishmentTick;
private long? _idleSince;
Expand All @@ -22,6 +22,7 @@ public sealed class TokenBucketRateLimiter : ReplenishingRateLimiter
private long _failedLeasesCount;
private long _successfulLeasesCount;

private readonly double _fillRate;
private readonly Timer? _renewTimer;
private readonly TokenBucketRateLimiterOptions _options;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
Expand Down Expand Up @@ -60,9 +61,9 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
{
throw new ArgumentException($"{nameof(options.QueueLimit)} must be set to a value greater than or equal to 0.", nameof(options));
}
if (options.ReplenishmentPeriod < TimeSpan.Zero)
if (options.ReplenishmentPeriod <= TimeSpan.Zero)
{
throw new ArgumentException($"{nameof(options.ReplenishmentPeriod)} must be set to a value greater than or equal to TimeSpan.Zero.", nameof(options));
throw new ArgumentException($"{nameof(options.ReplenishmentPeriod)} must be set to a value greater than TimeSpan.Zero.", nameof(options));
}

_options = new TokenBucketRateLimiterOptions
Expand All @@ -76,6 +77,7 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
};

_tokenCount = options.TokenLimit;
_fillRate = (double)options.TokensPerPeriod / options.ReplenishmentPeriod.Ticks;

_idleSince = _lastReplenishmentTick = Stopwatch.GetTimestamp();

Expand All @@ -91,7 +93,7 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
ThrowIfDisposed();
return new RateLimiterStatistics()
{
CurrentAvailablePermits = _tokenCount,
CurrentAvailablePermits = (long)_tokenCount,
CurrentQueuedCount = _queueCount,
TotalFailedLeases = Interlocked.Read(ref _failedLeasesCount),
TotalSuccessfulLeases = Interlocked.Read(ref _successfulLeasesCount),
Expand Down Expand Up @@ -210,7 +212,7 @@ protected override ValueTask<RateLimitLease> AcquireAsyncCore(int tokenCount, Ca

private RateLimitLease CreateFailedTokenLease(int tokenCount)
{
int replenishAmount = tokenCount - _tokenCount + _queueCount;
int replenishAmount = tokenCount - (int)_tokenCount + _queueCount;
// can't have 0 replenish periods, that would mean it should be a successful lease
// if TokensPerPeriod is larger than the replenishAmount needed then it would be 0
Debug.Assert(_options.TokensPerPeriod > 0);
Expand Down Expand Up @@ -278,7 +280,7 @@ private static void Replenish(object? state)
limiter!.ReplenishInternal(nowTicks);
}

// Used in tests that test behavior with specific time intervals
// Used in tests to avoid dealing with real time
private void ReplenishInternal(long nowTicks)
{
// method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
Expand All @@ -289,45 +291,43 @@ private void ReplenishInternal(long nowTicks)
return;
}

if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.ReplenishmentPeriod.Ticks)
if (_tokenCount == _options.TokenLimit)
{
return;
}

_lastReplenishmentTick = nowTicks;

int availablePermits = _tokenCount;
TokenBucketRateLimiterOptions options = _options;
int maxPermits = options.TokenLimit;
int resourcesToAdd;
double add;

if (availablePermits < maxPermits)
// Trust the timer to be close enough to when we want to replenish, this avoids issues with Timer jitter where it might be .99 seconds instead of 1, and 1.1 seconds the next time etc.
if (_options.AutoReplenishment)
{
resourcesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits);
add = _options.TokensPerPeriod;
}
else
{
// All tokens available, nothing to do
return;
add = _fillRate * (nowTicks - _lastReplenishmentTick) * TickFrequency;
}

_tokenCount = Math.Min(_options.TokenLimit, _tokenCount + add);

_lastReplenishmentTick = nowTicks;

// Process queued requests
Deque<RequestRegistration> queue = _queue;

_tokenCount += resourcesToAdd;
Debug.Assert(_tokenCount <= _options.TokenLimit);
while (queue.Count > 0)
{
RequestRegistration nextPendingRequest =
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.PeekHead()
: queue.PeekTail();

if (_tokenCount >= nextPendingRequest.Count)
{
// Request can be fulfilled
nextPendingRequest =
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.DequeueHead()
: queue.DequeueTail();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class TokenBucketRateLimiterOptions
{
/// <summary>
/// Specifies the minimum period between replenishments.
/// Must be set to a value >= <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="TokenBucketRateLimiter"/>.
/// Must be set to a value greater than <see cref="TimeSpan.Zero" /> by the time these options are passed to the constructor of <see cref="TokenBucketRateLimiter"/>.
/// </summary>
public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;

Expand Down
Loading