Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task SetupAsync()
{
if (msg.Message.IsText)
{
await _server.SendAsTextAsync(msg.Metadata.Id, msg.Message.Text ?? "", ct);
await _server.SendAsync(msg.Metadata.Id, msg.Message.Text, WebSocketMessageType.Text, ct);
}
});

Expand Down Expand Up @@ -78,11 +78,12 @@ public async Task CleanupAsync()
public async Task<string?> SingleRoundTrip()
{
var echoTask = _client.MessageReceived
.Where(m => m is { IsText: true, Text: "ping" })
.Where(m => m is { IsText: true })
.Where(m => m.Text.ToString() is "ping")
.FirstAsync();

await _client.SendAsTextAsync("ping");
return (await echoTask).Text;
await _client.SendAsync("ping".AsMemory(), WebSocketMessageType.Text);
return (await echoTask).Text.ToString();
}

// -----------------------------------------------------------------------
Expand All @@ -102,7 +103,7 @@ public async Task<int> SequentialRoundTrips()

for (var i = 0; i < RoundTrips; i++)
{
await _client.SendAsTextAsync($"msg-{i}", cts.Token);
await _client.SendAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token);
}

while (received < RoundTrips && !cts.IsCancellationRequested)
Expand All @@ -123,7 +124,7 @@ public int TrySendAsText_Throughput()
var sent = 0;
for (var i = 0; i < 1_000; i++)
{
if (_client.TrySendAsText($"msg-{i}"))
if (_client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text))
{
sent++;
}
Expand Down
8 changes: 4 additions & 4 deletions src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ public class MessagePipelineBenchmarks
[Params(64, 512, 4_096)] public int PayloadSize { get; set; }

private ReactiveWebSocketClient _client = null!;
private ReceivedMessage _textMessage = null!;
private ReceivedMessage _binaryMessage = null!;
private Message _textMessage = null!;
private Message _binaryMessage = null!;

[GlobalSetup]
public void Setup()
{
_client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999"));
_textMessage = ReceivedMessage.TextMessage(new string('x', PayloadSize));
_binaryMessage = ReceivedMessage.BinaryMessage(new byte[PayloadSize]);
_textMessage = Message.Create(new string('x', PayloadSize));
_binaryMessage = Message.Create(new byte[PayloadSize]);
}

[GlobalCleanup]
Expand Down
20 changes: 14 additions & 6 deletions src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;

Expand Down Expand Up @@ -68,8 +69,9 @@ public bool TrySendAsBinary_String()
var result = false;
for (var i = 0; i < MessageCount; i++)
{
result = _client.TrySendAsBinary(_textPayload);
result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Binary);
}

return result;
}

Expand All @@ -82,8 +84,9 @@ public bool TrySendAsBinary_Bytes()
var result = false;
for (var i = 0; i < MessageCount; i++)
{
result = _client.TrySendAsBinary(_binaryPayload);
result = _client.TrySend(_binaryPayload, WebSocketMessageType.Binary);
}

return result;
}

Expand All @@ -96,8 +99,9 @@ public bool TrySendAsText_String()
var result = false;
for (var i = 0; i < MessageCount; i++)
{
result = _client.TrySendAsText(_textPayload);
result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Text);
}

return result;
}

Expand All @@ -110,8 +114,9 @@ public bool TrySendAsText_Bytes()
var result = false;
for (var i = 0; i < MessageCount; i++)
{
result = _client.TrySendAsText(_binaryPayload);
result = _client.TrySend(_binaryPayload, WebSocketMessageType.Text);
}

return result;
}

Expand All @@ -124,8 +129,9 @@ public async Task<bool> SendAsBinaryAsync_String()
var result = false;
for (var i = 0; i < MessageCount; i++)
{
result = await _client.SendAsBinaryAsync(_textPayload);
result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Binary);
}

return result;
}

Expand All @@ -138,8 +144,9 @@ public async Task<bool> SendAsTextAsync_String()
var result = false;
for (var i = 0; i < MessageCount; i++)
{
result = await _client.SendAsTextAsync(_textPayload);
result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Text);
}

return result;
}

Expand All @@ -155,6 +162,7 @@ public byte[] EncodingOverhead()
{
result = System.Text.Encoding.UTF8.GetBytes(_textPayload);
}

return result;
}
}
11 changes: 6 additions & 5 deletions src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net;
using System;
using System.Net;
using System.Net.WebSockets;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
Expand Down Expand Up @@ -50,7 +51,7 @@ public async Task CleanupAsync()
[Benchmark(Baseline = true, Description = "BroadcastAsTextAsync (0 clients)")]
public async Task<bool> BroadcastAsTextAsync_Empty()
{
return await _server.BroadcastAsTextAsync(_textPayload);
return await _server.BroadcastAsync(_textPayload.AsMemory(), WebSocketMessageType.Text);
}

// -----------------------------------------------------------------------
Expand All @@ -59,7 +60,7 @@ public async Task<bool> BroadcastAsTextAsync_Empty()
[Benchmark(Description = "BroadcastAsBinaryAsync(byte[]) (0 clients)")]
public async Task<bool> BroadcastAsBinaryAsync_Bytes_Empty()
{
return await _server.BroadcastAsBinaryAsync(_binaryPayload);
return await _server.BroadcastAsync(_binaryPayload, WebSocketMessageType.Binary);
}

// -----------------------------------------------------------------------
Expand All @@ -68,7 +69,7 @@ public async Task<bool> BroadcastAsBinaryAsync_Bytes_Empty()
[Benchmark(Description = "TryBroadcastAsText(string) (0 clients)")]
public bool TryBroadcastAsText_Empty()
{
return _server.TryBroadcastAsText(_textPayload);
return _server.TryBroadcast(_textPayload.AsMemory(), WebSocketMessageType.Binary);
}

// -----------------------------------------------------------------------
Expand All @@ -77,7 +78,7 @@ public bool TryBroadcastAsText_Empty()
[Benchmark(Description = "TryBroadcastAsBinary(byte[]) (0 clients)")]
public bool TryBroadcastAsBinary_Empty()
{
return _server.TryBroadcastAsBinary(_binaryPayload);
return _server.TryBroadcast(_binaryPayload, WebSocketMessageType.Binary);
}

// -----------------------------------------------------------------------
Expand Down
7 changes: 4 additions & 3 deletions src/WebSocket.Rx.IntegrationTests/Integration.DisposeTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using WebSocket.Rx.IntegrationTests.Internal;
using System.Net.WebSockets;
using WebSocket.Rx.IntegrationTests.Internal;

namespace WebSocket.Rx.IntegrationTests;

Expand Down Expand Up @@ -71,7 +72,7 @@ public async Task Integration_DisposeUnderLoad_ShouldHandleGracefully()
{
try
{
await client.SendAsTextAsync($"Message {i}");
await client.SendAsync($"Message {i}".AsMemory(), WebSocketMessageType.Text);
await Task.Delay(10, TestContext.Current.CancellationToken);
}
catch (ObjectDisposedException)
Expand Down Expand Up @@ -111,4 +112,4 @@ public async Task Integration_ConcurrentDispose_ShouldBeThreadSafe()
// Assert
Assert.True(Server.IsDisposed);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System.Net.WebSockets;
using System.Text;
using R3;
using WebSocket.Rx.IntegrationTests.Internal;

Expand Down Expand Up @@ -26,7 +27,7 @@ public async Task MessageEncoding_CustomEncoding_ShouldUseCustomEncoding()
receivedMessage = Encoding.ASCII.GetString(msg);
return receivedMessage == "ASCII Text";
});
Client.TrySendAsBinary("ASCII Text");
Client.TrySend("ASCII Text".AsMemory(), WebSocketMessageType.Binary);
await receiveTask;

// Assert
Expand All @@ -41,9 +42,9 @@ public async Task MessageReceived_WithTextConversion_ShouldConvertToText()
Client.IsTextMessageConversionEnabled = true;

string? receivedText = null;
Client.MessageReceived.Subscribe(msg => receivedText = msg.Text);
Client.MessageReceived.Subscribe(msg => receivedText = msg.Text.ToString());

var messageTask = WaitForEventAsync(Client.MessageReceived, msg => msg.Text == "Converted Text");
var messageTask = WaitForEventAsync(Client.MessageReceived, msg => msg.Text.ToString() == "Converted Text");

await Client.StartOrFailAsync(TestContext.Current.CancellationToken);

Expand All @@ -52,7 +53,7 @@ public async Task MessageReceived_WithTextConversion_ShouldConvertToText()
var received = await messageTask;

// Assert
Assert.Equal("Converted Text", received.Text);
Assert.Equal("Converted Text", received.Text.ToString());
}

[Fact(Timeout = DefaultTimeoutMs)]
Expand All @@ -62,12 +63,12 @@ public async Task MessageReceived_WithoutTextConversion_ShouldNotConvertToText()
Client = new ReactiveWebSocketClient(new Uri(Server.WebSocketUrl));
Client.IsTextMessageConversionEnabled = false;

string? receivedText = null;
char[]? receivedText = null;
byte[]? receivedBinary = null;
Client.MessageReceived.Subscribe(msg =>
{
receivedText = msg.Text;
receivedBinary = msg.Binary;
receivedText = msg.Text.ToArray();
receivedBinary = msg.Binary.ToArray();
});
var connectTask = WaitForEventAsync(Client.ConnectionHappened);
await Client.StartOrFailAsync(TestContext.Current.CancellationToken);
Expand All @@ -79,7 +80,9 @@ public async Task MessageReceived_WithoutTextConversion_ShouldNotConvertToText()
await receiveTask;

// Assert
Assert.Null(receivedText);
Assert.NotNull(receivedText);
Assert.Empty(receivedText);
Assert.NotNull(receivedBinary);
Assert.NotEmpty(receivedBinary);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using R3;
using System.Net.WebSockets;
using R3;
using WebSocket.Rx.IntegrationTests.Internal;

namespace WebSocket.Rx.IntegrationTests;
Expand All @@ -16,7 +17,7 @@ public async Task Send_EmptyByteArray_ShouldReturnFalse()
await Client.StartOrFailAsync(TestContext.Current.CancellationToken);

// Act
var result = Client.TrySendAsBinary([]);
var result = Client.TrySend(new ReadOnlyMemory<byte>([]), WebSocketMessageType.Binary);

// Assert
Assert.False(result);
Expand All @@ -30,7 +31,7 @@ public async Task SendAsText_EmptyByteArray_ShouldReturnFalse()
await Client.StartOrFailAsync(TestContext.Current.CancellationToken);

// Act
var result = Client.TrySendAsText([]);
var result = Client.TrySend(new ReadOnlyMemory<char>([]), WebSocketMessageType.Text);

// Assert
Assert.False(result);
Expand All @@ -43,8 +44,11 @@ public async Task SendInstant_WhenNotConnected_ShouldNotThrow()
Client = new ReactiveWebSocketClient(new Uri(InvalidUrl));

// Act & Assert
await Client.SendInstantAsync("test", TestContext.Current.CancellationToken);
await Client.SendInstantAsync([1, 2, 3], TestContext.Current.CancellationToken);
await Client.SendInstantAsync("test".AsMemory(), WebSocketMessageType.Binary,
TestContext.Current.CancellationToken);
await Client.SendInstantAsync(new byte[] { 1, 2, 3 }, WebSocketMessageType.Binary,
TestContext.Current.CancellationToken);

Assert.True(true);
}

Expand Down Expand Up @@ -97,7 +101,7 @@ public void Send_NullString_ShouldReturnFalse()
Client = new ReactiveWebSocketClient(new Uri(InvalidUrl));

// Act
var result = Client.TrySendAsText((string)null!);
var result = Client.TrySend((ReadOnlyMemory<char>)null!, WebSocketMessageType.Text);

// Assert
Assert.False(result);
Expand All @@ -111,7 +115,8 @@ public async Task SendInstant_NullString_ShouldNotThrow()
await Client.StartOrFailAsync(TestContext.Current.CancellationToken);

// Act & Assert
await Client.SendInstantAsync((string)null!, TestContext.Current.CancellationToken);
await Client.SendInstantAsync((ReadOnlyMemory<char>)null!, WebSocketMessageType.Binary,
TestContext.Current.CancellationToken);
Assert.True(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public async Task KeepAlive_WithZeroInterval_ShouldDisableKeepAlive()
public async Task KeepAlive_ConnectionShouldStayAliveWithinInterval()
{
// Arrange
var receivedMessages = new List<ReceivedMessage>();
var receivedMessages = new List<Message>();
Client = new ReactiveWebSocketClient(new Uri(Server.WebSocketUrl))
{
KeepAliveInterval = TimeSpan.FromMilliseconds(500),
Expand All @@ -158,15 +158,15 @@ public async Task KeepAlive_ConnectionShouldStayAliveWithinInterval()

// Act
await Client.StartOrFailAsync(TestContext.Current.CancellationToken);
var messageTask = WaitForEventAsync(Client.MessageReceived, m => m.Text == "test");
var messageTask = WaitForEventAsync(Client.MessageReceived, m => m.Text.ToString() == "test");
await Server.SendToAllAsync("test");
await messageTask;

// Assert
Assert.True(Client.IsRunning);
Assert.Equal(WebSocketState.Open, Client.NativeClient.State);
Assert.Single(receivedMessages);
Assert.Equal("test", receivedMessages[0].Text);
Assert.Equal("test", receivedMessages[0].Text.ToString());
}

[Fact(Timeout = DefaultTimeoutMs)]
Expand Down Expand Up @@ -266,7 +266,8 @@ public async Task KeepAlive_DuringMessageExchange_ShouldNotInterfere()
// Act
for (var i = 0; i < 5; i++)
{
await Client.SendInstantAsync($"Message {i}", TestContext.Current.CancellationToken);
await Client.SendInstantAsync($"Message {i}".AsMemory(), WebSocketMessageType.Binary,
TestContext.Current.CancellationToken);
await Task.Delay(50, TestContext.Current.CancellationToken);
}

Expand Down Expand Up @@ -485,7 +486,8 @@ public async Task KeepAlive_ServerRespondsToClientMessages_ShouldResetKeepAliveT

for (var i = 0; i < 5; i++)
{
await Client.SendInstantAsync($"msg{i}", TestContext.Current.CancellationToken);
await Client.SendInstantAsync($"msg{i}".AsMemory(), WebSocketMessageType.Binary,
TestContext.Current.CancellationToken);
await Task.Delay(100, TestContext.Current.CancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public async Task AfterDispose_OperationsShouldThrowOrReturnFalse()
await exceptionSource.Task.WaitAsync(TimeSpan.FromSeconds(1), TestContext.Current.CancellationToken);
Assert.IsType<ObjectDisposedException>(taskResult.Exception);

var result = Client.TrySendAsText("test");
var result = Client.TrySend("test".AsMemory(), WebSocketMessageType.Text);
Assert.False(result);
}

Expand Down
Loading
Loading