Skip to content

Commit

Permalink
feat: add stream that rate limits request/response streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
skwasjer committed May 26, 2022
1 parent 486f762 commit 97c0456
Show file tree
Hide file tree
Showing 2 changed files with 319 additions and 0 deletions.
141 changes: 141 additions & 0 deletions src/MockHttp/IO/RateLimitedStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System.Diagnostics;

namespace MockHttp.IO;

/// <summary>
/// A stream that rate limits read IO for a provided stream, which can be used to simulate network transfer rates with large data streams.
/// <para>
/// While reading from the stream, the read throughput is averaged over time and throttled to match the requested bit rate as close as possible.
/// </para>
/// </summary>
/// <remarks>
/// - Does not support writing.
/// - Do not use in real world applications, only tests, kty!
/// - Not 100% accurate (just like real world :p).
/// </remarks>
public class RateLimitedStream : Stream
{
private const int SampleRate = 30; // How often to take samples (per sec).
private const int MaxSampleCount = SampleRate * 5; // How many samples to keep track of to calculate average bit rate.
private const int Delay = 1000 / SampleRate;
internal const int MinBitRate = 128;
private const int MaxBufferSize = 2 << 15; // 128KB

private readonly Stopwatch _stopwatch;
private readonly Stream _actualStream;
private readonly int _byteRate;
private readonly Queue<Measurement> _samples;

/// <summary>
/// Initializes a new instance of the <see cref="RateLimitedStream" />.
/// </summary>
/// <param name="actualStream">The actual stream to wrap.</param>
/// <param name="bitRate">The bit rate to simulate.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="actualStream" /> is null.</exception>
/// <exception cref="ArgumentException">Thrown when the stream is not readable or the bit rate is less than 128.</exception>
public RateLimitedStream(Stream actualStream, int bitRate)
{
_actualStream = actualStream ?? throw new ArgumentNullException(nameof(actualStream));
if (!_actualStream.CanRead)
{
throw new ArgumentException("Cannot read from stream.", nameof(actualStream));
}

if (bitRate <= MinBitRate)
{
throw new ArgumentOutOfRangeException(nameof(bitRate), $"Bit rate must be higher than {MinBitRate}.");
}

_byteRate = bitRate / 8; // We are computing bytes transferred.
_stopwatch = new Stopwatch();
_samples = new Queue<Measurement>();
}

/// <inheritdoc />
public override void Flush()
{
_actualStream.Flush();
}

/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
_stopwatch.Start();

// Do not read more from stream than what is allowed per sec and/or capped.
int chokeByteCount = Math.Min(Math.Min(count, _byteRate), MaxBufferSize);

int bytesRead = _actualStream.Read(buffer, offset, chokeByteCount);
// Add a new measurement to the queue.
_samples.Enqueue(new Measurement(_stopwatch.Elapsed, bytesRead));

// Throttle until average bit rate drops below threshold.
while (GetAverageTransferRateInBytesPerSec() > _byteRate)
{
_samples.Enqueue(new Measurement(_stopwatch.Elapsed, 0));
Thread.Sleep(Delay);
}

return bytesRead;
}

/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
return _actualStream.Seek(offset, origin);
}

/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException("Modifying stream is not supported.");
}

/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException("Modifying stream is not supported.");
}

/// <inheritdoc />
public override bool CanRead => _actualStream.CanRead;

/// <inheritdoc />
public override bool CanSeek => _actualStream.CanSeek;

/// <inheritdoc />
public override bool CanWrite => false;

/// <inheritdoc />
public override long Length => _actualStream.Length;

/// <inheritdoc />
public override long Position
{
get => _actualStream.Position;
set => _actualStream.Position = value;
}

/// <summary>
/// Gets the current average transfer rate per sec.
/// </summary>
private double GetAverageTransferRateInBytesPerSec()
{
// Remove measurements if we have too many.
while (_samples.Count > MaxSampleCount)
{
_samples.Dequeue();
}

//if (_samples.Count == 0)
//{
// return 0;
//}

int bytesReadForAllSamples = _samples.Sum(m => m.BytesRead);
TimeSpan totalSampleTime = _stopwatch.Elapsed - _samples.First().Time;
return bytesReadForAllSamples / totalSampleTime.TotalSeconds;
}

private record struct Measurement(TimeSpan Time, int BytesRead);
}
178 changes: 178 additions & 0 deletions test/MockHttp.Tests/RateLimitedStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#nullable enable
using System.Diagnostics;
using FluentAssertions;
using MockHttp.IO;
using Moq;
using Xunit;
using Xunit.Abstractions;

namespace MockHttp;

public class RateLimitedStreamTests : IDisposable
{
private readonly ITestOutputHelper _testOutputHelper;
private const int DataSizeInBytes = 512 * 1024; // 512 KB
private const int BitRate = 1024000; // 1024 kbps = 128 KB/s
private static readonly TimeSpan ExpectedTotalTime = TimeSpan.FromSeconds(DataSizeInBytes / ((double)BitRate / 8)); // ~ 4sec

private readonly MemoryStream _actualStream;
private readonly RateLimitedStream _sut;

private readonly byte[] _content = Enumerable.Range(0, DataSizeInBytes)
.Select((_, index) => (byte)(index % 256))
.ToArray();

public RateLimitedStreamTests(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
_actualStream = new MemoryStream(_content);
_sut = new RateLimitedStream(_actualStream, BitRate);
}

[Fact]
public void When_reading_from_stream_it_should_be_rate_limited()
{
using var ms = new MemoryStream();

byte[] buffer = new byte[4096];
var sw = Stopwatch.StartNew();
long totalBytesRead = 0;
int bytesRead;
int readCount = 0;
while ((bytesRead = _sut.Read(buffer, 0, buffer.Length)) > 0)
{
totalBytesRead += bytesRead;
readCount++;
_testOutputHelper.WriteLine("Read: {0:000}, Time: {1}, Total bytes read: {2}/{3}", readCount, sw.Elapsed, totalBytesRead, DataSizeInBytes);
ms.Write(buffer, 0, bytesRead);
}

sw.Elapsed.Should().BeGreaterThanOrEqualTo(ExpectedTotalTime);
ms.ToArray().Should().BeEquivalentTo(_content, opts => opts.WithStrictOrdering());
}

[Theory]
[InlineData(100, 100)]
[InlineData(1000, 1000 - 768)]
public void Given_that_position_is_set_when_reading_next_byte_it_should_return_expected_and_advance(long newPosition, byte expectedByte)
{
// Act
_sut.Position = newPosition;
int nextByte = _sut.ReadByte();

// Assert
nextByte.Should().Be(expectedByte);
_sut.Position.Should().Be(newPosition + 1);
}

[Fact]
public void Given_that_actual_stream_is_null_when_creating_instance_it_should_throw()
{
Stream? actualStream = null;

// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(actualStream!, 1024);

// Assert
act.Should()
.ThrowExactly<ArgumentNullException>()
.WithParameterName(nameof(actualStream));
}

[Fact]
public void Given_that_actual_stream_is_not_readable_when_creating_instance_it_should_throw()
{
var actualStream = new Mock<Stream>();
actualStream
.Setup(m => m.CanRead)
.Returns(false)
.Verifiable();

// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(actualStream.Object, 1024);

// Assert
act.Should()
.ThrowExactly<ArgumentException>()
.WithMessage("Cannot read from stream.*")
.WithParameterName(nameof(actualStream));
actualStream.Verify();
}

[Theory]
[InlineData(-1)]
[InlineData(0)]
[InlineData(127)]
public void Given_that_bitRate_is_invalid_when_creating_instance_it_should_throw(int bitRate)
{
// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(Stream.Null, bitRate);

// Assert
act.Should()
.ThrowExactly<ArgumentOutOfRangeException>()
.WithMessage($"Bit rate must be higher than {RateLimitedStream.MinBitRate}.*")
.WithParameterName(nameof(bitRate));
}

[Fact]
public void It_should_set_have_expected_properties()
{
_actualStream.Should().BeOfType<MemoryStream>();
_sut.CanRead.Should().BeTrue();
_sut.CanWrite.Should().BeFalse("even though actual stream supports writing, we do not");
_sut.CanSeek.Should().BeTrue();

_actualStream.Length.Should().NotBe(0);
_sut.Length.Should().Be(_actualStream.Length);

_actualStream.Position = 1000;
_sut.Position.Should().Be(1000);
_sut.Position = 100;
_actualStream.Position.Should().Be(100);

_sut.Seek(2000, SeekOrigin.Current);
_actualStream.Position.Should().Be(2100);
}

[Fact]
public void When_writing_it_should_throw()
{
// Act
Action act = () => _sut.Write(_content, 0, 1024);

// Assert
act.Should().ThrowExactly<NotSupportedException>();
}

[Fact]
public void When_setting_length_it_should_throw()
{
// Act
Action act = () => _sut.SetLength(1024);

// Assert
act.Should().ThrowExactly<NotSupportedException>();
}

[Fact]
public void When_flushing_it_should_flush_underlying()
{
var streamMock = new Mock<MemoryStream> { CallBase = true };
using MemoryStream? stream = streamMock.Object;
var sut = new RateLimitedStream(stream, 1024);

// Act
sut.Flush();

// Assert
streamMock.Verify(m => m.Flush(), Times.Once);
}

public void Dispose()
{
_actualStream?.Dispose();
_sut?.Dispose();
}
}
#nullable restore

0 comments on commit 97c0456

Please sign in to comment.