Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stream that rate limits response streams. #17

Merged
merged 1 commit into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/MockHttp/IO/RateLimitedStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System.Diagnostics;

namespace MockHttp.IO;

/// <summary>
/// A stream that rate limits read IO for a provided stream, which can be used to simulate network transfer rates with large data streams.
/// <para>
/// While reading from the stream, the read throughput is averaged over time and throttled to match the requested bit rate as close as possible.
/// </para>
/// </summary>
/// <remarks>
/// - Does not support writing.
/// - Do not use in real world applications, only tests, kty!
/// - Not 100% accurate (just like real world :p).
/// </remarks>
public class RateLimitedStream : Stream
{
private const int SampleRate = 30; // How often to take samples (per sec).
private const int MaxSampleCount = SampleRate * 5; // How many samples to keep track of to calculate average bit rate.
private const int Delay = 1000 / SampleRate;
internal const int MinBitRate = 128;
private const int MaxBufferSize = 2 << 15; // 128KB

private readonly Stopwatch _stopwatch;
private readonly Stream _actualStream;
private readonly int _byteRate;
private readonly Queue<Measurement> _samples;

/// <summary>
/// Initializes a new instance of the <see cref="RateLimitedStream" />.
/// </summary>
/// <param name="actualStream">The actual stream to wrap.</param>
/// <param name="bitRate">The bit rate to simulate.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="actualStream" /> is null.</exception>
/// <exception cref="ArgumentException">Thrown when the stream is not readable or the bit rate is less than 128.</exception>
public RateLimitedStream(Stream actualStream, int bitRate)
{
_actualStream = actualStream ?? throw new ArgumentNullException(nameof(actualStream));
if (!_actualStream.CanRead)
{
throw new ArgumentException("Cannot read from stream.", nameof(actualStream));
}

if (bitRate <= MinBitRate)
{
throw new ArgumentOutOfRangeException(nameof(bitRate), $"Bit rate must be higher than {MinBitRate}.");
}

_byteRate = bitRate / 8; // We are computing bytes transferred.
_stopwatch = new Stopwatch();
_samples = new Queue<Measurement>();
}

/// <inheritdoc />
public override void Flush()
{
_actualStream.Flush();
}

/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
_stopwatch.Start();

// Do not read more from stream than what is allowed per sec and/or capped.
int chokeByteCount = Math.Min(Math.Min(count, _byteRate), MaxBufferSize);

int bytesRead = _actualStream.Read(buffer, offset, chokeByteCount);
// Add a new measurement to the queue.
_samples.Enqueue(new Measurement(_stopwatch.Elapsed, bytesRead));

// Throttle until average bit rate drops below threshold.
while (GetAverageTransferRateInBytesPerSec() > _byteRate)
{
_samples.Enqueue(new Measurement(_stopwatch.Elapsed, 0));
Thread.Sleep(Delay);
}

return bytesRead;
}

/// <inheritdoc />
public override long Seek(long offset, SeekOrigin origin)
{
return _actualStream.Seek(offset, origin);
}

/// <inheritdoc />
public override void SetLength(long value)
{
throw new NotSupportedException("Modifying stream is not supported.");
}

/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException("Modifying stream is not supported.");
}

/// <inheritdoc />
public override bool CanRead => _actualStream.CanRead;

/// <inheritdoc />
public override bool CanSeek => _actualStream.CanSeek;

/// <inheritdoc />
public override bool CanWrite => false;

/// <inheritdoc />
public override long Length => _actualStream.Length;

/// <inheritdoc />
public override long Position
{
get => _actualStream.Position;
set => _actualStream.Position = value;
}

/// <summary>
/// Gets the current average transfer rate per sec.
/// </summary>
private double GetAverageTransferRateInBytesPerSec()
{
// Remove measurements if we have too many.
while (_samples.Count > MaxSampleCount)
{
_samples.Dequeue();
}

//if (_samples.Count == 0)
//{
// return 0;
//}

int bytesReadForAllSamples = _samples.Sum(m => m.BytesRead);
TimeSpan totalSampleTime = _stopwatch.Elapsed - _samples.First().Time;
return bytesReadForAllSamples / totalSampleTime.TotalSeconds;
}

private record struct Measurement(TimeSpan Time, int BytesRead);
}
178 changes: 178 additions & 0 deletions test/MockHttp.Tests/RateLimitedStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#nullable enable
using System.Diagnostics;
using FluentAssertions;
using MockHttp.IO;
using Moq;
using Xunit;
using Xunit.Abstractions;

namespace MockHttp;

public class RateLimitedStreamTests : IDisposable
{
private readonly ITestOutputHelper _testOutputHelper;
private const int DataSizeInBytes = 512 * 1024; // 512 KB
private const int BitRate = 1024000; // 1024 kbps = 128 KB/s
private static readonly TimeSpan ExpectedTotalTime = TimeSpan.FromSeconds(DataSizeInBytes / ((double)BitRate / 8)); // ~ 4sec

private readonly MemoryStream _actualStream;
private readonly RateLimitedStream _sut;

private readonly byte[] _content = Enumerable.Range(0, DataSizeInBytes)
.Select((_, index) => (byte)(index % 256))
.ToArray();

public RateLimitedStreamTests(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
_actualStream = new MemoryStream(_content);
_sut = new RateLimitedStream(_actualStream, BitRate);
}

[Fact]
public void When_reading_from_stream_it_should_be_rate_limited()
{
using var ms = new MemoryStream();

byte[] buffer = new byte[4096];
var sw = Stopwatch.StartNew();
long totalBytesRead = 0;
int bytesRead;
int readCount = 0;
while ((bytesRead = _sut.Read(buffer, 0, buffer.Length)) > 0)
{
totalBytesRead += bytesRead;
readCount++;
_testOutputHelper.WriteLine("Read: {0:000}, Time: {1}, Total bytes read: {2}/{3}", readCount, sw.Elapsed, totalBytesRead, DataSizeInBytes);
ms.Write(buffer, 0, bytesRead);
}

sw.Elapsed.Should().BeGreaterThanOrEqualTo(ExpectedTotalTime);
ms.ToArray().Should().BeEquivalentTo(_content, opts => opts.WithStrictOrdering());
}

[Theory]
[InlineData(100, 100)]
[InlineData(1000, 1000 - 768)]
public void Given_that_position_is_set_when_reading_next_byte_it_should_return_expected_and_advance(long newPosition, byte expectedByte)
{
// Act
_sut.Position = newPosition;
int nextByte = _sut.ReadByte();

// Assert
nextByte.Should().Be(expectedByte);
_sut.Position.Should().Be(newPosition + 1);
}

[Fact]
public void Given_that_actual_stream_is_null_when_creating_instance_it_should_throw()
{
Stream? actualStream = null;

// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(actualStream!, 1024);

// Assert
act.Should()
.ThrowExactly<ArgumentNullException>()
.WithParameterName(nameof(actualStream));
}

[Fact]
public void Given_that_actual_stream_is_not_readable_when_creating_instance_it_should_throw()
{
var actualStream = new Mock<Stream>();
actualStream
.Setup(m => m.CanRead)
.Returns(false)
.Verifiable();

// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(actualStream.Object, 1024);

// Assert
act.Should()
.ThrowExactly<ArgumentException>()
.WithMessage("Cannot read from stream.*")
.WithParameterName(nameof(actualStream));
actualStream.Verify();
}

[Theory]
[InlineData(-1)]
[InlineData(0)]
[InlineData(127)]
public void Given_that_bitRate_is_invalid_when_creating_instance_it_should_throw(int bitRate)
{
// Act
Func<RateLimitedStream> act = () => new RateLimitedStream(Stream.Null, bitRate);

// Assert
act.Should()
.ThrowExactly<ArgumentOutOfRangeException>()
.WithMessage($"Bit rate must be higher than {RateLimitedStream.MinBitRate}.*")
.WithParameterName(nameof(bitRate));
}

[Fact]
public void It_should_set_have_expected_properties()
{
_actualStream.Should().BeOfType<MemoryStream>();
_sut.CanRead.Should().BeTrue();
_sut.CanWrite.Should().BeFalse("even though actual stream supports writing, we do not");
_sut.CanSeek.Should().BeTrue();

_actualStream.Length.Should().NotBe(0);
_sut.Length.Should().Be(_actualStream.Length);

_actualStream.Position = 1000;
_sut.Position.Should().Be(1000);
_sut.Position = 100;
_actualStream.Position.Should().Be(100);

_sut.Seek(2000, SeekOrigin.Current);
_actualStream.Position.Should().Be(2100);
}

[Fact]
public void When_writing_it_should_throw()
{
// Act
Action act = () => _sut.Write(_content, 0, 1024);

// Assert
act.Should().ThrowExactly<NotSupportedException>();
}

[Fact]
public void When_setting_length_it_should_throw()
{
// Act
Action act = () => _sut.SetLength(1024);

// Assert
act.Should().ThrowExactly<NotSupportedException>();
}

[Fact]
public void When_flushing_it_should_flush_underlying()
{
var streamMock = new Mock<MemoryStream> { CallBase = true };
using MemoryStream? stream = streamMock.Object;
var sut = new RateLimitedStream(stream, 1024);

// Act
sut.Flush();

// Assert
streamMock.Verify(m => m.Flush(), Times.Once);
}

public void Dispose()
{
_actualStream?.Dispose();
_sut?.Dispose();
}
}
#nullable restore