diff --git a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs index 8ce9fea..68b417e 100644 --- a/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/EndToEndBenchmarks.cs @@ -46,7 +46,7 @@ public async Task SetupAsync() { if (msg.Message.IsText) { - await _server.SendAsTextAsync(msg.Metadata.Id, msg.Message.Text ?? "", ct); + await _server.SendAsync(msg.Metadata.Id, msg.Message.Text, WebSocketMessageType.Text, ct); } }); @@ -78,11 +78,12 @@ public async Task CleanupAsync() public async Task SingleRoundTrip() { var echoTask = _client.MessageReceived - .Where(m => m is { IsText: true, Text: "ping" }) + .Where(m => m is { IsText: true }) + .Where(m => m.Text.ToString() is "ping") .FirstAsync(); - await _client.SendAsTextAsync("ping"); - return (await echoTask).Text; + await _client.SendAsync("ping".AsMemory(), WebSocketMessageType.Text); + return (await echoTask).Text.ToString(); } // ----------------------------------------------------------------------- @@ -102,7 +103,7 @@ public async Task SequentialRoundTrips() for (var i = 0; i < RoundTrips; i++) { - await _client.SendAsTextAsync($"msg-{i}", cts.Token); + await _client.SendAsync($"msg-{i}".AsMemory(), WebSocketMessageType.Text, cts.Token); } while (received < RoundTrips && !cts.IsCancellationRequested) @@ -123,7 +124,7 @@ public int TrySendAsText_Throughput() var sent = 0; for (var i = 0; i < 1_000; i++) { - if (_client.TrySendAsText($"msg-{i}")) + if (_client.TrySend($"msg-{i}".AsMemory(), WebSocketMessageType.Text)) { sent++; } diff --git a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs index 48f984a..5b1c9be 100644 --- a/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/MessagePipelineBenchmarks.cs @@ -25,15 +25,15 @@ public class MessagePipelineBenchmarks [Params(64, 512, 4_096)] public int PayloadSize { get; set; } private ReactiveWebSocketClient _client = null!; - private ReceivedMessage _textMessage = null!; - private ReceivedMessage _binaryMessage = null!; + private Message _textMessage = null!; + private Message _binaryMessage = null!; [GlobalSetup] public void Setup() { _client = new ReactiveWebSocketClient(new Uri("ws://localhost:9999")); - _textMessage = ReceivedMessage.TextMessage(new string('x', PayloadSize)); - _binaryMessage = ReceivedMessage.BinaryMessage(new byte[PayloadSize]); + _textMessage = Message.Create(new string('x', PayloadSize)); + _binaryMessage = Message.Create(new byte[PayloadSize]); } [GlobalCleanup] diff --git a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs index b70e39a..6f5709a 100644 --- a/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/SendChannelBenchmarks.cs @@ -1,4 +1,5 @@ using System; +using System.Net.WebSockets; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; @@ -68,8 +69,9 @@ public bool TrySendAsBinary_String() var result = false; for (var i = 0; i < MessageCount; i++) { - result = _client.TrySendAsBinary(_textPayload); + result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Binary); } + return result; } @@ -82,8 +84,9 @@ public bool TrySendAsBinary_Bytes() var result = false; for (var i = 0; i < MessageCount; i++) { - result = _client.TrySendAsBinary(_binaryPayload); + result = _client.TrySend(_binaryPayload, WebSocketMessageType.Binary); } + return result; } @@ -96,8 +99,9 @@ public bool TrySendAsText_String() var result = false; for (var i = 0; i < MessageCount; i++) { - result = _client.TrySendAsText(_textPayload); + result = _client.TrySend(_textPayload.AsMemory(), WebSocketMessageType.Text); } + return result; } @@ -110,8 +114,9 @@ public bool TrySendAsText_Bytes() var result = false; for (var i = 0; i < MessageCount; i++) { - result = _client.TrySendAsText(_binaryPayload); + result = _client.TrySend(_binaryPayload, WebSocketMessageType.Text); } + return result; } @@ -124,8 +129,9 @@ public async Task SendAsBinaryAsync_String() var result = false; for (var i = 0; i < MessageCount; i++) { - result = await _client.SendAsBinaryAsync(_textPayload); + result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Binary); } + return result; } @@ -138,8 +144,9 @@ public async Task SendAsTextAsync_String() var result = false; for (var i = 0; i < MessageCount; i++) { - result = await _client.SendAsTextAsync(_textPayload); + result = await _client.SendAsync(_textPayload.AsMemory(), WebSocketMessageType.Text); } + return result; } @@ -155,6 +162,7 @@ public byte[] EncodingOverhead() { result = System.Text.Encoding.UTF8.GetBytes(_textPayload); } + return result; } } \ No newline at end of file diff --git a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs b/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs index 12cb111..0d110c9 100644 --- a/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs +++ b/src/WebSocket.Rx.Benchmarks/ServerBroadcastBenchmarks.cs @@ -1,4 +1,5 @@ -using System.Net; +using System; +using System.Net; using System.Net.WebSockets; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; @@ -50,7 +51,7 @@ public async Task CleanupAsync() [Benchmark(Baseline = true, Description = "BroadcastAsTextAsync (0 clients)")] public async Task BroadcastAsTextAsync_Empty() { - return await _server.BroadcastAsTextAsync(_textPayload); + return await _server.BroadcastAsync(_textPayload.AsMemory(), WebSocketMessageType.Text); } // ----------------------------------------------------------------------- @@ -59,7 +60,7 @@ public async Task BroadcastAsTextAsync_Empty() [Benchmark(Description = "BroadcastAsBinaryAsync(byte[]) (0 clients)")] public async Task BroadcastAsBinaryAsync_Bytes_Empty() { - return await _server.BroadcastAsBinaryAsync(_binaryPayload); + return await _server.BroadcastAsync(_binaryPayload, WebSocketMessageType.Binary); } // ----------------------------------------------------------------------- @@ -68,7 +69,7 @@ public async Task BroadcastAsBinaryAsync_Bytes_Empty() [Benchmark(Description = "TryBroadcastAsText(string) (0 clients)")] public bool TryBroadcastAsText_Empty() { - return _server.TryBroadcastAsText(_textPayload); + return _server.TryBroadcast(_textPayload.AsMemory(), WebSocketMessageType.Binary); } // ----------------------------------------------------------------------- @@ -77,7 +78,7 @@ public bool TryBroadcastAsText_Empty() [Benchmark(Description = "TryBroadcastAsBinary(byte[]) (0 clients)")] public bool TryBroadcastAsBinary_Empty() { - return _server.TryBroadcastAsBinary(_binaryPayload); + return _server.TryBroadcast(_binaryPayload, WebSocketMessageType.Binary); } // ----------------------------------------------------------------------- diff --git a/src/WebSocket.Rx.IntegrationTests/Integration.DisposeTests.cs b/src/WebSocket.Rx.IntegrationTests/Integration.DisposeTests.cs index 7a98574..8a57a30 100644 --- a/src/WebSocket.Rx.IntegrationTests/Integration.DisposeTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/Integration.DisposeTests.cs @@ -1,4 +1,5 @@ -using WebSocket.Rx.IntegrationTests.Internal; +using System.Net.WebSockets; +using WebSocket.Rx.IntegrationTests.Internal; namespace WebSocket.Rx.IntegrationTests; @@ -71,7 +72,7 @@ public async Task Integration_DisposeUnderLoad_ShouldHandleGracefully() { try { - await client.SendAsTextAsync($"Message {i}"); + await client.SendAsync($"Message {i}".AsMemory(), WebSocketMessageType.Text); await Task.Delay(10, TestContext.Current.CancellationToken); } catch (ObjectDisposedException) @@ -111,4 +112,4 @@ public async Task Integration_ConcurrentDispose_ShouldBeThreadSafe() // Assert Assert.True(Server.IsDisposed); } -} +} \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.EncodingTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.EncodingTests.cs index b992d8e..b87ff9c 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.EncodingTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.EncodingTests.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Net.WebSockets; +using System.Text; using R3; using WebSocket.Rx.IntegrationTests.Internal; @@ -26,7 +27,7 @@ public async Task MessageEncoding_CustomEncoding_ShouldUseCustomEncoding() receivedMessage = Encoding.ASCII.GetString(msg); return receivedMessage == "ASCII Text"; }); - Client.TrySendAsBinary("ASCII Text"); + Client.TrySend("ASCII Text".AsMemory(), WebSocketMessageType.Binary); await receiveTask; // Assert @@ -41,9 +42,9 @@ public async Task MessageReceived_WithTextConversion_ShouldConvertToText() Client.IsTextMessageConversionEnabled = true; string? receivedText = null; - Client.MessageReceived.Subscribe(msg => receivedText = msg.Text); + Client.MessageReceived.Subscribe(msg => receivedText = msg.Text.ToString()); - var messageTask = WaitForEventAsync(Client.MessageReceived, msg => msg.Text == "Converted Text"); + var messageTask = WaitForEventAsync(Client.MessageReceived, msg => msg.Text.ToString() == "Converted Text"); await Client.StartOrFailAsync(TestContext.Current.CancellationToken); @@ -52,7 +53,7 @@ public async Task MessageReceived_WithTextConversion_ShouldConvertToText() var received = await messageTask; // Assert - Assert.Equal("Converted Text", received.Text); + Assert.Equal("Converted Text", received.Text.ToString()); } [Fact(Timeout = DefaultTimeoutMs)] @@ -62,12 +63,12 @@ public async Task MessageReceived_WithoutTextConversion_ShouldNotConvertToText() Client = new ReactiveWebSocketClient(new Uri(Server.WebSocketUrl)); Client.IsTextMessageConversionEnabled = false; - string? receivedText = null; + char[]? receivedText = null; byte[]? receivedBinary = null; Client.MessageReceived.Subscribe(msg => { - receivedText = msg.Text; - receivedBinary = msg.Binary; + receivedText = msg.Text.ToArray(); + receivedBinary = msg.Binary.ToArray(); }); var connectTask = WaitForEventAsync(Client.ConnectionHappened); await Client.StartOrFailAsync(TestContext.Current.CancellationToken); @@ -79,7 +80,9 @@ public async Task MessageReceived_WithoutTextConversion_ShouldNotConvertToText() await receiveTask; // Assert - Assert.Null(receivedText); + Assert.NotNull(receivedText); + Assert.Empty(receivedText); Assert.NotNull(receivedBinary); + Assert.NotEmpty(receivedBinary); } } \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ErrorHandlingTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ErrorHandlingTests.cs index bcfbcda..f83b26c 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ErrorHandlingTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ErrorHandlingTests.cs @@ -1,4 +1,5 @@ -using R3; +using System.Net.WebSockets; +using R3; using WebSocket.Rx.IntegrationTests.Internal; namespace WebSocket.Rx.IntegrationTests; @@ -16,7 +17,7 @@ public async Task Send_EmptyByteArray_ShouldReturnFalse() await Client.StartOrFailAsync(TestContext.Current.CancellationToken); // Act - var result = Client.TrySendAsBinary([]); + var result = Client.TrySend(new ReadOnlyMemory([]), WebSocketMessageType.Binary); // Assert Assert.False(result); @@ -30,7 +31,7 @@ public async Task SendAsText_EmptyByteArray_ShouldReturnFalse() await Client.StartOrFailAsync(TestContext.Current.CancellationToken); // Act - var result = Client.TrySendAsText([]); + var result = Client.TrySend(new ReadOnlyMemory([]), WebSocketMessageType.Text); // Assert Assert.False(result); @@ -43,8 +44,11 @@ public async Task SendInstant_WhenNotConnected_ShouldNotThrow() Client = new ReactiveWebSocketClient(new Uri(InvalidUrl)); // Act & Assert - await Client.SendInstantAsync("test", TestContext.Current.CancellationToken); - await Client.SendInstantAsync([1, 2, 3], TestContext.Current.CancellationToken); + await Client.SendInstantAsync("test".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); + await Client.SendInstantAsync(new byte[] { 1, 2, 3 }, WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); + Assert.True(true); } @@ -97,7 +101,7 @@ public void Send_NullString_ShouldReturnFalse() Client = new ReactiveWebSocketClient(new Uri(InvalidUrl)); // Act - var result = Client.TrySendAsText((string)null!); + var result = Client.TrySend((ReadOnlyMemory)null!, WebSocketMessageType.Text); // Assert Assert.False(result); @@ -111,7 +115,8 @@ public async Task SendInstant_NullString_ShouldNotThrow() await Client.StartOrFailAsync(TestContext.Current.CancellationToken); // Act & Assert - await Client.SendInstantAsync((string)null!, TestContext.Current.CancellationToken); + await Client.SendInstantAsync((ReadOnlyMemory)null!, WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); Assert.True(true); } -} +} \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.KeepAliveTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.KeepAliveTests.cs index 173b0c7..f569df8 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.KeepAliveTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.KeepAliveTests.cs @@ -147,7 +147,7 @@ public async Task KeepAlive_WithZeroInterval_ShouldDisableKeepAlive() public async Task KeepAlive_ConnectionShouldStayAliveWithinInterval() { // Arrange - var receivedMessages = new List(); + var receivedMessages = new List(); Client = new ReactiveWebSocketClient(new Uri(Server.WebSocketUrl)) { KeepAliveInterval = TimeSpan.FromMilliseconds(500), @@ -158,7 +158,7 @@ public async Task KeepAlive_ConnectionShouldStayAliveWithinInterval() // Act await Client.StartOrFailAsync(TestContext.Current.CancellationToken); - var messageTask = WaitForEventAsync(Client.MessageReceived, m => m.Text == "test"); + var messageTask = WaitForEventAsync(Client.MessageReceived, m => m.Text.ToString() == "test"); await Server.SendToAllAsync("test"); await messageTask; @@ -166,7 +166,7 @@ public async Task KeepAlive_ConnectionShouldStayAliveWithinInterval() Assert.True(Client.IsRunning); Assert.Equal(WebSocketState.Open, Client.NativeClient.State); Assert.Single(receivedMessages); - Assert.Equal("test", receivedMessages[0].Text); + Assert.Equal("test", receivedMessages[0].Text.ToString()); } [Fact(Timeout = DefaultTimeoutMs)] @@ -266,7 +266,8 @@ public async Task KeepAlive_DuringMessageExchange_ShouldNotInterfere() // Act for (var i = 0; i < 5; i++) { - await Client.SendInstantAsync($"Message {i}", TestContext.Current.CancellationToken); + await Client.SendInstantAsync($"Message {i}".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); await Task.Delay(50, TestContext.Current.CancellationToken); } @@ -485,7 +486,8 @@ public async Task KeepAlive_ServerRespondsToClientMessages_ShouldResetKeepAliveT for (var i = 0; i < 5; i++) { - await Client.SendInstantAsync($"msg{i}", TestContext.Current.CancellationToken); + await Client.SendInstantAsync($"msg{i}".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); await Task.Delay(100, TestContext.Current.CancellationToken); } diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.LifecycleTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.LifecycleTests.cs index f38859b..66bae8a 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.LifecycleTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.LifecycleTests.cs @@ -220,7 +220,7 @@ public async Task AfterDispose_OperationsShouldThrowOrReturnFalse() await exceptionSource.Task.WaitAsync(TimeSpan.FromSeconds(1), TestContext.Current.CancellationToken); Assert.IsType(taskResult.Exception); - var result = Client.TrySendAsText("test"); + var result = Client.TrySend("test".AsMemory(), WebSocketMessageType.Text); Assert.False(result); } diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ReceivingTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ReceivingTests.cs index 50e71a6..e984deb 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ReceivingTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.ReceivingTests.cs @@ -13,7 +13,7 @@ public async Task MessageReceived_WhenServerSendsMessage_ShouldReceive() Client.IsTextMessageConversionEnabled = true; var receivedMessage = ""; - Client.MessageReceived.Subscribe(msg => receivedMessage = msg.Text ?? ""); + Client.MessageReceived.Subscribe(msg => receivedMessage = msg.Text.ToString()); await Client.StartOrFailAsync(TestContext.Current.CancellationToken); await Task.Delay(50, TestContext.Current.CancellationToken); @@ -34,7 +34,7 @@ public async Task MessageReceived_BinaryMessage_ShouldReceiveBinary() Client.IsTextMessageConversionEnabled = false; byte[]? receivedBytes = null; - Client.MessageReceived.Subscribe(msg => receivedBytes = msg.Binary); + Client.MessageReceived.Subscribe(msg => receivedBytes = msg.Binary.ToArray()); await Client.StartOrFailAsync(TestContext.Current.CancellationToken); await Task.Delay(50, TestContext.Current.CancellationToken); @@ -58,7 +58,7 @@ public async Task StreamFakeMessage_ShouldTriggerObservable() var received = false; Client.MessageReceived.Subscribe(_ => received = true); - var fakeMessage = ReceivedMessage.TextMessage("Fake"); + var fakeMessage = Message.Create("Fake".AsMemory()); // Act Client.StreamFakeMessage(fakeMessage); @@ -67,4 +67,4 @@ public async Task StreamFakeMessage_ShouldTriggerObservable() // Assert Assert.True(received); } -} +} \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.SendingTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.SendingTests.cs index e81cb7d..bccc88d 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.SendingTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.SendingTests.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Net.WebSockets; +using System.Text; using R3; using WebSocket.Rx.IntegrationTests.Internal; @@ -18,7 +19,7 @@ public async Task Send_String_WhenConnected_ShouldSendMessage() Server.OnMessageReceived += msg => tcs.TrySetResult(msg); // Act - var result = Client.TrySendAsText("Hello World"); + var result = Client.TrySend("Hello World".AsMemory(), WebSocketMessageType.Text); var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); // Assert @@ -40,7 +41,7 @@ public async Task Send_ByteArray_WhenConnected_ShouldSendMessage() var testData = new byte[] { 1, 2, 3, 4, 5 }; // Act - var result = Client.TrySendAsBinary(testData); + var result = Client.TrySend(testData, WebSocketMessageType.Binary); var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); // Assert @@ -55,7 +56,7 @@ public void Send_WhenNotRunning_ShouldReturnFalse() Client = new ReactiveWebSocketClient(new Uri(Server.WebSocketUrl)); // Act - var result = Client.TrySendAsText("test"); + var result = Client.TrySend("test".AsMemory(), WebSocketMessageType.Binary); // Assert Assert.False(result); @@ -69,7 +70,7 @@ public async Task Send_EmptyString_ShouldReturnFalse() await Client.StartOrFailAsync(TestContext.Current.CancellationToken); // Act - var result = Client.TrySendAsText(""); + var result = Client.TrySend("".AsMemory(), WebSocketMessageType.Text); // Assert Assert.False(result); @@ -87,7 +88,8 @@ public async Task SendInstant_String_WhenConnected_ShouldSendImmediately() Server.OnBytesReceived += msg => tcs.TrySetResult(Encoding.UTF8.GetString(msg)); // Act - await Client.SendInstantAsync("Instant", TestContext.Current.CancellationToken); + await Client.SendInstantAsync("Instant".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); // Assert @@ -107,7 +109,8 @@ public async Task SendInstantAsync_ByteArray_WhenConnected_ShouldSendMessage() var testData = new byte[] { 5, 4, 3, 2, 1 }; // Act - var result = await Client.SendInstantAsync(testData, TestContext.Current.CancellationToken); + var result = await Client.SendInstantAsync(testData, WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); var receivedBytes = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); // Assert @@ -126,7 +129,8 @@ public async Task SendAsBinaryAsync_String_WhenConnected_ShouldSendMessageAsBina Server.OnBytesReceived += bytes => tcs.TrySetResult(bytes); // Act - var result = await Client.SendAsBinaryAsync("BinaryString", TestContext.Current.CancellationToken); + var result = await Client.SendAsync("BinaryString".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); var receivedBytes = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); // Assert @@ -145,7 +149,8 @@ public async Task SendAsTextAsync_String_WhenConnected_ShouldSendMessageAsText() Server.OnMessageReceived += msg => tcs.TrySetResult(msg); // Act - var result = await Client.SendAsTextAsync("Hello Text", TestContext.Current.CancellationToken); + var result = await Client.SendAsync("Hello Text".AsMemory(), WebSocketMessageType.Text, + TestContext.Current.CancellationToken); var receivedText = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); // Assert @@ -168,7 +173,8 @@ public async Task SendInstant_Observable_ShouldSendMessages() if (receivedMessages.Count == 2) tcs.TrySetResult(true); }; - var messages = Observable.Return("Msg1").Concat(Observable.Return("Msg2")); + var messages = Observable.Return(Message.Create("Msg1"u8.ToArray())) + .Concat(Observable.Return(Message.Create("Msg2"u8.ToArray()))); // Act using var subscription = Client.SendInstant(messages).Subscribe(); @@ -178,5 +184,4 @@ public async Task SendInstant_Observable_ShouldSendMessages() Assert.Contains("Msg1", receivedMessages); Assert.Contains("Msg2", receivedMessages); } - } \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.StressTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.StressTests.cs index c56af36..36a6f6b 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.StressTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketClient.StressTests.cs @@ -14,7 +14,7 @@ public async Task LargeMessage_ShouldSendAndReceiveCorrectly() Client.IsTextMessageConversionEnabled = true; var largeMessage = new string('A', 1024 * 1024); - var messageReceivedTask = WaitForEventAsync(Client.MessageReceived, msg => msg.Text == largeMessage); + var messageReceivedTask = WaitForEventAsync(Client.MessageReceived, msg => msg.Text.ToString() == largeMessage); await Client.StartOrFailAsync(TestContext.Current.CancellationToken); @@ -23,7 +23,7 @@ public async Task LargeMessage_ShouldSendAndReceiveCorrectly() var received = await messageReceivedTask; // Assert - Assert.Equal(largeMessage, received.Text); + Assert.Equal(largeMessage, received.Text.ToString()); } [Fact(Timeout = DefaultTimeoutMs)] @@ -36,7 +36,8 @@ public async Task RapidConnectDisconnect_ShouldHandleGracefully() for (var i = 0; i < 5; i++) { await Client.StartOrFailAsync(TestContext.Current.CancellationToken); - await Client.StopAsync(WebSocketCloseStatus.NormalClosure, "Rapid test", TestContext.Current.CancellationToken); + await Client.StopAsync(WebSocketCloseStatus.NormalClosure, "Rapid test", + TestContext.Current.CancellationToken); await WaitUntilAsync(Client.DisconnectionHappened, () => !Client.IsRunning); } @@ -90,4 +91,4 @@ public async Task MultipleReconnects_InParallel_ShouldNotCauseConcurrencyIssues( Assert.True(Client.IsStarted); Assert.True(Client.IsRunning); } -} +} \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.BroadcastTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.BroadcastTests.cs index b47edf8..072f4a1 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.BroadcastTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.BroadcastTests.cs @@ -1,4 +1,5 @@ -using WebSocket.Rx.IntegrationTests.Internal; +using System.Net.WebSockets; +using WebSocket.Rx.IntegrationTests.Internal; namespace WebSocket.Rx.IntegrationTests; @@ -17,7 +18,8 @@ public async Task BroadcastInstantAsync_WithMultipleClients_ShouldSendToAll() var receiveTask2 = ReceiveTextAsync(client2, TestContext.Current.CancellationToken); // Act - await Server.BroadcastInstantAsync("Broadcast Message", TestContext.Current.CancellationToken); + await Server.BroadcastInstantAsync("Broadcast Message".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.Equal("Broadcast Message", await receiveTask1); @@ -28,7 +30,8 @@ public async Task BroadcastInstantAsync_WithMultipleClients_ShouldSendToAll() public async Task BroadcastInstantAsync_WithNoClients_ShouldReturnTrue() { // Act - var result = await Server.BroadcastInstantAsync("test", TestContext.Current.CancellationToken); + var result = await Server.BroadcastInstantAsync("test".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.True(result); @@ -43,7 +46,8 @@ public async Task BroadcastInstantAsync_WithSingleClient_ShouldSendCorrectly() await connectionTask; // Act - await Server.BroadcastInstantAsync("Single Broadcast", TestContext.Current.CancellationToken); + await Server.BroadcastInstantAsync("Single Broadcast".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.Equal("Single Broadcast", await ReceiveTextAsync(client, TestContext.Current.CancellationToken)); @@ -61,7 +65,8 @@ public async Task BroadcastInstantAsync_ByteArray_ShouldSendToAll() var binaryData = new byte[] { 10, 20, 30 }; // Act - await Server.BroadcastInstantAsync(binaryData, TestContext.Current.CancellationToken); + await Server.BroadcastInstantAsync(binaryData, WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert var buffer1 = new byte[1024]; @@ -83,7 +88,8 @@ public async Task BroadcastAsBinaryAsync_WithMultipleClients_ShouldSendToAll() await connectionTask; // Act - await Server.BroadcastAsBinaryAsync("Binary Broadcast", TestContext.Current.CancellationToken); + await Server.BroadcastAsync("Binary Broadcast".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.Equal("Binary Broadcast", await ReceiveTextAsync(client1, TestContext.Current.CancellationToken)); @@ -100,11 +106,11 @@ public async Task BroadcastAsTextAsync_WithMultipleClients_ShouldSendToAll() await connectionTask; // Act - await Server.BroadcastAsTextAsync("Text Broadcast", TestContext.Current.CancellationToken); + await Server.BroadcastAsync("Text Broadcast".AsMemory(), WebSocketMessageType.Text, + TestContext.Current.CancellationToken); // Assert Assert.Equal("Text Broadcast", await ReceiveTextAsync(client1, TestContext.Current.CancellationToken)); Assert.Equal("Text Broadcast", await ReceiveTextAsync(client2, TestContext.Current.CancellationToken)); } - } \ No newline at end of file diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.LifecycleTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.LifecycleTests.cs index 2b9a338..0578e2f 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.LifecycleTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.LifecycleTests.cs @@ -37,7 +37,8 @@ public async Task Should_Handle_Dispose_Without_Deadlock() await server.StartAsync(TestContext.Current.CancellationToken); // Act - await server.SendInstantAsync(Guid.Empty, "test", TestContext.Current.CancellationToken); + await server.SendInstantAsync(Guid.Empty, "test".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); await server.StopAsync(WebSocketCloseStatus.NormalClosure, "test", TestContext.Current.CancellationToken); server.Dispose(); diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ReceivingTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ReceivingTests.cs index 7eaf9d8..c7b47df 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ReceivingTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ReceivingTests.cs @@ -19,7 +19,7 @@ public async Task Should_Receive_Text_Message_From_Client() // Assert var receivedMessage = await messageTask; Assert.NotNull(receivedMessage); - Assert.Equal("Hello Server", receivedMessage.Message.Text); + Assert.Equal("Hello Server", receivedMessage.Message.Text.ToString()); Assert.NotNull(receivedMessage.Metadata); } @@ -41,7 +41,6 @@ await client.SendAsync( // Assert var receivedMessage = await messageTask; Assert.NotNull(receivedMessage); - Assert.NotNull(receivedMessage.Message.Binary); Assert.Equal(binaryData, receivedMessage.Message.Binary); } @@ -49,7 +48,7 @@ await client.SendAsync( public async Task Should_Receive_Multiple_Messages_From_Same_Client() { // Arrange - var messages = new List(); + var messages = new List(); using var subscription = Server.Messages.Subscribe(messages.Add); using var client = await ConnectClientAsync(TestContext.Current.CancellationToken); @@ -62,8 +61,8 @@ public async Task Should_Receive_Multiple_Messages_From_Same_Client() // Assert Assert.Equal(2, messages.Count); - Assert.Equal("Message 1", messages[0].Message.Text); - Assert.Equal("Message 2", messages[1].Message.Text); + Assert.Equal("Message 1", messages[0].Message.Text.ToString()); + Assert.Equal("Message 2", messages[1].Message.Text.ToString()); } [Fact(Timeout = DefaultTimeoutMs)] @@ -103,7 +102,7 @@ await client.SendAsync( // Assert var receivedMessage = await messageTask; - Assert.Empty(receivedMessage.Message.Binary!); + Assert.Empty(receivedMessage.Message.Binary.ToArray()); } [Fact(Timeout = DefaultTimeoutMs)] @@ -114,7 +113,10 @@ public async Task Should_Maintain_Message_Order() var receivedTexts = new List(); using var subscription = Server.Messages.Subscribe(msg => { - if (msg.Message.Text != null) receivedTexts.Add(msg.Message.Text); + if (!msg.Message.Text.IsEmpty) + { + receivedTexts.Add(msg.Message.Text.ToString()); + } }); // Act diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ScenarioTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ScenarioTests.cs index 7bc79ba..ca35baf 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ScenarioTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.ScenarioTests.cs @@ -12,7 +12,7 @@ public async Task EchoServer_Should_Reply_To_All_Messages() // Arrange using var subscription = Server.Messages.SubscribeAwait(async (msg, ct) => { - await Server.SendInstantAsync(msg.Metadata.Id, msg.Message.Text!, ct); + await Server.SendInstantAsync(msg.Metadata.Id, msg.Message.Text, msg.Message.Type, ct); }); using var client = await ConnectClientAsync(TestContext.Current.CancellationToken); @@ -34,7 +34,8 @@ public async Task ChatRoom_Should_Broadcast_Messages_To_Other_Clients() var otherClients = Server.ConnectedClients.Keys.Where(id => id != msg.Metadata.Id); foreach (var clientId in otherClients) { - await Server.SendInstantAsync(clientId, $"{msg.Metadata.Id}: {msg.Message.Text}", + await Server.SendInstantAsync(clientId, $"{msg.Metadata.Id}: {msg.Message.Text}".AsMemory(), + WebSocketMessageType.Binary, ct); } }); @@ -42,7 +43,7 @@ await Server.SendInstantAsync(clientId, $"{msg.Metadata.Id}: {msg.Message.Text}" var connectionTask1 = WaitUntilAsync(Server.ClientConnected, () => Server.ClientCount >= 1); using var client1 = await ConnectClientAsync(TestContext.Current.CancellationToken); await connectionTask1; - + var connectionTask2 = WaitUntilAsync(Server.ClientConnected, () => Server.ClientCount >= 2); using var client2 = await ConnectClientAsync(TestContext.Current.CancellationToken); await connectionTask2; diff --git a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.SendingTests.cs b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.SendingTests.cs index 563a76c..7c948e1 100644 --- a/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.SendingTests.cs +++ b/src/WebSocket.Rx.IntegrationTests/ReactiveWebSocketServer.SendingTests.cs @@ -1,4 +1,5 @@ -using WebSocket.Rx.IntegrationTests.Internal; +using System.Net.WebSockets; +using WebSocket.Rx.IntegrationTests.Internal; namespace WebSocket.Rx.IntegrationTests; @@ -14,7 +15,8 @@ public async Task Should_Send_Text_To_Specific_Client() var clientId = Server.ConnectedClients.Keys.First(); // Act - await Server.SendInstantAsync(clientId, "Hello Client", TestContext.Current.CancellationToken); + await Server.SendInstantAsync(clientId, "Hello Client".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert var received = await ReceiveTextAsync(client, TestContext.Current.CancellationToken); @@ -32,7 +34,8 @@ public async Task Should_Send_Binary_To_Specific_Client() var binaryData = new byte[] { 5, 4, 3, 2, 1 }; // Act - await Server.SendInstantAsync(clientId, binaryData, TestContext.Current.CancellationToken); + await Server.SendInstantAsync(clientId, binaryData, WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert var buffer = new byte[1024]; @@ -44,7 +47,8 @@ public async Task Should_Send_Binary_To_Specific_Client() public async Task Should_Return_False_When_Sending_To_Non_Existent_Client() { // Act - var result = await Server.SendInstantAsync(Guid.NewGuid(), "test", TestContext.Current.CancellationToken); + var result = await Server.SendInstantAsync(Guid.NewGuid(), "test".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.False(result); @@ -60,8 +64,10 @@ public async Task Should_Send_Multiple_Messages_To_Same_Client() var clientId = Server.ConnectedClients.Keys.First(); // Act - await Server.SendInstantAsync(clientId, "Msg 1", TestContext.Current.CancellationToken); - await Server.SendInstantAsync(clientId, "Msg 2", TestContext.Current.CancellationToken); + await Server.SendInstantAsync(clientId, "Msg 1".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); + await Server.SendInstantAsync(clientId, "Msg 2".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.Equal("Msg 1", await ReceiveTextAsync(client, TestContext.Current.CancellationToken)); @@ -78,7 +84,7 @@ public async Task TrySendAsText_Should_Return_True_For_Existing_Client() var clientId = Server.ConnectedClients.Keys.First(); // Act - var result = Server.TrySendAsText(clientId, "test"); + var result = Server.TrySend(clientId, "test".AsMemory(), WebSocketMessageType.Text); // Assert Assert.True(result); @@ -89,7 +95,7 @@ public async Task TrySendAsText_Should_Return_True_For_Existing_Client() public void TrySendAsText_Should_Return_False_For_Non_Existent_Client() { // Act - var result = Server.TrySendAsText(Guid.NewGuid(), "test"); + var result = Server.TrySend(Guid.NewGuid(), "test".AsMemory(), WebSocketMessageType.Binary); // Assert Assert.False(result); @@ -105,7 +111,7 @@ public async Task TrySendAsBinary_Should_Return_True_For_Existing_Client() var clientId = Server.ConnectedClients.Keys.First(); // Act - var result = Server.TrySendAsBinary(clientId, "test"); + var result = Server.TrySend(clientId, "test".AsMemory(), WebSocketMessageType.Binary); // Assert Assert.True(result); @@ -122,7 +128,8 @@ public async Task SendInstantAsync_Should_Send_Message_Immediately() var clientId = Server.ConnectedClients.Keys.First(); // Act - await Server.SendInstantAsync(clientId, "Instant message", TestContext.Current.CancellationToken); + await Server.SendInstantAsync(clientId, "Instant message".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert var received = await ReceiveTextAsync(client, TestContext.Current.CancellationToken); @@ -141,9 +148,10 @@ public async Task Should_Handle_Client_Send_After_Disconnect() await WaitUntilAsync(Server.ClientDisconnected, () => Server.ClientCount == 0); // Act - var result = await Server.SendInstantAsync(clientId, "test", TestContext.Current.CancellationToken); + var result = await Server.SendInstantAsync(clientId, "test".AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert Assert.False(result); } -} +} \ No newline at end of file diff --git a/src/WebSocket.Rx.UnitTests/ServerWebSocketAdapter.MessageTests.cs b/src/WebSocket.Rx.UnitTests/ServerWebSocketAdapter.MessageTests.cs index b6469fc..9b72e09 100644 --- a/src/WebSocket.Rx.UnitTests/ServerWebSocketAdapter.MessageTests.cs +++ b/src/WebSocket.Rx.UnitTests/ServerWebSocketAdapter.MessageTests.cs @@ -16,7 +16,7 @@ public void Send_WithByteArray_ShouldQueueMessage() var testData = new byte[] { 1, 2, 3, 4 }; // Act - var result = Adapter.TrySendAsBinary(testData); + var result = Adapter.TrySend(testData.AsMemory(), WebSocketMessageType.Binary); // Assert Assert.True(result); @@ -31,7 +31,7 @@ public void Send_WithString_ShouldQueueEncodedMessage() const string testMessage = "Test message"; // Act - var result = Adapter.TrySendAsBinary(testMessage); + var result = Adapter.TrySend(testMessage.AsMemory(), WebSocketMessageType.Binary); // Assert Assert.True(result); @@ -45,7 +45,7 @@ public void Send_WithEmptyByteArray_ShouldReturnFalse() new Metadata(Guid.Empty, IPAddress.Any, 0)); // Act - var result = Adapter.TrySendAsBinary([]); + var result = Adapter.TrySend(new ReadOnlyMemory([]), WebSocketMessageType.Binary); // Assert Assert.False(result); @@ -60,7 +60,7 @@ public void SendAsText_WithValidMessage_ShouldQueueTextMessage() const string testMessage = "Text message"; // Act - var result = Adapter.TrySendAsText(testMessage); + var result = Adapter.TrySend(testMessage.AsMemory(), WebSocketMessageType.Text); // Assert Assert.True(result); @@ -71,22 +71,23 @@ public async Task SendInstant_WithByteArray_ShouldSendImmediately() { // Arrange MockWebSocket.SendAsync( - Arg.Any>(), + Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any()) - .Returns(Task.CompletedTask); + .Returns(ValueTask.CompletedTask); Adapter = new ReactiveWebSocketServer.ServerWebSocketAdapter(MockWebSocket, new Metadata(Guid.Empty, IPAddress.Any, 0)); var testData = new byte[] { 1, 2, 3 }; // Act - await Adapter.SendInstantAsync(testData, TestContext.Current.CancellationToken); + await Adapter.SendInstantAsync(testData.AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert await MockWebSocket.Received(1).SendAsync( - Arg.Any>(), + Arg.Any>(), WebSocketMessageType.Binary, true, Arg.Any()); @@ -97,22 +98,23 @@ public async Task SendInstant_WithString_ShouldSendEncodedMessage() { // Arrange MockWebSocket.SendAsync( - Arg.Any>(), + Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any()) - .Returns(Task.CompletedTask); + .Returns(ValueTask.CompletedTask); Adapter = new ReactiveWebSocketServer.ServerWebSocketAdapter(MockWebSocket, new Metadata(Guid.Empty, IPAddress.Any, 0)); const string testMessage = "Instant message"; // Act - await Adapter.SendInstantAsync(testMessage, TestContext.Current.CancellationToken); + await Adapter.SendInstantAsync(testMessage.AsMemory(), WebSocketMessageType.Binary, + TestContext.Current.CancellationToken); // Assert await MockWebSocket.Received(1).SendAsync( - Arg.Any>(), + Arg.Any>(), WebSocketMessageType.Binary, true, Arg.Any()); @@ -128,11 +130,11 @@ public async Task SendInstant_WithClosedSocket_ShouldNotSend() var testData = new byte[] { 1, 2, 3 }; // Act - await Adapter.SendInstantAsync(testData, TestContext.Current.CancellationToken); + await Adapter.SendInstantAsync(testData, WebSocketMessageType.Binary, TestContext.Current.CancellationToken); // Assert await MockWebSocket.DidNotReceive().SendAsync( - Arg.Any>(), + Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any()); diff --git a/src/WebSocket.Rx/Disconnected.cs b/src/WebSocket.Rx/Disconnected.cs index ff2541f..4843cf2 100644 --- a/src/WebSocket.Rx/Disconnected.cs +++ b/src/WebSocket.Rx/Disconnected.cs @@ -9,9 +9,9 @@ public record Disconnected( string? SubProtocol = null, WebSocketException? Exception = null) { + internal bool IsClosingCanceled { get; private set; } + internal bool IsReconnectionCanceled { get; private set; } + public void CancelClosing() => IsClosingCanceled = true; public void CancelReconnection() => IsReconnectionCanceled = true; - - public bool IsClosingCanceled { get; private set; } - public bool IsReconnectionCanceled { get; private set; } } \ No newline at end of file diff --git a/src/WebSocket.Rx/Extensions.cs b/src/WebSocket.Rx/Extensions.cs index 9d5ebaf..c8e7ae6 100644 --- a/src/WebSocket.Rx/Extensions.cs +++ b/src/WebSocket.Rx/Extensions.cs @@ -1,4 +1,8 @@ +using System.Buffers; +using System.Net.WebSockets; +using System.Text; using R3; +using WebSocket.Rx.Internal; namespace WebSocket.Rx; @@ -6,154 +10,138 @@ public static class Extensions { extension(IReactiveWebSocketClient client) { - public Observable SendInstant(Observable messages) + public Observable SendInstant(Observable messages) { - return Observable.Create(observer => + return messages.SelectAwait(async (send, ct) => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.SubscribeAwait(async (msg, ct) => + return send.Type switch { - var result = await client.SendInstantAsync(msg, ct).ConfigureAwait(false); - observer.OnNext(result); - })); - - return disposables; - }); + _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct), + _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct), + _ => false + }; + }, maxConcurrent: 1); } - public Observable SendInstant(Observable messages) + public Observable Send(Observable messages) { - return Observable.Create(observer => + return messages.SelectAwait(async (send, ct) => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.SubscribeAwait(async (msg, ct) => + return send.Type switch { - var result = await client.SendInstantAsync(msg, ct).ConfigureAwait(false); - observer.OnNext(result); - })); - - return disposables; - }); + _ when send.IsText => await client.SendAsync(send.Text, send.Type, ct), + _ when send.IsBinary => await client.SendAsync(send.Binary, send.Type, ct), + _ => false + }; + }, maxConcurrent: 1); } - public Observable SendAsBinary(Observable messages) + public Observable TrySend(Observable messages) { - return Observable.Create(observer => + return messages.Select(send => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.SubscribeAwait(async (msg, ct) => + return send.Type switch { - var result = await client.SendAsBinaryAsync(msg, ct).ConfigureAwait(false); - observer.OnNext(result); - })); - - return disposables; + _ when send.IsText => client.TrySend(send.Text, send.Type), + _ when send.IsBinary => client.TrySend(send.Binary, send.Type), + _ => false + }; }); } + } - public Observable SendAsBinary(Observable messages) + extension(IReactiveWebSocketServer server) + { + public Observable SendInstant(Observable messages) { - return Observable.Create(observer => + return messages.SelectAwait(async (send, ct) => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.SubscribeAwait(async (msg, ct) => + var msg = send.Message; + return msg.Type switch { - var result = await client.SendAsBinaryAsync(msg, ct).ConfigureAwait(false); - observer.OnNext(result); - })); - - return disposables; - }); + _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct), + _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct), + _ => false + }; + }, maxConcurrent: 1); } - public Observable SendAsText(Observable messages) + public Observable Send(Observable messages) { - return Observable.Create(observer => + return messages.SelectAwait(async (send, ct) => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.SubscribeAwait(async (msg, ct) => + var msg = send.Message; + return msg.Type switch { - var result = await client.SendAsTextAsync(msg, ct).ConfigureAwait(false); - observer.OnNext(result); - })); - - return disposables; - }); + _ when msg.IsText => await server.SendAsync(send.Metadata.Id, msg.Text, msg.Type, ct), + _ when msg.IsBinary => await server.SendAsync(send.Metadata.Id, msg.Binary, msg.Type, ct), + _ => false + }; + }, maxConcurrent: 1); } - public Observable SendAsText(Observable messages) + public Observable TrySend(Observable messages) { - return Observable.Create(observer => + return messages.Select(send => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.SubscribeAwait(async (msg, ct) => + var msg = send.Message; + return msg.Type switch { - var result = await client.SendAsTextAsync(msg, ct).ConfigureAwait(false); - observer.OnNext(result); - })); - - return disposables; + _ when msg.IsText => server.TrySend(send.Metadata.Id, msg.Text, msg.Type), + _ when msg.IsBinary => server.TrySend(send.Metadata.Id, msg.Binary, msg.Type), + _ => false + }; }); } - public Observable TrySendAsBinary(Observable messages) + public Observable BroadcastInstant(Observable messages) { - return Observable.Create(observer => + return messages.SelectAwait(async (send, ct) => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.Subscribe(msg => + var msg = send.Message; + return msg.Type switch { - var result = client.TrySendAsBinary(msg); - observer.OnNext(result); - })); - - return disposables; + _ when msg.IsText => await server.BroadcastInstantAsync(msg.Text, msg.Type, ct), + _ when msg.IsBinary => await server.BroadcastInstantAsync(msg.Binary, msg.Type, ct), + _ => false + }; }); } - public Observable TrySendAsBinary(Observable messages) + public Observable BroadcastAsync(Observable messages) { - return Observable.Create(observer => + return messages.SelectAwait(async (send, ct) => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.Subscribe(msg => + var msg = send.Message; + return msg.Type switch { - var result = client.TrySendAsBinary(msg); - observer.OnNext(result); - })); - - return disposables; + _ when msg.IsText => await server.BroadcastAsync(msg.Text, msg.Type, ct), + _ when msg.IsBinary => await server.BroadcastAsync(msg.Binary, msg.Type, ct), + _ => false + }; }); } - public Observable TrySendAsText(Observable messages) + public Observable TryBroadcast(Observable messages) { - return Observable.Create(observer => + return messages.Select(send => { - var disposables = new CompositeDisposable(); - disposables.Add(messages.Subscribe(msg => + var msg = send.Message; + return msg.Type switch { - var result = client.TrySendAsText(msg); - observer.OnNext(result); - })); - - return disposables; + _ when msg.IsText => server.TryBroadcast(msg.Text, msg.Type), + _ when msg.IsBinary => server.TryBroadcast(msg.Binary, msg.Type), + _ => false + }; }); } + } - public Observable TrySendAsText(Observable messages) - { - return Observable.Create(observer => - { - var disposables = new CompositeDisposable(); - disposables.Add(messages.Subscribe(msg => - { - var result = client.TrySendAsText(msg); - observer.OnNext(result); - })); - - return disposables; - }); - } + internal static Payload ToPayload(this ReadOnlyMemory value, Encoding encoding, WebSocketMessageType type) + { + var maxByteCount = encoding.GetMaxByteCount(value.Length); + var rentedBuffer = ArrayPool.Shared.Rent(maxByteCount); + var actualBytes = encoding.GetBytes(value.Span, rentedBuffer); + return new Payload(rentedBuffer, actualBytes, type); } } \ No newline at end of file diff --git a/src/WebSocket.Rx/IReactiveWebSocketClient.cs b/src/WebSocket.Rx/IReactiveWebSocketClient.cs index 1463913..4178bf8 100644 --- a/src/WebSocket.Rx/IReactiveWebSocketClient.cs +++ b/src/WebSocket.Rx/IReactiveWebSocketClient.cs @@ -7,7 +7,7 @@ namespace WebSocket.Rx; public interface IReactiveWebSocketClient : IDisposable, IAsyncDisposable { Uri Url { get; set; } - Observable MessageReceived { get; } + Observable MessageReceived { get; } Observable ConnectionHappened { get; } Observable DisconnectionHappened { get; } Observable ErrorOccurred { get; } @@ -26,33 +26,31 @@ public interface IReactiveWebSocketClient : IDisposable, IAsyncDisposable Task StartOrFailAsync(CancellationToken cancellationToken = default); - Task StopAsync(WebSocketCloseStatus status, string statusDescription, CancellationToken cancellationToken = default); + Task StopAsync(WebSocketCloseStatus status, string statusDescription, + CancellationToken cancellationToken = default); - Task StopOrFailAsync(WebSocketCloseStatus status, string statusDescription, CancellationToken cancellationToken = default); + Task StopOrFailAsync(WebSocketCloseStatus status, string statusDescription, + CancellationToken cancellationToken = default); Task ReconnectAsync(CancellationToken cancellationToken = default); Task ReconnectOrFailAsync(CancellationToken cancellationToken = default); - Task SendInstantAsync(byte[] message, CancellationToken cancellationToken = default); + Task SendInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendInstantAsync(string message, CancellationToken cancellationToken = default); + Task SendInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default); + Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendAsBinaryAsync(string message, CancellationToken cancellationToken = default); + Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendAsTextAsync(byte[] message, CancellationToken cancellationToken = default); + bool TrySend(ReadOnlyMemory message, WebSocketMessageType type); - Task SendAsTextAsync(string message, CancellationToken cancellationToken = default); + bool TrySend(ReadOnlyMemory message, WebSocketMessageType type); - bool TrySendAsBinary(string message); - - bool TrySendAsBinary(byte[] message); - - bool TrySendAsText(byte[] message); - - bool TrySendAsText(string message); - - void StreamFakeMessage(ReceivedMessage message); + void StreamFakeMessage(Message message); } \ No newline at end of file diff --git a/src/WebSocket.Rx/IReactiveWebSocketServer.cs b/src/WebSocket.Rx/IReactiveWebSocketServer.cs index a6305f3..946c8e4 100644 --- a/src/WebSocket.Rx/IReactiveWebSocketServer.cs +++ b/src/WebSocket.Rx/IReactiveWebSocketServer.cs @@ -14,70 +14,43 @@ public interface IReactiveWebSocketServer : IDisposable, IAsyncDisposable IReadOnlyDictionary ConnectedClients { get; } Observable ClientConnected { get; } Observable ClientDisconnected { get; } - Observable Messages { get; } + Observable Messages { get; } Observable ErrorOccurred { get; } Task StartAsync(CancellationToken cancellationToken = default); - Task StopAsync(WebSocketCloseStatus status, string statusDescription, CancellationToken cancellationToken = default); + Task StopAsync(WebSocketCloseStatus status, string statusDescription, + CancellationToken cancellationToken = default); - Task SendInstantAsync(Guid clientId, string message, CancellationToken cancellationToken = default); + Task SendInstantAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendInstantAsync(Guid clientId, byte[] message, CancellationToken cancellationToken = default); + Task SendInstantAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendAsBinaryAsync(Guid clientId, byte[] message, CancellationToken cancellationToken = default); + Task SendAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - Task SendAsBinaryAsync(Guid clientId, string message, CancellationToken cancellationToken = default); + Task SendAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); + + bool TrySend(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type); - Task SendAsTextAsync(Guid clientId, byte[] message, CancellationToken cancellationToken = default); + bool TrySend(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type); - Task SendAsTextAsync(Guid clientId, string message, CancellationToken cancellationToken = default); + Task BroadcastInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - bool TrySendAsBinary(Guid clientId, string message); + Task BroadcastInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - bool TrySendAsBinary(Guid clientId, byte[] data); + Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - bool TrySendAsText(Guid clientId, string message); + Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default); - bool TrySendAsText(Guid clientId, byte[] data); + bool TryBroadcast(ReadOnlyMemory message, WebSocketMessageType type); - Observable SendInstant(Guid clientId, Observable messages); - - Observable SendInstant(Guid clientId, Observable messages); - - Observable SendAsBinary(Guid clientId, Observable messages); - - Observable SendAsBinary(Guid clientId, Observable messages); - - Observable SendAsText(Guid clientId, Observable messages); - - Observable SendAsText(Guid clientId, Observable messages); - - Observable TrySendAsBinary(Guid clientId, Observable messages); - - Observable TrySendAsBinary(Guid clientId, Observable messages); - - Observable TrySendAsText(Guid clientId, Observable messages); - - Observable TrySendAsText(Guid clientId, Observable messages); - - Task BroadcastInstantAsync(byte[] message, CancellationToken cancellationToken = default); - - Task BroadcastInstantAsync(string message, CancellationToken cancellationToken = default); - - Task BroadcastAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default); - - Task BroadcastAsBinaryAsync(string message, CancellationToken cancellationToken = default); - - Task BroadcastAsTextAsync(byte[] message, CancellationToken cancellationToken = default); - - Task BroadcastAsTextAsync(string message, CancellationToken cancellationToken = default); - - bool TryBroadcastAsBinary(string message); - - bool TryBroadcastAsBinary(byte[] message); - - bool TryBroadcastAsText(byte[] message); - - bool TryBroadcastAsText(string message); + bool TryBroadcast(ReadOnlyMemory message, WebSocketMessageType type); } \ No newline at end of file diff --git a/src/WebSocket.Rx/Message.cs b/src/WebSocket.Rx/Message.cs new file mode 100644 index 0000000..b571d01 --- /dev/null +++ b/src/WebSocket.Rx/Message.cs @@ -0,0 +1,37 @@ +using System.Net.WebSockets; + +namespace WebSocket.Rx; + +public record Message +{ + private Message(ReadOnlyMemory? binary, ReadOnlyMemory? text, WebSocketMessageType type) + { + Binary = binary ?? new ReadOnlyMemory([]); + Text = text ?? new ReadOnlyMemory([]); + Type = type; + } + + public bool IsText => !Text.IsEmpty && Type is WebSocketMessageType.Text; + + public bool IsBinary => !Binary.IsEmpty && Type is WebSocketMessageType.Binary; + + public ReadOnlyMemory Text { get; } + + public ReadOnlyMemory Binary { get; } + + public WebSocketMessageType Type { get; } + + public override string ToString() + { + return Type is WebSocketMessageType.Text ? Text.ToString() : $"Type binary, length: {Binary.Length}"; + } + + public static Message Create(string message) + => Create(message.AsMemory()); + + public static Message Create(ReadOnlyMemory message) + => new(null, message, WebSocketMessageType.Text); + + public static Message Create(ReadOnlyMemory message) + => new(message, null, WebSocketMessageType.Binary); +} \ No newline at end of file diff --git a/src/WebSocket.Rx/Metadata.cs b/src/WebSocket.Rx/Metadata.cs index 5096b9a..5f3a7f2 100644 --- a/src/WebSocket.Rx/Metadata.cs +++ b/src/WebSocket.Rx/Metadata.cs @@ -2,4 +2,4 @@ namespace WebSocket.Rx; -public record Metadata(Guid Id, IPAddress Address, int Port); \ No newline at end of file +public record Metadata(Guid Id, IPAddress? Address = null, int? Port = null); \ No newline at end of file diff --git a/src/WebSocket.Rx/Payload.cs b/src/WebSocket.Rx/Payload.cs index 1c768d2..8fca904 100644 --- a/src/WebSocket.Rx/Payload.cs +++ b/src/WebSocket.Rx/Payload.cs @@ -1,5 +1,34 @@ +using System.Buffers; using System.Net.WebSockets; namespace WebSocket.Rx; -public sealed record Payload(byte[] Data, WebSocketMessageType Type); \ No newline at end of file +public readonly struct Payload : IDisposable +{ + private readonly byte[]? _rentedBuffer; + + public ReadOnlyMemory Data { get; } + public WebSocketMessageType Type { get; } + + public Payload(ReadOnlyMemory data, WebSocketMessageType messageType) + { + Data = data; + Type = messageType; + _rentedBuffer = null; + } + + public Payload(byte[] rentedBuffer, int length, WebSocketMessageType messageType) + { + Data = rentedBuffer.AsMemory(0, length); + Type = messageType; + _rentedBuffer = rentedBuffer; + } + + public void Dispose() + { + if (_rentedBuffer is not null) + { + ArrayPool.Shared.Return(_rentedBuffer); + } + } +} \ No newline at end of file diff --git a/src/WebSocket.Rx/ReactiveWebSocketClient.cs b/src/WebSocket.Rx/ReactiveWebSocketClient.cs index 5bbd2a2..40339f0 100644 --- a/src/WebSocket.Rx/ReactiveWebSocketClient.cs +++ b/src/WebSocket.Rx/ReactiveWebSocketClient.cs @@ -20,14 +20,14 @@ public class ReactiveWebSocketClient : IReactiveWebSocketClient protected bool IsReconnecting; - protected readonly Subject MessageReceivedSource = new(); + protected readonly Subject MessageReceivedSource = new(); protected readonly Subject ConnectionHappenedSource = new(); protected readonly Subject DisconnectionHappenedSource = new(); protected readonly Subject ErrorOccurredSource = new(); - protected ChannelWriter SendWriter => SendChannel.Writer; + internal ChannelWriter SendWriter => SendChannel.Writer; - protected Channel SendChannel = + internal Channel SendChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); protected Task? SendLoopTask; @@ -53,7 +53,7 @@ public ReactiveWebSocketClient(Uri url, RecyclableMemoryStreamManager? memoryStr public Encoding MessageEncoding { get; set; } = Encoding.UTF8; public ClientWebSocket NativeClient { get; private set; } = new(); - public Observable MessageReceived => MessageReceivedSource.AsObservable(); + public Observable MessageReceived => MessageReceivedSource.AsObservable(); public Observable ConnectionHappened => ConnectionHappenedSource.AsObservable(); public Observable DisconnectionHappened => DisconnectionHappenedSource.AsObservable(); public Observable ErrorOccurred => ErrorOccurredSource.AsObservable(); @@ -327,7 +327,10 @@ protected async Task SendLoopAsync(CancellationToken ct) break; } - await SendAsync(payload.Data, payload.Type, true, ct); + using (payload) + { + await SendAsync(payload.Data, payload.Type, true, ct); + } } } catch (ChannelClosedException) @@ -349,7 +352,8 @@ protected async Task SendLoopAsync(CancellationToken ct) } } - protected virtual async Task SendAsync(byte[] data, WebSocketMessageType type, bool endOfMessage, + protected virtual async Task SendAsync(ReadOnlyMemory data, WebSocketMessageType type, + bool endOfMessage, CancellationToken cancellationToken = default) { if (NativeClient.State is not WebSocketState.Open) return false; @@ -399,16 +403,15 @@ await NativeClient } var messageBytes = ms.GetBuffer().AsMemory(0, (int)ms.Length); - ReceivedMessage message; + Message message; if (IsTextMessageConversionEnabled && result.MessageType == WebSocketMessageType.Text) { - var text = MessageEncoding.GetString(messageBytes.Span); - message = ReceivedMessage.TextMessage(text); + message = Message.Create(MessageEncoding.GetString(messageBytes.Span).AsMemory()); } else { - message = ReceivedMessage.BinaryMessage(messageBytes.ToArray()); + message = Message.Create(messageBytes); } MessageReceivedSource.OnNext(message); @@ -461,20 +464,22 @@ await NativeClient #region Send Methods - public async Task SendInstantAsync(string message, CancellationToken cancellationToken = default) + public async Task SendInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) { - if (string.IsNullOrEmpty(message)) + if (message.IsEmpty) { return false; } + var maxByteCount = MessageEncoding.GetMaxByteCount(message.Length); + var rent = ArrayPool.Shared.Rent(maxByteCount); + try { - using var connectedCts = - CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, - MainCts?.Token ?? CancellationToken.None); - return await SendAsync(MessageEncoding.GetBytes(message), WebSocketMessageType.Binary, true, - cancellationToken) + var actualBytes = MessageEncoding.GetBytes(message.Span, rent); + + return await SendAsync(rent.AsMemory(0, actualBytes), type, true, cancellationToken) .ConfigureAwait(false); } catch (Exception ex) @@ -482,22 +487,23 @@ public async Task SendInstantAsync(string message, CancellationToken cance ErrorOccurredSource.OnNext(new ErrorOccurred(ErrorSource.Send, ex)); return false; } + finally + { + ArrayPool.Shared.Return(rent); + } } - public async Task SendInstantAsync(byte[] message, CancellationToken cancellationToken = default) + public async Task SendInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) { - if (message.Length == 0) + if (message.IsEmpty) { return false; } try { - using var connectedCts = - CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, - MainCts?.Token ?? CancellationToken.None); - return await SendAsync(message, WebSocketMessageType.Binary, true, connectedCts.Token) - .ConfigureAwait(false); + return await SendAsync(message, type, true, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -506,79 +512,33 @@ public async Task SendInstantAsync(byte[] message, CancellationToken cance } } - public async Task SendAsBinaryAsync(string message, CancellationToken cancellationToken = default) - { - if (string.IsNullOrEmpty(message)) - { - return false; - } + public async Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) + => !message.IsEmpty && await WriteAsync(message.ToPayload(MessageEncoding, type), cancellationToken); - return await SendAsBinaryAsync(MessageEncoding.GetBytes(message), cancellationToken); - } + public async Task SendAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) + => !message.IsEmpty && await WriteAsync(new Payload(message, type), cancellationToken); - public async Task SendAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default) - { - if (!IsRunning || message.Length == 0) - { - return false; - } + public bool TrySend(ReadOnlyMemory message, WebSocketMessageType type) + => !message.IsEmpty && TryWrite(message.ToPayload(MessageEncoding, type)); - await SendWriter.WriteAsync(new Payload(message, WebSocketMessageType.Binary), cancellationToken); - return true; - } + public bool TrySend(ReadOnlyMemory message, WebSocketMessageType type) + => !message.IsEmpty && TryWrite(new Payload(message, type)); - public async Task SendAsTextAsync(string message, CancellationToken cancellationToken = default) + private async Task WriteAsync(Payload payload, CancellationToken cancellationToken = default) { - if (string.IsNullOrEmpty(message)) - { - return false; - } - - return await SendAsTextAsync(MessageEncoding.GetBytes(message), cancellationToken); - } - - public async Task SendAsTextAsync(byte[] message, CancellationToken cancellationToken = default) - { - if (!IsRunning || message.Length == 0) - { - return false; - } - - await SendWriter.WriteAsync(new Payload(message, WebSocketMessageType.Text), cancellationToken); + if (!IsRunning) return false; + await SendWriter.WriteAsync(payload, cancellationToken); return true; } - public bool TrySendAsBinary(string message) - { - return !string.IsNullOrEmpty(message) && TrySendAsBinary(MessageEncoding.GetBytes(message)); - } - - public bool TrySendAsBinary(byte[] message) + private bool TryWrite(Payload payload) { - if (!IsRunning || message.Length == 0) - { - return false; - } - - return SendWriter.TryWrite(new Payload(message, WebSocketMessageType.Binary)); - } - - public bool TrySendAsText(string message) - { - return !string.IsNullOrEmpty(message) && TrySendAsText(MessageEncoding.GetBytes(message)); - } - - public bool TrySendAsText(byte[] message) - { - if (!IsRunning || message.Length == 0) - { - return false; - } - - return SendWriter.TryWrite(new Payload(message, WebSocketMessageType.Text)); + return IsRunning && SendWriter.TryWrite(payload); } - public void StreamFakeMessage(ReceivedMessage message) + public void StreamFakeMessage(Message message) { MessageReceivedSource.OnNext(message); } diff --git a/src/WebSocket.Rx/ReactiveWebSocketServer.cs b/src/WebSocket.Rx/ReactiveWebSocketServer.cs index d95164a..0d2cac9 100644 --- a/src/WebSocket.Rx/ReactiveWebSocketServer.cs +++ b/src/WebSocket.Rx/ReactiveWebSocketServer.cs @@ -31,7 +31,7 @@ public async ValueTask DisposeAsync() private readonly Subject _clientConnectedSource = new(); private readonly Subject _clientDisconnectedSource = new(); - private readonly Subject _messageReceivedSource = new(); + private readonly Subject _messageReceivedSource = new(); private readonly Subject _errorOccurredSource = new(); private CancellationTokenSource? _mainCts; @@ -58,7 +58,7 @@ public IReadOnlyDictionary ConnectedClients public Observable ClientConnected => _clientConnectedSource.AsObservable(); public Observable ClientDisconnected => _clientDisconnectedSource.AsObservable(); - public Observable Messages => _messageReceivedSource.AsObservable(); + public Observable Messages => _messageReceivedSource.AsObservable(); public Observable ErrorOccurred => _errorOccurredSource.AsObservable(); #endregion @@ -203,7 +203,7 @@ private async Task HandleWebSocketAsync(HttpListenerContext context, Metadata me var disposables = new CompositeDisposable { socket.MessageReceived - .Select(x => new ServerReceivedMessage(metadata, x)) + .Select(x => new ServerMessage(metadata, x)) .Subscribe(msg => { lock (_messageReceivedSource) @@ -254,214 +254,96 @@ private async Task HandleWebSocketAsync(HttpListenerContext context, Metadata me #region Send Client Methods - public async Task SendInstantAsync(Guid clientId, string message, + public async Task SendInstantAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendInstantAsync(message, cancellationToken); + await client.Socket.SendInstantAsync(message, type, cancellationToken); return true; } - public async Task SendInstantAsync(Guid clientId, byte[] message, + public async Task SendInstantAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendInstantAsync(message, cancellationToken); + await client.Socket.SendInstantAsync(message, type, cancellationToken); return true; } - public async Task SendAsBinaryAsync(Guid clientId, byte[] message, + public async Task SendAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendAsBinaryAsync(message, cancellationToken); + await client.Socket.SendAsync(message, type, cancellationToken); return true; } - public async Task SendAsBinaryAsync(Guid clientId, string message, + public async Task SendAsync(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type, CancellationToken cancellationToken = default) { if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendAsBinaryAsync(message, cancellationToken); + await client.Socket.SendAsync(message, type, cancellationToken); return true; } - public async Task SendAsTextAsync(Guid clientId, byte[] message, - CancellationToken cancellationToken = default) - { - if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendAsTextAsync(message, cancellationToken); - return true; - } - - public async Task SendAsTextAsync(Guid clientId, string message, - CancellationToken cancellationToken = default) - { - if (!_clients.TryGetValue(clientId, out var client)) return false; - await client.Socket.SendAsTextAsync(message, cancellationToken); - return true; - } - - public bool TrySendAsBinary(Guid clientId, string message) - { - if (!_clients.TryGetValue(clientId, out var client)) return false; - client.Socket.TrySendAsBinary(message); - return true; - } - - public bool TrySendAsBinary(Guid clientId, byte[] data) - { - if (!_clients.TryGetValue(clientId, out var client)) return false; - client.Socket.TrySendAsBinary(data); - return true; - } - - public bool TrySendAsText(Guid clientId, string message) + public bool TrySend(Guid clientId, ReadOnlyMemory message, WebSocketMessageType type) { if (!_clients.TryGetValue(clientId, out var client)) return false; - client.Socket.TrySendAsText(message); + client.Socket.TrySend(message, type); return true; } - public bool TrySendAsText(Guid clientId, byte[] data) + public bool TrySend(Guid clientId, ReadOnlyMemory data, WebSocketMessageType type) { if (!_clients.TryGetValue(clientId, out var client)) return false; - client.Socket.TrySendAsText(data); + client.Socket.TrySend(data, type); return true; } - public Observable SendInstant(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.SendInstant(messages); - } - - public Observable SendInstant(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.SendInstant(messages); - } - - public Observable SendAsBinary(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.SendAsBinary(messages); - } - - public Observable SendAsBinary(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.SendAsBinary(messages); - } - - public Observable SendAsText(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.SendAsText(messages); - } - - public Observable SendAsText(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.SendAsText(messages); - } - - public Observable TrySendAsBinary(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.TrySendAsBinary(messages); - } - - public Observable TrySendAsBinary(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.TrySendAsBinary(messages); - } - - public Observable TrySendAsText(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.TrySendAsText(messages); - } - - public Observable TrySendAsText(Guid clientId, Observable messages) - { - return !_clients.TryGetValue(clientId, out var client) - ? Observable.Empty() - : client.Socket.TrySendAsText(messages); - } - #endregion #region Broadcast Methods - public async Task BroadcastInstantAsync(byte[] message, CancellationToken cancellationToken = default) - { - var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendInstantAsync(message, ct), x => x, cancellationToken); - } - - public async Task BroadcastInstantAsync(string message, CancellationToken cancellationToken = default) - { - var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendInstantAsync(message, ct), x => x, cancellationToken); - } - - public async Task BroadcastAsBinaryAsync(byte[] message, CancellationToken cancellationToken = default) - { - var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendAsBinaryAsync(message, ct), x => x, cancellationToken); - } - - public async Task BroadcastAsBinaryAsync(string message, CancellationToken cancellationToken = default) - { - var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendAsBinaryAsync(message, ct), x => x, cancellationToken); - } - - public async Task BroadcastAsTextAsync(byte[] message, CancellationToken cancellationToken = default) + public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendAsTextAsync(message, ct), x => x, cancellationToken); + return await sockets.Async((client, ct) => client.SendInstantAsync(message, type, ct), x => x, + cancellationToken); } - public async Task BroadcastAsTextAsync(string message, CancellationToken cancellationToken = default) + public async Task BroadcastInstantAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return await sockets.Async((client, ct) => client.SendAsTextAsync(message, ct), x => x, cancellationToken); + return await sockets.Async((client, ct) => client.SendInstantAsync(message, type, ct), x => x, + cancellationToken); } - public bool TryBroadcastAsBinary(string message) + public async Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return sockets.Select(x => x.TrySendAsBinary(message)).All(x => x); + return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken); } - public bool TryBroadcastAsBinary(byte[] message) + public async Task BroadcastAsync(ReadOnlyMemory message, WebSocketMessageType type, + CancellationToken cancellationToken = default) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return sockets.Select(x => x.TrySendAsBinary(message)).All(x => x); + return await sockets.Async((client, ct) => client.SendAsync(message, type, ct), x => x, cancellationToken); } - public bool TryBroadcastAsText(byte[] message) + public bool TryBroadcast(ReadOnlyMemory message, WebSocketMessageType type) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return sockets.Select(x => x.TrySendAsText(message)).All(x => x); + return sockets.Select(x => x.TrySend(message, type)).All(x => x); } - public bool TryBroadcastAsText(string message) + public bool TryBroadcast(ReadOnlyMemory message, WebSocketMessageType type) { var sockets = _clients.Values.Select(x => x.Socket).ToArray(); - return sockets.Select(x => x.TrySendAsText(message)).All(x => x); + return sockets.Select(x => x.TrySend(message, type)).All(x => x); } #endregion @@ -616,7 +498,8 @@ public void Start() ReceiveLoopTask = Task.Run(() => ReceiveLoopAdapterAsync(_adapterCts.Token), CancellationToken.None); } - protected override async Task SendAsync(byte[] data, WebSocketMessageType type, bool endOfMessage, + protected override async Task SendAsync(ReadOnlyMemory data, WebSocketMessageType type, + bool endOfMessage, CancellationToken cancellationToken = default) { if (NativeServerSocket.State is not WebSocketState.Open) return false; @@ -661,8 +544,8 @@ await NativeServerSocket var messageBytes = ms.GetBuffer().AsMemory(0, (int)ms.Length); var message = IsTextMessageConversionEnabled && result.MessageType == WebSocketMessageType.Text - ? ReceivedMessage.TextMessage(MessageEncoding.GetString(messageBytes.Span)) - : ReceivedMessage.BinaryMessage(messageBytes.ToArray()); + ? Message.Create(MessageEncoding.GetString(messageBytes.Span).AsMemory()) + : Message.Create(messageBytes); MessageReceivedSource.OnNext(message); } diff --git a/src/WebSocket.Rx/ReceivedMessage.cs b/src/WebSocket.Rx/ReceivedMessage.cs deleted file mode 100644 index fc06fce..0000000 --- a/src/WebSocket.Rx/ReceivedMessage.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System.Net.WebSockets; - -namespace WebSocket.Rx; - -public class ReceivedMessage -{ - private ReceivedMessage(byte[]? binary, string? text, WebSocketMessageType messageType) - { - Binary = binary; - Text = text; - MessageType = messageType; - } - - public bool IsBinary => Binary?.Length > 0 && MessageType is WebSocketMessageType.Binary; - - public bool IsText => Text?.Length > 0 && MessageType is WebSocketMessageType.Text; - - public string? Text { get; } - - public byte[]? Binary { get; } - - public WebSocketMessageType MessageType { get; } - - public override string ToString() - { - if (MessageType == WebSocketMessageType.Text) - { - return Text ?? string.Empty; - } - - return $"Type binary, length: {Binary?.Length}"; - } - - public static ReceivedMessage TextMessage(string? data) - => new(null, data, WebSocketMessageType.Text); - - public static ReceivedMessage BinaryMessage(byte[]? data) - => new(data, null, WebSocketMessageType.Binary); -} \ No newline at end of file diff --git a/src/WebSocket.Rx/ServerMessage.cs b/src/WebSocket.Rx/ServerMessage.cs new file mode 100644 index 0000000..a390e92 --- /dev/null +++ b/src/WebSocket.Rx/ServerMessage.cs @@ -0,0 +1,3 @@ +namespace WebSocket.Rx; + +public record ServerMessage(Metadata Metadata, Message Message); \ No newline at end of file diff --git a/src/WebSocket.Rx/ServerReceivedMessage.cs b/src/WebSocket.Rx/ServerReceivedMessage.cs deleted file mode 100644 index cb2b90c..0000000 --- a/src/WebSocket.Rx/ServerReceivedMessage.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace WebSocket.Rx; - -public record ServerReceivedMessage(Metadata Metadata, ReceivedMessage Message); \ No newline at end of file