Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose CTS when the Stream is claimed or timeout occurs #59652

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions src/Components/Server/src/Circuits/RemoteJSRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal partial class RemoteJSRuntime : JSRuntime
private readonly CircuitOptions _options;
private readonly ILogger<RemoteJSRuntime> _logger;
private CircuitClientProxy _clientProxy;
private readonly ConcurrentDictionary<long, DotNetStreamReference> _pendingDotNetToJSStreams = new();
private readonly ConcurrentDictionary<long, CancelableDotNetStreamReference> _pendingDotNetToJSStreams = new();
private bool _permanentlyDisconnected;
private readonly long _maximumIncomingBytes;
private int _byteArraysToBeRevivedTotalBytes;
Expand Down Expand Up @@ -152,21 +152,27 @@ protected override void ReceiveByteArray(int id, byte[] data)

protected override async Task TransmitStreamAsync(long streamId, DotNetStreamReference dotNetStreamReference)
{
if (!_pendingDotNetToJSStreams.TryAdd(streamId, dotNetStreamReference))
var cancelableStreamReference = new CancelableDotNetStreamReference(dotNetStreamReference);
if (!_pendingDotNetToJSStreams.TryAdd(streamId, cancelableStreamReference))
{
throw new ArgumentException($"The stream {streamId} is already pending.");
}

// SignalR only supports streaming being initiated from the JS side, so we have to ask it to
// start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up
// and discard it.
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;
cancellationToken.Register(() =>
CancellationTokenSource cancellationTokenSource = new(TimeSpan.FromSeconds(10));

// Store CTS to dispose later.
cancelableStreamReference.CancellationTokenSource = cancellationTokenSource;

cancellationTokenSource.Token.Register(() =>
{
// If by now the stream hasn't been claimed for sending, stop tracking it
if (_pendingDotNetToJSStreams.TryRemove(streamId, out var timedOutStream) && !timedOutStream.LeaveOpen)
if (_pendingDotNetToJSStreams.TryRemove(streamId, out var timedOutCancelableStreamReference))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the existing code appears to prefer to check LeaveOpen and dispose the stream directly rather than just calling Dispose (which does the same check). I'm not familiar with the history but is there a guiding style principle here @javiercn

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there was any guiding principle here, I think this is a matter of whomever wrote the code not taking into account that DotnetStreamRerefence already did this

{
timedOutStream.Stream.Dispose();
timedOutCancelableStreamReference.StreamReference.Dispose();
timedOutCancelableStreamReference.CancellationTokenSource?.Dispose();
}
});

Expand All @@ -175,8 +181,13 @@ protected override async Task TransmitStreamAsync(long streamId, DotNetStreamRef

public bool TryClaimPendingStreamForSending(long streamId, out DotNetStreamReference pendingStream)
{
if (_pendingDotNetToJSStreams.TryRemove(streamId, out pendingStream))
if (_pendingDotNetToJSStreams.TryRemove(streamId, out var cancelableStreamReference))
{
pendingStream = cancelableStreamReference.StreamReference;

// Dispose CTS for claimed Stream.
cancelableStreamReference.CancellationTokenSource?.Dispose();

return true;
}

Expand All @@ -193,6 +204,18 @@ public void MarkPermanentlyDisconnected()
protected override async Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, CancellationToken cancellationToken = default)
=> await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, cancellationToken);

private class CancelableDotNetStreamReference
{
public CancelableDotNetStreamReference(DotNetStreamReference streamReference)
{
StreamReference = streamReference;
}

public CancellationTokenSource? CancellationTokenSource { get; set; }

public DotNetStreamReference StreamReference { get; }
}

public static partial class Log
{
[LoggerMessage(1, LogLevel.Debug, "Begin invoke JS interop '{AsyncHandle}': '{FunctionIdentifier}'", EventName = "BeginInvokeJS")]
Expand Down
Loading