Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using BenchmarkDotNet.Attributes;
using Microsoft.IO;

namespace WebSocket.Rx.Benchmarks;

/// <summary>
/// Measures the overhead of creating <c>ReactiveWebSocketClient</c> instances.
///
/// The constructor optionally accepts a <c>RecyclableMemoryStreamManager</c>.
/// We measure:
/// - Allocation cost without vs. with a shared MemoryStreamManager
/// - GC pressure from many short-lived instances
/// No network connection required.
/// </summary>
[ShortRunJob]
[MemoryDiagnoser]
[HideColumns("Job", "RatioSD", "Error")]
public class ClientInstantiationBenchmarks
{
private static readonly Uri ServerUri = new("ws://localhost:9999");

// Shared manager – as it should be used in production code
private readonly RecyclableMemoryStreamManager _sharedManager = new();

// -----------------------------------------------------------------------
// 1) Baseline: without a custom MemoryStreamManager (new() is created internally)
// -----------------------------------------------------------------------
[Benchmark(Baseline = true, Description = "new ReactiveWebSocketClient(url)")]
public ReactiveWebSocketClient Instantiate_NoManager()
{
return new ReactiveWebSocketClient(ServerUri);
}

// -----------------------------------------------------------------------
// 2) With a shared RecyclableMemoryStreamManager
// -----------------------------------------------------------------------
[Benchmark(Description = "new ReactiveWebSocketClient(url, sharedManager)")]
public ReactiveWebSocketClient Instantiate_SharedManager()
{
return new ReactiveWebSocketClient(ServerUri, _sharedManager);
}

// -----------------------------------------------------------------------
// 3) With a dedicated RecyclableMemoryStreamManager per instance
// -----------------------------------------------------------------------
[Benchmark(Description = "new ReactiveWebSocketClient(url, new Manager())")]
public ReactiveWebSocketClient Instantiate_OwnManager()
{
return new ReactiveWebSocketClient(ServerUri, new RecyclableMemoryStreamManager());
}

// -----------------------------------------------------------------------
// 4) GC pressure: 100 short-lived instances (disposed immediately)
// -----------------------------------------------------------------------
[Benchmark(Description = "100x create + Dispose")]
public void Allocate_AndDispose_100()
{
for (var i = 0; i < 100; i++)
{
using var client = new ReactiveWebSocketClient(ServerUri, _sharedManager);
}
}
}
159 changes: 159 additions & 0 deletions src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
using System;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using R3;

namespace WebSocket.Rx.Benchmarks;

/// <summary>
/// End-to-end benchmarks using <c>ReactiveWebSocketServer</c> and
/// <c>ReactiveWebSocketClient</c> over loopback.
///
/// Measures:
/// - Single round-trip latency (Send → Echo → MessageReceived)
/// - Throughput for N sequential messages
/// - Connection setup latency (ConnectAsync)
///
/// The server uses <c>ReactiveWebSocketServer</c> with a built-in echo
/// mechanism via the <c>Messages</c> stream and <c>SendAsTextAsync</c>.
///
/// NOTE: These tests take longer to run. Execute individually with:
/// dotnet run -c Release -- --filter *EndToEnd*
/// </summary>
[ShortRunJob]
[MemoryDiagnoser]
[HideColumns("Job", "RatioSD", "Error")]
public class EndToEndBenchmarks
{
private ReactiveWebSocketServer _server = null!;
private ReactiveWebSocketClient _client = null!;
private string _prefix = null!;
private Uri _serverUri = null!;

[GlobalSetup]
public async Task SetupAsync()
{
var port = FreeTcpPort();
_prefix = $"http://localhost:{port}/ws/";
_serverUri = new Uri($"ws://localhost:{port}/ws/");

_server = new ReactiveWebSocketServer(_prefix);

_server.Messages.SubscribeAwait(async (msg, ct) =>
{
if (msg.Message.IsText)
{
await _server.SendAsTextAsync(msg.Metadata.Id, msg.Message.Text ?? "", ct);
}
});

await _server.StartAsync();

_client = new ReactiveWebSocketClient(_serverUri)
{
IsReconnectionEnabled = false
};
var connectionHappenedTask = _client.ConnectionHappened.FirstAsync();
await _client.StartAsync();

await connectionHappenedTask;
}

[GlobalCleanup]
public async Task CleanupAsync()
{
await _client.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
await _client.DisposeAsync();
await _server.DisposeAsync();
}

// -----------------------------------------------------------------------
// 1) Single Round-Trip
// -----------------------------------------------------------------------
[Benchmark(Baseline = true, Description = "Single text round-trip")]
public async Task<string?> SingleRoundTrip()
{
var echoTask = _client.MessageReceived
.Where(m => m is { IsText: true, Text: "ping" })
.FirstAsync();

await _client.SendAsTextAsync("ping");
return (await echoTask).Text;
}

// -----------------------------------------------------------------------
// 2) N sequentielle Round-Trips
// -----------------------------------------------------------------------
[Params(10, 50)] public int RoundTrips { get; set; }

[Benchmark(Description = "Sequential round-trips (N messages)")]
public async Task<int> SequentialRoundTrips()
{
var received = 0;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

using var sub = _client.MessageReceived
.Where(m => m.IsText)
.Subscribe(_ => Interlocked.Increment(ref received));

for (var i = 0; i < RoundTrips; i++)
{
await _client.SendAsTextAsync($"msg-{i}", cts.Token);
}

while (received < RoundTrips && !cts.IsCancellationRequested)
{
await Task.Delay(1, cts.Token).ConfigureAwait(false);
}

return received;
}

// -----------------------------------------------------------------------
// 3) TrySendAsText throughput (fire-and-forget, no echo wait)
// Measures pure Channel.TryWrite speed including the SendLoop
// -----------------------------------------------------------------------
[Benchmark(Description = "TrySendAsText throughput (no echo wait)")]
public int TrySendAsText_Throughput()
{
var sent = 0;
for (var i = 0; i < 1_000; i++)
{
if (_client.TrySendAsText($"msg-{i}"))
{
sent++;
}
}

return sent;
}

// -----------------------------------------------------------------------
// 4) ConnectAsync + StopAsync overhead (connection setup and teardown)
// Creates a fresh client each time without waiting for an echo
// -----------------------------------------------------------------------
[Benchmark(Description = "ConnectAsync + StopAsync (latency)")]
public async Task ConnectAndStop()
{
var client = new ReactiveWebSocketClient(_serverUri)
{
IsReconnectionEnabled = false
};
await client.StartAsync();
await client.StopAsync(WebSocketCloseStatus.NormalClosure, "done");
client.Dispose();
}

private static int FreeTcpPort()
{
var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
l.Start();
var port = ((IPEndPoint)l.LocalEndpoint).Port;
l.Stop();
return port;
}
}
132 changes: 132 additions & 0 deletions src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using System;
using BenchmarkDotNet.Attributes;
using R3;

namespace WebSocket.Rx.Benchmarks;

/// <summary>
/// Measures the overhead of the <c>ConnectionHappened</c>,
/// <c>DisconnectionHappened</c> and <c>ErrorOccurred</c> event streams.
///
/// These events are emitted via R3 <c>Subject&lt;T&gt;</c>.
/// We use <c>StreamFakeMessage</c> as a reference and call the internal
/// subjects directly via reflection – exercising exactly the same code
/// paths as the production code does.
/// </summary>
[ShortRunJob]
[MemoryDiagnoser]
[HideColumns("Job", "RatioSD", "Error")]
public class EventStreamBenchmarks
{
[Params(10, 100)]
public int EventCount { get; set; }

private ReactiveWebSocketClient _client = null!;

private Subject<Connected> _connectionSource = null!;
private Subject<Disconnected> _disconnectionSource = null!;
private Subject<ErrorOccurred> _errorSource = null!;

[GlobalSetup]
public void Setup()
{
_client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999"));

var t = typeof(ReactiveWebSocketClient);
_connectionSource = (Subject<Connected>)t.GetField("ConnectionHappenedSource",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
_disconnectionSource = (Subject<Disconnected>)t.GetField("DisconnectionHappenedSource",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
_errorSource = (Subject<ErrorOccurred>)t.GetField("ErrorOccurredSource",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
}

[GlobalCleanup]
public void Cleanup() => _client.Dispose();

// -----------------------------------------------------------------------
// 1) Baseline: ConnectionHappened without subscriber
// -----------------------------------------------------------------------
[Benchmark(Baseline = true, Description = "ConnectionHappened (no subscriber)")]
public void Emit_Connection_NoSubscriber()
{
for (var i = 0; i < EventCount; i++)
{
_connectionSource.OnNext(new Connected(ConnectReason.Initialized));
}
}

// -----------------------------------------------------------------------
// 2) ConnectionHappened with one subscriber
// -----------------------------------------------------------------------
[Benchmark(Description = "ConnectionHappened → 1 subscriber")]
public int Emit_Connection_OneSubscriber()
{
var count = 0;
using var sub = _client.ConnectionHappened.Subscribe(_ => count++);
for (var i = 0; i < EventCount; i++)
{
_connectionSource.OnNext(new Connected(ConnectReason.Initialized));
}
return count;
}

// -----------------------------------------------------------------------
// 3) DisconnectionHappened with one subscriber + Where filter
// -----------------------------------------------------------------------
[Benchmark(Description = "DisconnectionHappened → Where(ServerInitiated)")]
public int Emit_Disconnection_Filtered()
{
var count = 0;
using var sub = _client.DisconnectionHappened
.Where(d => d.Reason == DisconnectReason.ServerInitiated)
.Subscribe(_ => count++);

for (var i = 0; i < EventCount; i++)
{
_disconnectionSource.OnNext(new Disconnected(
i % 2 == 0 ? DisconnectReason.ServerInitiated : DisconnectReason.ClientInitiated));
}
return count;
}

// -----------------------------------------------------------------------
// 4) ErrorOccurred with Select (extract error source)
// -----------------------------------------------------------------------
[Benchmark(Description = "ErrorOccurred → Select(Source)")]
public int Emit_Error_Select()
{
var count = 0;
using var sub = _client.ErrorOccurred
.Select(e => e.Source)
.Subscribe(_ => count++);

for (var i = 0; i < EventCount; i++)
{
_errorSource.OnNext(new ErrorOccurred(ErrorSource.ReceiveLoop,
new InvalidOperationException("test")));
}
return count;
}

// -----------------------------------------------------------------------
// 5) All three streams simultaneously with one subscriber each
// (mirrors typical production code)
// -----------------------------------------------------------------------
[Benchmark(Description = "All 3 streams → 1 subscriber each")]
public int Emit_AllStreams()
{
var count = 0;
using var s1 = _client.ConnectionHappened.Subscribe(_ => count++);
using var s2 = _client.DisconnectionHappened.Subscribe(_ => count++);
using var s3 = _client.ErrorOccurred.Subscribe(_ => count++);

for (var i = 0; i < EventCount; i++)
{
_connectionSource.OnNext(new Connected(ConnectReason.Reconnected));
_disconnectionSource.OnNext(new Disconnected(DisconnectReason.Dropped));
}

return count;
}
}
Loading
Loading