diff --git a/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs
new file mode 100644
index 0000000..14c83d6
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/ClientInstantiationBenchmarks.cs
@@ -0,0 +1,64 @@
+using System;
+using BenchmarkDotNet.Attributes;
+using Microsoft.IO;
+
+namespace WebSocket.Rx.Benchmarks;
+
+///
+/// Measures the overhead of creating ReactiveWebSocketClient instances.
+///
+/// The constructor optionally accepts a RecyclableMemoryStreamManager.
+/// We measure:
+/// - Allocation cost without vs. with a shared MemoryStreamManager
+/// - GC pressure from many short-lived instances
+/// No network connection required.
+///
+[ShortRunJob]
+[MemoryDiagnoser]
+[HideColumns("Job", "RatioSD", "Error")]
+public class ClientInstantiationBenchmarks
+{
+ private static readonly Uri ServerUri = new("ws://localhost:9999");
+
+ // Shared manager – as it should be used in production code
+ private readonly RecyclableMemoryStreamManager _sharedManager = new();
+
+ // -----------------------------------------------------------------------
+ // 1) Baseline: without a custom MemoryStreamManager (new() is created internally)
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "new ReactiveWebSocketClient(url)")]
+ public ReactiveWebSocketClient Instantiate_NoManager()
+ {
+ return new ReactiveWebSocketClient(ServerUri);
+ }
+
+ // -----------------------------------------------------------------------
+ // 2) With a shared RecyclableMemoryStreamManager
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "new ReactiveWebSocketClient(url, sharedManager)")]
+ public ReactiveWebSocketClient Instantiate_SharedManager()
+ {
+ return new ReactiveWebSocketClient(ServerUri, _sharedManager);
+ }
+
+ // -----------------------------------------------------------------------
+ // 3) With a dedicated RecyclableMemoryStreamManager per instance
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "new ReactiveWebSocketClient(url, new Manager())")]
+ public ReactiveWebSocketClient Instantiate_OwnManager()
+ {
+ return new ReactiveWebSocketClient(ServerUri, new RecyclableMemoryStreamManager());
+ }
+
+ // -----------------------------------------------------------------------
+ // 4) GC pressure: 100 short-lived instances (disposed immediately)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "100x create + Dispose")]
+ public void Allocate_AndDispose_100()
+ {
+ for (var i = 0; i < 100; i++)
+ {
+ using var client = new ReactiveWebSocketClient(ServerUri, _sharedManager);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
new file mode 100644
index 0000000..8ce9fea
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
@@ -0,0 +1,159 @@
+using System;
+using System.Net;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+using R3;
+
+namespace WebSocket.Rx.Benchmarks;
+
+///
+/// End-to-end benchmarks using ReactiveWebSocketServer and
+/// ReactiveWebSocketClient over loopback.
+///
+/// Measures:
+/// - Single round-trip latency (Send → Echo → MessageReceived)
+/// - Throughput for N sequential messages
+/// - Connection setup latency (ConnectAsync)
+///
+/// The server uses ReactiveWebSocketServer with a built-in echo
+/// mechanism via the Messages stream and SendAsTextAsync.
+///
+/// NOTE: These tests take longer to run. Execute individually with:
+/// dotnet run -c Release -- --filter *EndToEnd*
+///
+[ShortRunJob]
+[MemoryDiagnoser]
+[HideColumns("Job", "RatioSD", "Error")]
+public class EndToEndBenchmarks
+{
+ private ReactiveWebSocketServer _server = null!;
+ private ReactiveWebSocketClient _client = null!;
+ private string _prefix = null!;
+ private Uri _serverUri = null!;
+
+ [GlobalSetup]
+ public async Task SetupAsync()
+ {
+ var port = FreeTcpPort();
+ _prefix = $"http://localhost:{port}/ws/";
+ _serverUri = new Uri($"ws://localhost:{port}/ws/");
+
+ _server = new ReactiveWebSocketServer(_prefix);
+
+ _server.Messages.SubscribeAwait(async (msg, ct) =>
+ {
+ if (msg.Message.IsText)
+ {
+ await _server.SendAsTextAsync(msg.Metadata.Id, msg.Message.Text ?? "", ct);
+ }
+ });
+
+ await _server.StartAsync();
+
+ _client = new ReactiveWebSocketClient(_serverUri)
+ {
+ IsReconnectionEnabled = false
+ };
+ var connectionHappenedTask = _client.ConnectionHappened.FirstAsync();
+ await _client.StartAsync();
+
+ await connectionHappenedTask;
+ }
+
+ [GlobalCleanup]
+ public async Task CleanupAsync()
+ {
+ await _client.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
+ await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "Benchmark done");
+ await _client.DisposeAsync();
+ await _server.DisposeAsync();
+ }
+
+ // -----------------------------------------------------------------------
+ // 1) Single Round-Trip
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "Single text round-trip")]
+ public async Task SingleRoundTrip()
+ {
+ var echoTask = _client.MessageReceived
+ .Where(m => m is { IsText: true, Text: "ping" })
+ .FirstAsync();
+
+ await _client.SendAsTextAsync("ping");
+ return (await echoTask).Text;
+ }
+
+ // -----------------------------------------------------------------------
+ // 2) N sequentielle Round-Trips
+ // -----------------------------------------------------------------------
+ [Params(10, 50)] public int RoundTrips { get; set; }
+
+ [Benchmark(Description = "Sequential round-trips (N messages)")]
+ public async Task SequentialRoundTrips()
+ {
+ var received = 0;
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+ using var sub = _client.MessageReceived
+ .Where(m => m.IsText)
+ .Subscribe(_ => Interlocked.Increment(ref received));
+
+ for (var i = 0; i < RoundTrips; i++)
+ {
+ await _client.SendAsTextAsync($"msg-{i}", cts.Token);
+ }
+
+ while (received < RoundTrips && !cts.IsCancellationRequested)
+ {
+ await Task.Delay(1, cts.Token).ConfigureAwait(false);
+ }
+
+ return received;
+ }
+
+ // -----------------------------------------------------------------------
+ // 3) TrySendAsText throughput (fire-and-forget, no echo wait)
+ // Measures pure Channel.TryWrite speed including the SendLoop
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "TrySendAsText throughput (no echo wait)")]
+ public int TrySendAsText_Throughput()
+ {
+ var sent = 0;
+ for (var i = 0; i < 1_000; i++)
+ {
+ if (_client.TrySendAsText($"msg-{i}"))
+ {
+ sent++;
+ }
+ }
+
+ return sent;
+ }
+
+ // -----------------------------------------------------------------------
+ // 4) ConnectAsync + StopAsync overhead (connection setup and teardown)
+ // Creates a fresh client each time without waiting for an echo
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "ConnectAsync + StopAsync (latency)")]
+ public async Task ConnectAndStop()
+ {
+ var client = new ReactiveWebSocketClient(_serverUri)
+ {
+ IsReconnectionEnabled = false
+ };
+ await client.StartAsync();
+ await client.StopAsync(WebSocketCloseStatus.NormalClosure, "done");
+ client.Dispose();
+ }
+
+ private static int FreeTcpPort()
+ {
+ var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
+ l.Start();
+ var port = ((IPEndPoint)l.LocalEndpoint).Port;
+ l.Stop();
+ return port;
+ }
+}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs
new file mode 100644
index 0000000..e6ad8d0
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/EventStreamBenchmarks.cs
@@ -0,0 +1,132 @@
+using System;
+using BenchmarkDotNet.Attributes;
+using R3;
+
+namespace WebSocket.Rx.Benchmarks;
+
+///
+/// Measures the overhead of the ConnectionHappened,
+/// DisconnectionHappened and ErrorOccurred event streams.
+///
+/// These events are emitted via R3 Subject<T>.
+/// We use StreamFakeMessage as a reference and call the internal
+/// subjects directly via reflection – exercising exactly the same code
+/// paths as the production code does.
+///
+[ShortRunJob]
+[MemoryDiagnoser]
+[HideColumns("Job", "RatioSD", "Error")]
+public class EventStreamBenchmarks
+{
+ [Params(10, 100)]
+ public int EventCount { get; set; }
+
+ private ReactiveWebSocketClient _client = null!;
+
+ private Subject _connectionSource = null!;
+ private Subject _disconnectionSource = null!;
+ private Subject _errorSource = null!;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ _client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999"));
+
+ var t = typeof(ReactiveWebSocketClient);
+ _connectionSource = (Subject)t.GetField("ConnectionHappenedSource",
+ System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
+ _disconnectionSource = (Subject)t.GetField("DisconnectionHappenedSource",
+ System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
+ _errorSource = (Subject)t.GetField("ErrorOccurredSource",
+ System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!.GetValue(_client)!;
+ }
+
+ [GlobalCleanup]
+ public void Cleanup() => _client.Dispose();
+
+ // -----------------------------------------------------------------------
+ // 1) Baseline: ConnectionHappened without subscriber
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "ConnectionHappened (no subscriber)")]
+ public void Emit_Connection_NoSubscriber()
+ {
+ for (var i = 0; i < EventCount; i++)
+ {
+ _connectionSource.OnNext(new Connected(ConnectReason.Initialized));
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // 2) ConnectionHappened with one subscriber
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "ConnectionHappened → 1 subscriber")]
+ public int Emit_Connection_OneSubscriber()
+ {
+ var count = 0;
+ using var sub = _client.ConnectionHappened.Subscribe(_ => count++);
+ for (var i = 0; i < EventCount; i++)
+ {
+ _connectionSource.OnNext(new Connected(ConnectReason.Initialized));
+ }
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 3) DisconnectionHappened with one subscriber + Where filter
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "DisconnectionHappened → Where(ServerInitiated)")]
+ public int Emit_Disconnection_Filtered()
+ {
+ var count = 0;
+ using var sub = _client.DisconnectionHappened
+ .Where(d => d.Reason == DisconnectReason.ServerInitiated)
+ .Subscribe(_ => count++);
+
+ for (var i = 0; i < EventCount; i++)
+ {
+ _disconnectionSource.OnNext(new Disconnected(
+ i % 2 == 0 ? DisconnectReason.ServerInitiated : DisconnectReason.ClientInitiated));
+ }
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 4) ErrorOccurred with Select (extract error source)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "ErrorOccurred → Select(Source)")]
+ public int Emit_Error_Select()
+ {
+ var count = 0;
+ using var sub = _client.ErrorOccurred
+ .Select(e => e.Source)
+ .Subscribe(_ => count++);
+
+ for (var i = 0; i < EventCount; i++)
+ {
+ _errorSource.OnNext(new ErrorOccurred(ErrorSource.ReceiveLoop,
+ new InvalidOperationException("test")));
+ }
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 5) All three streams simultaneously with one subscriber each
+ // (mirrors typical production code)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "All 3 streams → 1 subscriber each")]
+ public int Emit_AllStreams()
+ {
+ var count = 0;
+ using var s1 = _client.ConnectionHappened.Subscribe(_ => count++);
+ using var s2 = _client.DisconnectionHappened.Subscribe(_ => count++);
+ using var s3 = _client.ErrorOccurred.Subscribe(_ => count++);
+
+ for (var i = 0; i < EventCount; i++)
+ {
+ _connectionSource.OnNext(new Connected(ConnectReason.Reconnected));
+ _disconnectionSource.OnNext(new Disconnected(DisconnectReason.Dropped));
+ }
+
+ return count;
+ }
+}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs
new file mode 100644
index 0000000..48f984a
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs
@@ -0,0 +1,146 @@
+using System;
+using System.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+using R3;
+
+namespace WebSocket.Rx.Benchmarks;
+
+///
+/// Measures the overhead of the incoming message pipeline.
+///
+/// StreamFakeMessage calls MessageReceivedSource.OnNext directly –
+/// bypassing the network so we measure only:
+/// - R3 Subject overhead (not System.Reactive!)
+/// - ReceivedMessage allocation (Text vs. Binary)
+/// - Rx operator overhead per subscriber (Where, Select, …)
+///
+[ShortRunJob]
+[MemoryDiagnoser]
+[HideColumns("Job", "RatioSD", "Error")]
+public class MessagePipelineBenchmarks
+{
+ [Params(100, 1_000)] public int MessageCount { get; set; }
+
+ [Params(64, 512, 4_096)] public int PayloadSize { get; set; }
+
+ private ReactiveWebSocketClient _client = null!;
+ private ReceivedMessage _textMessage = null!;
+ private ReceivedMessage _binaryMessage = null!;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ _client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999"));
+ _textMessage = ReceivedMessage.TextMessage(new string('x', PayloadSize));
+ _binaryMessage = ReceivedMessage.BinaryMessage(new byte[PayloadSize]);
+ }
+
+ [GlobalCleanup]
+ public void Cleanup() => _client.Dispose();
+
+ // -----------------------------------------------------------------------
+ // 1) Baseline: OnNext without subscriber
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "StreamFakeMessage (no subscriber)")]
+ public void StreamFakeMessage_NoSubscriber()
+ {
+ for (var i = 0; i < MessageCount; i++)
+ {
+ _client.StreamFakeMessage(_textMessage);
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // 2) One Subscribe, no filter
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "→ 1 subscriber (raw)")]
+ public int OneSubscriber_Raw()
+ {
+ var count = 0;
+ using var sub = _client.MessageReceived.Subscribe(_ => count++);
+ for (var i = 0; i < MessageCount; i++)
+ {
+ _client.StreamFakeMessage(_textMessage);
+ }
+
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 3) Where filter (IsText)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "→ Where(IsText)")]
+ public int OneSubscriber_WhereText()
+ {
+ var count = 0;
+ using var sub = _client.MessageReceived
+ .Where(m => m.IsText)
+ .Subscribe(_ => count++);
+ for (var i = 0; i < MessageCount; i++)
+ {
+ _client.StreamFakeMessage(_textMessage);
+ }
+
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 4) Where + Select (extract text string)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "→ Where + Select(Text)")]
+ public int OneSubscriber_WhereAndSelect()
+ {
+ var count = 0;
+ using var sub = _client.MessageReceived
+ .Where(m => m.IsText)
+ .Select(m => m.Text)
+ .Subscribe(_ => count++);
+ for (var i = 0; i < MessageCount; i++)
+ {
+ _client.StreamFakeMessage(_textMessage);
+ }
+
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 5) Binary messages (no encoding overhead in the ReceiveLoop)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "→ Binary messages")]
+ public int OneSubscriber_Binary()
+ {
+ var count = 0;
+ using var sub = _client.MessageReceived.Subscribe(_ => count++);
+ for (var i = 0; i < MessageCount; i++)
+ {
+ _client.StreamFakeMessage(_binaryMessage);
+ }
+
+ return count;
+ }
+
+ // -----------------------------------------------------------------------
+ // 6) 5 concurrent subscribers (multicast overhead in R3 Subject)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "→ 5 concurrent subscribers")]
+ public int FiveSubscribers()
+ {
+ var count = 0;
+ var subs = Enumerable.Range(0, 5)
+ .Select(_ => _client.MessageReceived.Subscribe(_ => Interlocked.Increment(ref count)))
+ .ToList();
+
+ for (var i = 0; i < MessageCount; i++)
+ {
+ _client.StreamFakeMessage(_textMessage);
+ }
+
+ foreach (var s in subs)
+ {
+ s.Dispose();
+ }
+
+ return count;
+ }
+}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/Program.cs b/src/WebSocket.Rx.Benchmarks/Program.cs
new file mode 100644
index 0000000..4893ce3
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/Program.cs
@@ -0,0 +1,14 @@
+using BenchmarkDotNet.Configs;
+using BenchmarkDotNet.Jobs;
+using BenchmarkDotNet.Running;
+
+var config = DefaultConfig.Instance
+ .AddJob(Job.ShortRun
+ .WithWarmupCount(1)
+ .WithIterationCount(3)
+ .WithInvocationCount(1)
+ .WithUnrollFactor(1));
+
+BenchmarkSwitcher
+ .FromAssembly(typeof(Program).Assembly)
+ .Run(args, config);
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs
new file mode 100644
index 0000000..b70e39a
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs
@@ -0,0 +1,160 @@
+using System;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+
+namespace WebSocket.Rx.Benchmarks;
+
+///
+/// Measures the overhead of the send methods on ReactiveWebSocketClient.
+///
+/// There are four variants:
+/// - TrySendAsBinary / TrySendAsText → Channel.TryWrite (synchronous, non-blocking)
+/// - SendAsBinaryAsync / SendAsTextAsync → Channel.WriteAsync (async, buffered)
+/// - SendInstantAsync → bypasses the queue, goes directly to NativeClient.SendAsync
+///
+/// Since no real WebSocket server is running, we test the queue methods
+/// with IsRunning = true and a Channel without a consumer,
+/// which isolates the pure enqueue latency.
+///
+/// NOTE: The Channel is UnboundedChannel (SingleReader) –
+/// without a consumer it fills up, but writes never block.
+///
+[ShortRunJob]
+[MemoryDiagnoser]
+[HideColumns("Job", "RatioSD", "Error")]
+public class SendChannelBenchmarks
+{
+ [Params(100, 1_000)]
+ public int MessageCount { get; set; }
+
+ [Params(64, 1_024)]
+ public int PayloadSize { get; set; }
+
+ private ReactiveWebSocketClient _client = null!;
+ private string _textPayload = null!;
+ private byte[] _binaryPayload = null!;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ _client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999"));
+
+ // IsRunning must be true so TrySend/SendAsync does not immediately return false.
+ // We set the internal property via reflection (no public setter available).
+ typeof(ReactiveWebSocketClient)
+ .GetProperty(nameof(ReactiveWebSocketClient.IsRunning))!
+ .SetValue(_client, true);
+
+ _textPayload = new string('a', PayloadSize);
+ _binaryPayload = new byte[PayloadSize];
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ // Reset IsRunning so Dispose completes cleanly
+ typeof(ReactiveWebSocketClient)
+ .GetProperty(nameof(ReactiveWebSocketClient.IsRunning))!
+ .SetValue(_client, false);
+ _client.Dispose();
+ }
+
+ // -----------------------------------------------------------------------
+ // 1) Baseline: TrySendAsBinary(string) – synchronous Channel.TryWrite
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "TrySendAsBinary(string)")]
+ public bool TrySendAsBinary_String()
+ {
+ var result = false;
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = _client.TrySendAsBinary(_textPayload);
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------------------
+ // 2) TrySendAsBinary(byte[])
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "TrySendAsBinary(byte[])")]
+ public bool TrySendAsBinary_Bytes()
+ {
+ var result = false;
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = _client.TrySendAsBinary(_binaryPayload);
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------------------
+ // 3) TrySendAsText(string)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "TrySendAsText(string)")]
+ public bool TrySendAsText_String()
+ {
+ var result = false;
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = _client.TrySendAsText(_textPayload);
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------------------
+ // 4) TrySendAsText(byte[])
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "TrySendAsText(byte[])")]
+ public bool TrySendAsText_Bytes()
+ {
+ var result = false;
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = _client.TrySendAsText(_binaryPayload);
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------------------
+ // 5) SendAsBinaryAsync(string) – Channel.WriteAsync (ValueTask)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "SendAsBinaryAsync(string)")]
+ public async Task SendAsBinaryAsync_String()
+ {
+ var result = false;
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = await _client.SendAsBinaryAsync(_textPayload);
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------------------
+ // 6) SendAsTextAsync(string) – Channel.WriteAsync (ValueTask)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "SendAsTextAsync(string)")]
+ public async Task SendAsTextAsync_String()
+ {
+ var result = false;
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = await _client.SendAsTextAsync(_textPayload);
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------------------
+ // 7) String → byte[] encoding overhead (Encoding.UTF8.GetBytes)
+ // – isolates the encoding cost hidden inside TrySend/SendAsync
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "Encoding.UTF8.GetBytes (overhead only)")]
+ public byte[] EncodingOverhead()
+ {
+ byte[] result = [];
+ for (var i = 0; i < MessageCount; i++)
+ {
+ result = System.Text.Encoding.UTF8.GetBytes(_textPayload);
+ }
+ return result;
+ }
+}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs
new file mode 100644
index 0000000..12cb111
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs
@@ -0,0 +1,103 @@
+using System.Net;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+
+namespace WebSocket.Rx.Benchmarks;
+
+///
+/// Measures the overhead of the server broadcast methods on
+/// ReactiveWebSocketServer without real network connections.
+///
+/// Since the broadcast methods internally iterate the client list and
+/// use Task.WhenAll + LINQ, the overhead can already be meaningfully
+/// measured by calling them directly on a started server with no connected clients.
+///
+/// For a full test with N connected clients see EndToEndBenchmarks.
+///
+[ShortRunJob]
+[MemoryDiagnoser]
+[HideColumns("Job", "RatioSD", "Error")]
+public class ServerBroadcastBenchmarks
+{
+ [Params(64, 1_024, 4_096)] public int PayloadSize { get; set; }
+
+ private ReactiveWebSocketServer _server = null!;
+ private string _textPayload = null!;
+ private byte[] _binaryPayload = null!;
+
+ [GlobalSetup]
+ public async Task SetupAsync()
+ {
+ var port = FreeTcpPort();
+ _server = new ReactiveWebSocketServer($"http://localhost:{port}/ws/");
+ await _server.StartAsync();
+
+ _textPayload = new string('x', PayloadSize);
+ _binaryPayload = new byte[PayloadSize];
+ }
+
+ [GlobalCleanup]
+ public async Task CleanupAsync()
+ {
+ await _server.StopAsync(WebSocketCloseStatus.NormalClosure, "done");
+ await _server.DisposeAsync();
+ }
+
+ // -----------------------------------------------------------------------
+ // 1) Baseline: BroadcastAsTextAsync – 0 clients (method overhead only)
+ // -----------------------------------------------------------------------
+ [Benchmark(Baseline = true, Description = "BroadcastAsTextAsync (0 clients)")]
+ public async Task BroadcastAsTextAsync_Empty()
+ {
+ return await _server.BroadcastAsTextAsync(_textPayload);
+ }
+
+ // -----------------------------------------------------------------------
+ // 2) BroadcastAsBinaryAsync with byte[]
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "BroadcastAsBinaryAsync(byte[]) (0 clients)")]
+ public async Task BroadcastAsBinaryAsync_Bytes_Empty()
+ {
+ return await _server.BroadcastAsBinaryAsync(_binaryPayload);
+ }
+
+ // -----------------------------------------------------------------------
+ // 3) TryBroadcastAsText – synchronous, no await
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "TryBroadcastAsText(string) (0 clients)")]
+ public bool TryBroadcastAsText_Empty()
+ {
+ return _server.TryBroadcastAsText(_textPayload);
+ }
+
+ // -----------------------------------------------------------------------
+ // 4) TryBroadcastAsBinary with byte[]
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "TryBroadcastAsBinary(byte[]) (0 clients)")]
+ public bool TryBroadcastAsBinary_Empty()
+ {
+ return _server.TryBroadcastAsBinary(_binaryPayload);
+ }
+
+ // -----------------------------------------------------------------------
+ // 5) ClientCount query + ConnectedClients dictionary snapshot
+ // (ConcurrentDictionary.ToDictionary() – called on every broadcast)
+ // -----------------------------------------------------------------------
+ [Benchmark(Description = "ClientCount + ConnectedClients snapshot")]
+ public int ClientSnapshot()
+ {
+ var count = _server.ClientCount;
+ var snapshot = _server.ConnectedClients;
+ return count + snapshot.Count;
+ }
+
+ private static int FreeTcpPort()
+ {
+ var l = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
+ l.Start();
+ var port = ((IPEndPoint)l.LocalEndpoint).Port;
+ l.Stop();
+ return port;
+ }
+}
\ No newline at end of file
diff --git a/src/WebSocket.Rx.Benchmarks/WebSocket.Rx.Benchmarks.csproj b/src/WebSocket.Rx.Benchmarks/WebSocket.Rx.Benchmarks.csproj
new file mode 100644
index 0000000..4e2d883
--- /dev/null
+++ b/src/WebSocket.Rx.Benchmarks/WebSocket.Rx.Benchmarks.csproj
@@ -0,0 +1,19 @@
+
+
+
+ Exe
+ net10.0
+ enable
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/WebSocket.Rx.sln b/src/WebSocket.Rx.sln
index c6451bd..b375a71 100644
--- a/src/WebSocket.Rx.sln
+++ b/src/WebSocket.Rx.sln
@@ -18,6 +18,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebSocket.Rx.UnitTests", "W
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebSocket.Rx.IntegrationTests", "WebSocket.Rx.IntegrationTests\WebSocket.Rx.IntegrationTests.csproj", "{EB7FC348-3836-483C-B365-7B8111DEE533}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebSocket.Rx.Benchmarks", "WebSocket.Rx.Benchmarks\WebSocket.Rx.Benchmarks.csproj", "{3D454EFD-013B-4297-95CD-7C9905A4C16E}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -64,6 +66,18 @@ Global
{EB7FC348-3836-483C-B365-7B8111DEE533}.Release|x64.Build.0 = Release|Any CPU
{EB7FC348-3836-483C-B365-7B8111DEE533}.Release|x86.ActiveCfg = Release|Any CPU
{EB7FC348-3836-483C-B365-7B8111DEE533}.Release|x86.Build.0 = Release|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Debug|x64.Build.0 = Debug|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Debug|x86.Build.0 = Debug|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Release|x64.ActiveCfg = Release|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Release|x64.Build.0 = Release|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Release|x86.ActiveCfg = Release|Any CPU
+ {3D454EFD-013B-4297-95CD-7C9905A4C16E}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/WebSocket.Rx/ReceivedMessage.cs b/src/WebSocket.Rx/ReceivedMessage.cs
index 44d8a8e..fc06fce 100644
--- a/src/WebSocket.Rx/ReceivedMessage.cs
+++ b/src/WebSocket.Rx/ReceivedMessage.cs
@@ -11,6 +11,10 @@ private ReceivedMessage(byte[]? binary, string? text, WebSocketMessageType messa
MessageType = messageType;
}
+ public bool IsBinary => Binary?.Length > 0 && MessageType is WebSocketMessageType.Binary;
+
+ public bool IsText => Text?.Length > 0 && MessageType is WebSocketMessageType.Text;
+
public string? Text { get; }
public byte[]? Binary { get; }