Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 213 additions & 114 deletions .slopwatch/baseline.json

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

**A powerful .NET library for reactive WebSocket communication using R3 (Reactive Extensions)**

[![NuGet](https://img.shields.io/nuget/v/WebSocket.Rx.svg)](https://www.nuget.org/packages/WebSocket.Rx/)
[![Build Status](https://img.shields.io/github/actions/workflow/status/st0o0/WebSocket.Rx/build-and-release.yml?branch=main)](https://github.com/st0o0/WebSocket.Rx/actions)
[![License](https://img.shields.io/github/license/st0o0/WebSocket.Rx)](LICENSE)
[![Downloads](https://img.shields.io/nuget/dt/WebSocket.Rx.svg)](https://www.nuget.org/packages/WebSocket.Rx/)

[![NuGet](https://img.shields.io/nuget/v/WebSocket.Rx.svg?style=flat-square)](https://www.nuget.org/packages/WebSocket.Rx/)
[![License](https://img.shields.io/github/license/st0o0/WebSocket.Rx?style=flat-square)](LICENSE)
[![Downloads](https://img.shields.io/nuget/dt/WebSocket.Rx.svg?style=flat-square)](https://www.nuget.org/packages/WebSocket.Rx/)
[![Dotnet](https://img.shields.io/badge/dotnet-10.0-5027d5?style=flat-square)](https://dotnet.microsoft.com)
</div>

---
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using WebSocket.Rx.Tests.Internal;
using WebSocket.Rx.IntegrationTests.Internal;

namespace WebSocket.Rx.Tests;
namespace WebSocket.Rx.IntegrationTests;

public class IntegrationDisposeTests(ITestOutputHelper output) : ReactiveWebSocketServerTestBase(output)
{
Expand All @@ -9,8 +9,9 @@ public async Task Integration_ServerAndClient_BothDispose_ShouldCleanupProperly(
{
// Arrange
var client = new ReactiveWebSocketClient(new Uri(WebSocketUrl));
var connectionTask = WaitForEventAsync(Server.ClientConnected);
await client.StartAsync(TestContext.Current.CancellationToken);
await Task.Delay(50, TestContext.Current.CancellationToken);
await connectionTask;

// Act
await client.DisposeAsync();
Expand All @@ -29,11 +30,12 @@ public async Task Integration_MultipleClientsAndServer_AllDispose_ShouldCleanupP
for (var i = 0; i < 5; i++)
{
var client = new ReactiveWebSocketClient(new Uri(WebSocketUrl));
var connectionTask = WaitForEventAsync(Server.ClientConnected);
await client.StartAsync(TestContext.Current.CancellationToken);
await connectionTask;
clients.Add(client);
}

await Task.Delay(50, TestContext.Current.CancellationToken);
Assert.Equal(5, Server.ClientCount);

// Act
Expand All @@ -59,8 +61,9 @@ public async Task Integration_DisposeUnderLoad_ShouldHandleGracefully()
{
// Arrange
var client = new ReactiveWebSocketClient(new Uri(WebSocketUrl));
var connectionTask = WaitForEventAsync(Server.ClientConnected);
await client.StartAsync(TestContext.Current.CancellationToken);
await Task.Delay(50, TestContext.Current.CancellationToken);
await connectionTask;

var sendTask = Task.Run(async () =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace WebSocket.Rx.Tests.Internal;
namespace WebSocket.Rx.IntegrationTests.Internal;

public abstract class ReactiveWebSocketClientTestBase(ITestOutputHelper output) : TestBase(output), IAsyncLifetime
{
Expand All @@ -19,5 +19,6 @@ public async ValueTask DisposeAsync()
}

await Server.DisposeAsync();
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Net.WebSockets;
using System.Text;

namespace WebSocket.Rx.Tests.Internal;
namespace WebSocket.Rx.IntegrationTests.Internal;

public abstract class ReactiveWebSocketServerTestBase(ITestOutputHelper output) : TestBase(output), IAsyncLifetime
{
Expand Down Expand Up @@ -37,6 +37,7 @@ public async ValueTask InitializeAsync()
public async ValueTask DisposeAsync()
{
await Server.DisposeAsync();
await Task.Delay(TimeSpan.FromMilliseconds(100));
}

protected static int GetAvailablePort()
Expand All @@ -56,8 +57,15 @@ protected async Task<ClientWebSocket> ConnectClientAsync(CancellationToken ct =
protected static async Task<string> ReceiveTextAsync(ClientWebSocket client, CancellationToken ct = default)
{
var buffer = new byte[1024 * 64];
var result = await client.ReceiveAsync(buffer, ct);
return Encoding.UTF8.GetString(buffer, 0, result.Count);
using var ms = new MemoryStream();
WebSocketReceiveResult result;
do
{
result = await client.ReceiveAsync(buffer, ct);
ms.Write(buffer, 0, result.Count);
} while (!result.EndOfMessage);

return Encoding.UTF8.GetString(ms.ToArray());
}

protected static async Task SendTextAsync(ClientWebSocket client, string message, CancellationToken ct = default)
Expand Down
140 changes: 140 additions & 0 deletions src/WebSocket.Rx.IntegrationTests/Internal/TestBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
using R3;

namespace WebSocket.Rx.IntegrationTests.Internal;

public abstract class TestBase(ITestOutputHelper output)
{
protected readonly ITestOutputHelper Output = output;
protected const int DefaultTimeoutMs = 30000;

protected async Task<T> WaitForEventAsync<T>(
Observable<T> observable,
Func<T, bool>? predicate = null,
int? timeoutMs = null)
{
var timeout = timeoutMs ?? DefaultTimeoutMs;
var tcs = new TaskCompletionSource<T>();
using var cts = new CancellationTokenSource(timeout);
await using var registration = cts.Token.Register(() =>
{
var msg = $"Event {typeof(T).Name} not received within {timeout}ms";
try
{
Output.WriteLine($"[TIMEOUT] {msg}");
}
catch
{
// Ignored if test is already finished
}

tcs.TrySetException(new TimeoutException(msg));
});

using var subscription = observable.Subscribe(value =>
{
if (predicate == null || predicate(value))
{
tcs.TrySetResult(value);
}
});

return await tcs.Task;
}

protected async Task WaitUntilAsync<T>(
Observable<T> observable,
Func<bool> condition,
int? timeoutMs = null)
{
var timeout = timeoutMs ?? DefaultTimeoutMs;
var tcs = new TaskCompletionSource<bool>();
using var cts = new CancellationTokenSource(timeout);
await using var registration = cts.Token.Register(() =>
{
var msg = $"Condition not met within {timeout}ms";
try
{
Output.WriteLine($"[TIMEOUT] {msg}");
}
catch
{
// noop
}

tcs.TrySetException(new TimeoutException(msg));
});

if (condition()) return;

// Fallback polling for robustness
using var intervalSubscription = Observable.Interval(TimeSpan.FromMilliseconds(50)).Subscribe(_ =>
{
try
{
if (condition()) tcs.TrySetResult(true);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});

IDisposable? eventSubscription = null;
try
{
eventSubscription = observable.Subscribe(_ =>
{
try
{
if (condition()) tcs.TrySetResult(true);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
}
catch (ObjectDisposedException)
{
// noop
}

using (eventSubscription)
{
if (condition())
{
tcs.TrySetResult(true);
return;
}

await tcs.Task;
}
}

protected async Task WaitForConditionAsync(
Func<bool> condition,
TimeSpan? timeout = null,
string? errorMessage = null)
{
timeout ??= TimeSpan.FromMilliseconds(DefaultTimeoutMs);
var endTime = DateTime.UtcNow.Add(timeout.Value);
var count = 0;

while (!condition() && DateTime.UtcNow < endTime)
{
await Task.Delay(10);
count++;
if (count % 100 == 0) // Log every 1 second
{
Output.WriteLine($"Still waiting for condition... ({count * 10}ms elapsed)");
}
}

if (!condition())
{
var msg = errorMessage ?? $"Condition was not met within {timeout.Value.TotalSeconds}s";
Output.WriteLine($"[TIMEOUT] {msg}");
throw new TimeoutException(msg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Net.WebSockets;
using System.Text;

namespace WebSocket.Rx.Tests.Internal;
namespace WebSocket.Rx.IntegrationTests.Internal;

public class WebSocketTestServer(int? port = null) : IAsyncDisposable
{
Expand All @@ -16,6 +16,7 @@ public class WebSocketTestServer(int? port = null) : IAsyncDisposable
public int Port { get; private set; } = port ?? 0;
public string Url => $"http://127.0.0.1:{Port}/";
public string WebSocketUrl => $"ws://127.0.0.1:{Port}/";
public int ClientCount => _clients.Count(c => c.State == WebSocketState.Open);
public event Action<string>? OnMessageReceived;
public event Action<byte[]>? OnBytesReceived;

Expand Down Expand Up @@ -145,9 +146,9 @@ await webSocket.CloseAsync(
}
}
}
catch (Exception ex)
catch (Exception)
{
_ = ex;
// noop
}
}

Expand All @@ -167,9 +168,9 @@ await client.SendAsync(
CancellationToken.None
);
}
catch (Exception ex)
catch (Exception)
{
_ = ex;
// noop
}
}
}
Expand All @@ -188,9 +189,9 @@ await client.SendAsync(
CancellationToken.None
);
}
catch (Exception ex)
catch (Exception)
{
_ = ex;
// noop
}
}
}
Expand All @@ -207,9 +208,9 @@ public async Task DisconnectAllAsync()
client.Abort();
}
}
catch (Exception ex)
catch (Exception)
{
_ = ex;
// noop
}
});

Expand All @@ -221,9 +222,9 @@ public async Task DisconnectAllAsync()
{
client.Dispose();
}
catch (Exception ex)
catch (Exception)
{
_ = ex;
// noop
}
}

Expand All @@ -242,10 +243,40 @@ private static int GetAvailablePort()
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_httpListener.Stop();
_httpListener.Close();

try
{
await DisconnectAllAsync();
}
catch
{
// noop
}

try
{
_httpListener.Stop();
_httpListener.Close();
}
catch
{
// noop
}

_cts.Dispose();
await (_serverTask?.WaitAsync(TimeSpan.FromSeconds(2)) ?? Task.CompletedTask);

if (_serverTask != null)
{
try
{
await _serverTask.WaitAsync(TimeSpan.FromSeconds(5));
}
catch
{
// noop
}
}

GC.SuppressFinalize(this);
}
}
Loading
Loading