diff --git a/src/MockHttp/IO/RateLimitedStream.cs b/src/MockHttp/IO/RateLimitedStream.cs new file mode 100644 index 00000000..727ce74f --- /dev/null +++ b/src/MockHttp/IO/RateLimitedStream.cs @@ -0,0 +1,141 @@ +using System.Diagnostics; + +namespace MockHttp.IO; + +/// +/// A stream that rate limits read IO for a provided stream, which can be used to simulate network transfer rates with large data streams. +/// +/// While reading from the stream, the read throughput is averaged over time and throttled to match the requested bit rate as close as possible. +/// +/// +/// +/// - Does not support writing. +/// - Do not use in real world applications, only tests, kty! +/// - Not 100% accurate (just like real world :p). +/// +public class RateLimitedStream : Stream +{ + private const int SampleRate = 30; // How often to take samples (per sec). + private const int MaxSampleCount = SampleRate * 5; // How many samples to keep track of to calculate average bit rate. + private const int Delay = 1000 / SampleRate; + internal const int MinBitRate = 128; + private const int MaxBufferSize = 2 << 15; // 128KB + + private readonly Stopwatch _stopwatch; + private readonly Stream _actualStream; + private readonly int _byteRate; + private readonly Queue _samples; + + /// + /// Initializes a new instance of the . + /// + /// The actual stream to wrap. + /// The bit rate to simulate. + /// Thrown when is null. + /// Thrown when the stream is not readable or the bit rate is less than 128. + public RateLimitedStream(Stream actualStream, int bitRate) + { + _actualStream = actualStream ?? throw new ArgumentNullException(nameof(actualStream)); + if (!_actualStream.CanRead) + { + throw new ArgumentException("Cannot read from stream.", nameof(actualStream)); + } + + if (bitRate <= MinBitRate) + { + throw new ArgumentOutOfRangeException(nameof(bitRate), $"Bit rate must be higher than {MinBitRate}."); + } + + _byteRate = bitRate / 8; // We are computing bytes transferred. + _stopwatch = new Stopwatch(); + _samples = new Queue(); + } + + /// + public override void Flush() + { + _actualStream.Flush(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + _stopwatch.Start(); + + // Do not read more from stream than what is allowed per sec and/or capped. + int chokeByteCount = Math.Min(Math.Min(count, _byteRate), MaxBufferSize); + + int bytesRead = _actualStream.Read(buffer, offset, chokeByteCount); + // Add a new measurement to the queue. + _samples.Enqueue(new Measurement(_stopwatch.Elapsed, bytesRead)); + + // Throttle until average bit rate drops below threshold. + while (GetAverageTransferRateInBytesPerSec() > _byteRate) + { + _samples.Enqueue(new Measurement(_stopwatch.Elapsed, 0)); + Thread.Sleep(Delay); + } + + return bytesRead; + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + return _actualStream.Seek(offset, origin); + } + + /// + public override void SetLength(long value) + { + throw new NotSupportedException("Modifying stream is not supported."); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException("Modifying stream is not supported."); + } + + /// + public override bool CanRead => _actualStream.CanRead; + + /// + public override bool CanSeek => _actualStream.CanSeek; + + /// + public override bool CanWrite => false; + + /// + public override long Length => _actualStream.Length; + + /// + public override long Position + { + get => _actualStream.Position; + set => _actualStream.Position = value; + } + + /// + /// Gets the current average transfer rate per sec. + /// + private double GetAverageTransferRateInBytesPerSec() + { + // Remove measurements if we have too many. + while (_samples.Count > MaxSampleCount) + { + _samples.Dequeue(); + } + + //if (_samples.Count == 0) + //{ + // return 0; + //} + + int bytesReadForAllSamples = _samples.Sum(m => m.BytesRead); + TimeSpan totalSampleTime = _stopwatch.Elapsed - _samples.First().Time; + return bytesReadForAllSamples / totalSampleTime.TotalSeconds; + } + + private record struct Measurement(TimeSpan Time, int BytesRead); +} diff --git a/test/MockHttp.Tests/RateLimitedStreamTests.cs b/test/MockHttp.Tests/RateLimitedStreamTests.cs new file mode 100644 index 00000000..e7730a02 --- /dev/null +++ b/test/MockHttp.Tests/RateLimitedStreamTests.cs @@ -0,0 +1,178 @@ +#nullable enable +using System.Diagnostics; +using FluentAssertions; +using MockHttp.IO; +using Moq; +using Xunit; +using Xunit.Abstractions; + +namespace MockHttp; + +public class RateLimitedStreamTests : IDisposable +{ + private readonly ITestOutputHelper _testOutputHelper; + private const int DataSizeInBytes = 512 * 1024; // 512 KB + private const int BitRate = 1024000; // 1024 kbps = 128 KB/s + private static readonly TimeSpan ExpectedTotalTime = TimeSpan.FromSeconds(DataSizeInBytes / ((double)BitRate / 8)); // ~ 4sec + + private readonly MemoryStream _actualStream; + private readonly RateLimitedStream _sut; + + private readonly byte[] _content = Enumerable.Range(0, DataSizeInBytes) + .Select((_, index) => (byte)(index % 256)) + .ToArray(); + + public RateLimitedStreamTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + _actualStream = new MemoryStream(_content); + _sut = new RateLimitedStream(_actualStream, BitRate); + } + + [Fact] + public void When_reading_from_stream_it_should_be_rate_limited() + { + using var ms = new MemoryStream(); + + byte[] buffer = new byte[4096]; + var sw = Stopwatch.StartNew(); + long totalBytesRead = 0; + int bytesRead; + int readCount = 0; + while ((bytesRead = _sut.Read(buffer, 0, buffer.Length)) > 0) + { + totalBytesRead += bytesRead; + readCount++; + _testOutputHelper.WriteLine("Read: {0:000}, Time: {1}, Total bytes read: {2}/{3}", readCount, sw.Elapsed, totalBytesRead, DataSizeInBytes); + ms.Write(buffer, 0, bytesRead); + } + + sw.Elapsed.Should().BeGreaterThanOrEqualTo(ExpectedTotalTime); + ms.ToArray().Should().BeEquivalentTo(_content, opts => opts.WithStrictOrdering()); + } + + [Theory] + [InlineData(100, 100)] + [InlineData(1000, 1000 - 768)] + public void Given_that_position_is_set_when_reading_next_byte_it_should_return_expected_and_advance(long newPosition, byte expectedByte) + { + // Act + _sut.Position = newPosition; + int nextByte = _sut.ReadByte(); + + // Assert + nextByte.Should().Be(expectedByte); + _sut.Position.Should().Be(newPosition + 1); + } + + [Fact] + public void Given_that_actual_stream_is_null_when_creating_instance_it_should_throw() + { + Stream? actualStream = null; + + // Act + Func act = () => new RateLimitedStream(actualStream!, 1024); + + // Assert + act.Should() + .ThrowExactly() + .WithParameterName(nameof(actualStream)); + } + + [Fact] + public void Given_that_actual_stream_is_not_readable_when_creating_instance_it_should_throw() + { + var actualStream = new Mock(); + actualStream + .Setup(m => m.CanRead) + .Returns(false) + .Verifiable(); + + // Act + Func act = () => new RateLimitedStream(actualStream.Object, 1024); + + // Assert + act.Should() + .ThrowExactly() + .WithMessage("Cannot read from stream.*") + .WithParameterName(nameof(actualStream)); + actualStream.Verify(); + } + + [Theory] + [InlineData(-1)] + [InlineData(0)] + [InlineData(127)] + public void Given_that_bitRate_is_invalid_when_creating_instance_it_should_throw(int bitRate) + { + // Act + Func act = () => new RateLimitedStream(Stream.Null, bitRate); + + // Assert + act.Should() + .ThrowExactly() + .WithMessage($"Bit rate must be higher than {RateLimitedStream.MinBitRate}.*") + .WithParameterName(nameof(bitRate)); + } + + [Fact] + public void It_should_set_have_expected_properties() + { + _actualStream.Should().BeOfType(); + _sut.CanRead.Should().BeTrue(); + _sut.CanWrite.Should().BeFalse("even though actual stream supports writing, we do not"); + _sut.CanSeek.Should().BeTrue(); + + _actualStream.Length.Should().NotBe(0); + _sut.Length.Should().Be(_actualStream.Length); + + _actualStream.Position = 1000; + _sut.Position.Should().Be(1000); + _sut.Position = 100; + _actualStream.Position.Should().Be(100); + + _sut.Seek(2000, SeekOrigin.Current); + _actualStream.Position.Should().Be(2100); + } + + [Fact] + public void When_writing_it_should_throw() + { + // Act + Action act = () => _sut.Write(_content, 0, 1024); + + // Assert + act.Should().ThrowExactly(); + } + + [Fact] + public void When_setting_length_it_should_throw() + { + // Act + Action act = () => _sut.SetLength(1024); + + // Assert + act.Should().ThrowExactly(); + } + + [Fact] + public void When_flushing_it_should_flush_underlying() + { + var streamMock = new Mock { CallBase = true }; + using MemoryStream? stream = streamMock.Object; + var sut = new RateLimitedStream(stream, 1024); + + // Act + sut.Flush(); + + // Assert + streamMock.Verify(m => m.Flush(), Times.Once); + } + + public void Dispose() + { + _actualStream?.Dispose(); + _sut?.Dispose(); + } +} +#nullable restore