Skip to content


feat: add stream that rate limits request/response streams. (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
skwasjer authored May 27, 2022
1 parent 486f762 commit 58b2183
Show file tree
Hide file tree
Showing 2 changed files with 319 additions and 0 deletions.
141 changes: 141 additions & 0 deletions src/MockHttp/IO/RateLimitedStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System.Diagnostics;

namespace MockHttp.IO;

/// <summary>
/// A stream that rate limits read IO for a provided stream, which can be used to simulate network transfer rates with large data streams.
/// <para>
/// While reading from the stream, the read throughput is averaged over time and throttled to match the requested bit rate as close as possible.
/// </para>
/// </summary>
/// <remarks>
/// - Does not support writing.
/// - Do not use in real world applications, only tests, kty!
/// - Not 100% accurate (just like real world :p).
/// </remarks>
public class RateLimitedStream : Stream
private const int SampleRate = 30; // How often to take samples (per sec).
private const int MaxSampleCount = SampleRate * 5; // How many samples to keep track of to calculate average bit rate.
private const int Delay = 1000 / SampleRate;
internal const int MinBitRate = 128;
private const int MaxBufferSize = 2 << 15; // 128KB

private readonly Stopwatch _stopwatch;
private readonly Stream _actualStream;
private readonly int _byteRate;
private readonly Queue<Measurement> _samples;

/// <summary>
/// Initializes a new instance of the <see cref="RateLimitedStream" />.
/// </summary>
/// <param name="actualStream">The actual stream to wrap.</param>
/// <param name="bitRate">The bit rate to simulate.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="actualStream" /> is null.</exception>
/// <exception cref="ArgumentException">Thrown when the stream is not readable or the bit rate is less than 128.</exception>
public RateLimitedStream(Stream actualStream, int bitRate)
_actualStream = actualStream ?? throw new ArgumentNullException(nameof(actualStream));
if (!_actualStream.CanRead)
throw new ArgumentException("Cannot read from stream.", nameof(actualStream));

if (bitRate <= MinBitRate)
throw new ArgumentOutOfRangeException(nameof(bitRate), $"Bit rate must be higher than {MinBitRate}.");

_byteRate = bitRate / 8; // We are computing bytes transferred.
_stopwatch = new Stopwatch();
_samples = new Queue<Measurement>();

/// <inheritdoc />
public override void Flush()

/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)

// Do not read more from stream than what is allowed per sec and/or capped.
int chokeByteCount = Math.Min(Math.Min(count, _byteRate), MaxBufferSize);

int bytesRead = _actualStream.Read(buffer, offset, chokeByteCount);
// Add a new measurement to the queue.
_samples.Enqueue(new Measurement(_stopwatch.Elapsed, bytesRead));

// Throttle until average bit rate drops below threshold.
while (GetAverageTransferRateInBytesPerSec() > _byteRate)
_samples.Enqueue(new Measurement(_stopwatch.Elapsed, 0));

return bytesRead;

/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
return _actualStream.Seek(offset, origin);

/// <inheritdoc />
public override void SetLength(long value)
throw new NotSupportedException("Modifying stream is not supported.");

/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
throw new NotSupportedException("Modifying stream is not supported.");

/// <inheritdoc />
public override bool CanRead => _actualStream.CanRead;

/// <inheritdoc />
public override bool CanSeek => _actualStream.CanSeek;

/// <inheritdoc />
public override bool CanWrite => false;

/// <inheritdoc />
public override long Length => _actualStream.Length;

/// <inheritdoc />
public override long Position
get => _actualStream.Position;
set => _actualStream.Position = value;

/// <summary>
/// Gets the current average transfer rate per sec.
/// </summary>
private double GetAverageTransferRateInBytesPerSec()
// Remove measurements if we have too many.
while (_samples.Count > MaxSampleCount)

//if (_samples.Count == 0)
// return 0;

int bytesReadForAllSamples = _samples.Sum(m => m.BytesRead);
TimeSpan totalSampleTime = _stopwatch.Elapsed - _samples.First().Time;
return bytesReadForAllSamples / totalSampleTime.TotalSeconds;

private record struct Measurement(TimeSpan Time, int BytesRead);
178 changes: 178 additions & 0 deletions test/MockHttp.Tests/RateLimitedStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#nullable enable
using System.Diagnostics;
using FluentAssertions;
using MockHttp.IO;
using Moq;
using Xunit;
using Xunit.Abstractions;

namespace MockHttp;

public class RateLimitedStreamTests : IDisposable
private readonly ITestOutputHelper _testOutputHelper;
private const int DataSizeInBytes = 512 * 1024; // 512 KB
private const int BitRate = 1024000; // 1024 kbps = 128 KB/s
private static readonly TimeSpan ExpectedTotalTime = TimeSpan.FromSeconds(DataSizeInBytes / ((double)BitRate / 8)); // ~ 4sec

private readonly MemoryStream _actualStream;
private readonly RateLimitedStream _sut;

private readonly byte[] _content = Enumerable.Range(0, DataSizeInBytes)
.Select((_, index) => (byte)(index % 256))

public RateLimitedStreamTests(ITestOutputHelper testOutputHelper)
_testOutputHelper = testOutputHelper;
_actualStream = new MemoryStream(_content);
_sut = new RateLimitedStream(_actualStream, BitRate);

public void When_reading_from_stream_it_should_be_rate_limited()
using var ms = new MemoryStream();

byte[] buffer = new byte[4096];
var sw = Stopwatch.StartNew();
long totalBytesRead = 0;
int bytesRead;
int readCount = 0;
while ((bytesRead = _sut.Read(buffer, 0, buffer.Length)) > 0)
totalBytesRead += bytesRead;
_testOutputHelper.WriteLine("Read: {0:000}, Time: {1}, Total bytes read: {2}/{3}", readCount, sw.Elapsed, totalBytesRead, DataSizeInBytes);
ms.Write(buffer, 0, bytesRead);

ms.ToArray().Should().BeEquivalentTo(_content, opts => opts.WithStrictOrdering());

[InlineData(100, 100)]
[InlineData(1000, 1000 - 768)]
public void Given_that_position_is_set_when_reading_next_byte_it_should_return_expected_and_advance(long newPosition, byte expectedByte)
// Act
_sut.Position = newPosition;
int nextByte = _sut.ReadByte();

// Assert
_sut.Position.Should().Be(newPosition + 1);

public void Given_that_actual_stream_is_null_when_creating_instance_it_should_throw()
Stream? actualStream = null;

// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(actualStream!, 1024);

// Assert

public void Given_that_actual_stream_is_not_readable_when_creating_instance_it_should_throw()
var actualStream = new Mock<Stream>();
.Setup(m => m.CanRead)

// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(actualStream.Object, 1024);

// Assert
.WithMessage("Cannot read from stream.*")

public void Given_that_bitRate_is_invalid_when_creating_instance_it_should_throw(int bitRate)
// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(Stream.Null, bitRate);

// Assert
.WithMessage($"Bit rate must be higher than {RateLimitedStream.MinBitRate}.*")

public void It_should_set_have_expected_properties()
_sut.CanWrite.Should().BeFalse("even though actual stream supports writing, we do not");


_actualStream.Position = 1000;
_sut.Position = 100;

_sut.Seek(2000, SeekOrigin.Current);

public void When_writing_it_should_throw()
// Act
Action act = () => _sut.Write(_content, 0, 1024);

// Assert

public void When_setting_length_it_should_throw()
// Act
Action act = () => _sut.SetLength(1024);

// Assert

public void When_flushing_it_should_flush_underlying()
var streamMock = new Mock<MemoryStream> { CallBase = true };
using MemoryStream? stream = streamMock.Object;
var sut = new RateLimitedStream(stream, 1024);

// Act

// Assert
streamMock.Verify(m => m.Flush(), Times.Once);

public void Dispose()
#nullable restore

0 comments on commit 58b2183

Please sign in to comment.