Skip to content

Commit

Permalink
Revert backward breaking change from v2.9
Browse files Browse the repository at this point in the history
The new feature added in #536 that communicates channel failures to the remote party also caused channels to report failure if their owner `MultiplexingStream` was disposed of before the channel was. This broke several tests in the vs-servicehub repo and could theoretically break shipping code as well.
In this change I hide this particular behavioral change behind a setting that requires opt-in.
  • Loading branch information
AArnott committed Jun 20, 2023
1 parent 48cd9f0 commit 0ee2425
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 26 deletions.
20 changes: 20 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public class Options
/// </summary>
private bool startSuspended;

/// <summary>
/// Backing field for the <see cref="FaultOpenChannelsOnStreamDisposal"/> property.
/// </summary>
private bool faultOpenChannelsOnStreamDisposal;

/// <summary>
/// Initializes a new instance of the <see cref="Options"/> class.
/// </summary>
Expand All @@ -83,6 +88,7 @@ public Options(Options copyFrom)
this.defaultChannelTraceSourceFactory = copyFrom.defaultChannelTraceSourceFactory;
this.defaultChannelTraceSourceFactoryWithQualifier = copyFrom.defaultChannelTraceSourceFactoryWithQualifier;
this.startSuspended = copyFrom.startSuspended;
this.faultOpenChannelsOnStreamDisposal = copyFrom.faultOpenChannelsOnStreamDisposal;
this.SeededChannels = copyFrom.SeededChannels.ToList();
}

Expand Down Expand Up @@ -226,6 +232,20 @@ public bool StartSuspended
/// </remarks>
public IList<ChannelOptions> SeededChannels { get; private set; }

/// <summary>
/// Gets or sets a value indicating whether any open channels should be faulted (i.e. their <see cref="Channel.Completion"/> task will be faulted)
/// when the <see cref="MultiplexingStream"/> is disposed.
/// </summary>
public bool FaultOpenChannelsOnStreamDisposal
{
get => this.faultOpenChannelsOnStreamDisposal;
set
{
this.ThrowIfFrozen();
this.faultOpenChannelsOnStreamDisposal = value;
}
}

/// <summary>
/// Gets a value indicating whether this instance is frozen.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/Nerdbank.Streams/MultiplexingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public partial class MultiplexingStream : IDisposableObservable, System.IAsyncDi
/// </summary>
private readonly int protocolMajorVersion;

/// <summary>
/// A value indicating whether any open channels should be faulted (i.e. their <see cref="Channel.Completion"/> task will be faulted)
/// when the <see cref="MultiplexingStream"/> is disposed.
/// </summary>
private readonly bool faultOpenChannelsOnStreamDisposal;

/// <summary>
/// The last number assigned to a channel.
/// Each use of this should increment by two, if <see cref="isOdd"/> has a value.
Expand Down Expand Up @@ -131,6 +137,7 @@ private MultiplexingStream(Formatter formatter, bool? isOdd, Options options)
}

this.TraceSource = options.TraceSource;
this.faultOpenChannelsOnStreamDisposal = options.FaultOpenChannelsOnStreamDisposal;

this.DefaultChannelTraceSourceFactory =
options.DefaultChannelTraceSourceFactoryWithQualifier
Expand Down Expand Up @@ -689,7 +696,7 @@ public async ValueTask DisposeAsync()
{
foreach (KeyValuePair<QualifiedChannelId, Channel> entry in this.openChannels)
{
entry.Value.Dispose(new ObjectDisposedException(nameof(MultiplexingStream)));
entry.Value.Dispose(this.faultOpenChannelsOnStreamDisposal ? new ObjectDisposedException(nameof(MultiplexingStream)) : null);
}

foreach (KeyValuePair<string, Queue<TaskCompletionSource<Channel>>> entry in this.acceptingChannels)
Expand Down
2 changes: 2 additions & 0 deletions src/Nerdbank.Streams/netstandard2.0/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Nerdbank.Streams.BufferWriterExtensions
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.get -> bool
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.set -> void
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
Expand Down
2 changes: 2 additions & 0 deletions src/Nerdbank.Streams/netstandard2.1/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Nerdbank.Streams.BufferWriterExtensions
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.get -> bool
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.set -> void
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
Expand Down
64 changes: 39 additions & 25 deletions test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,7 @@ public MultiplexingStreamTests(ITestOutputHelper logger)

public async Task InitializeAsync()
{
var mx1TraceSource = new TraceSource(nameof(this.mx1), SourceLevels.All);
var mx2TraceSource = new TraceSource(nameof(this.mx2), SourceLevels.All);

mx1TraceSource.Listeners.Add(new XunitTraceListener(this.Logger, this.TestId, this.TestTimer));
mx2TraceSource.Listeners.Add(new XunitTraceListener(this.Logger, this.TestId, this.TestTimer));

Func<string, MultiplexingStream.QualifiedChannelId, string, TraceSource> traceSourceFactory = (string mxInstanceName, MultiplexingStream.QualifiedChannelId id, string name) =>
{
var traceSource = new TraceSource(mxInstanceName + " channel " + id, SourceLevels.All);
traceSource.Listeners.Clear(); // remove DefaultTraceListener
traceSource.Listeners.Add(new XunitTraceListener(this.Logger, this.TestId, this.TestTimer));
return traceSource;
};

Func<MultiplexingStream.QualifiedChannelId, string, TraceSource> mx1TraceSourceFactory = (MultiplexingStream.QualifiedChannelId id, string name) => traceSourceFactory(nameof(this.mx1), id, name);
Func<MultiplexingStream.QualifiedChannelId, string, TraceSource> mx2TraceSourceFactory = (MultiplexingStream.QualifiedChannelId id, string name) => traceSourceFactory(nameof(this.mx2), id, name);

(this.transport1, this.transport2) = FullDuplexStream.CreatePair(new PipeOptions(pauseWriterThreshold: 2 * 1024 * 1024));
Task<MultiplexingStream>? mx1 = MultiplexingStream.CreateAsync(this.transport1, new MultiplexingStream.Options { ProtocolMajorVersion = this.ProtocolMajorVersion, TraceSource = mx1TraceSource, DefaultChannelTraceSourceFactoryWithQualifier = mx1TraceSourceFactory }, this.TimeoutToken);
Task<MultiplexingStream>? mx2 = MultiplexingStream.CreateAsync(this.transport2, new MultiplexingStream.Options { ProtocolMajorVersion = this.ProtocolMajorVersion, TraceSource = mx2TraceSource, DefaultChannelTraceSourceFactoryWithQualifier = mx2TraceSourceFactory }, this.TimeoutToken);
this.mx1 = await mx1;
this.mx2 = await mx2;
await this.ReinitializeMxStreamsAsync(new MultiplexingStream.Options());
}

public async Task DisposeAsync()
Expand Down Expand Up @@ -302,13 +281,14 @@ public async Task Disposal_DisposesTransportStream()
Assert.Throws<ObjectDisposedException>(() => this.transport1.Position);
}

[Fact]
public async Task Dispose_DisposesChannels()
[Theory, PairwiseData]
public async Task Dispose_DisposesChannels(bool channelFaulted)
{
await this.ReinitializeMxStreamsAsync(new MultiplexingStream.Options() { FaultOpenChannelsOnStreamDisposal = channelFaulted });
(MultiplexingStream.Channel channel1, MultiplexingStream.Channel channel2) = await this.EstablishChannelsAsync("A");
await this.mx1.DisposeAsync();
Assert.True(channel1.IsDisposed);
await VerifyChannelCompleted(channel1, new ObjectDisposedException(nameof(MultiplexingStream)).Message);
await VerifyChannelCompleted(channel1, channelFaulted ? new ObjectDisposedException(nameof(MultiplexingStream)).Message : null);

#pragma warning disable CS0618 // Type or member is obsolete
await channel1.Input.WaitForWriterCompletionAsync().WithCancellation(this.TimeoutToken);
Expand Down Expand Up @@ -1322,6 +1302,40 @@ protected async Task WaitForEphemeralChannelOfferToPropagateAsync()
return (channel1.AsStream(), channel2.AsStream());
}

private async Task ReinitializeMxStreamsAsync(MultiplexingStream.Options optionsTemplate)
{
await (this.mx1?.DisposeAsync() ?? default);
await (this.mx2?.DisposeAsync() ?? default);

var mx1TraceSource = new TraceSource(nameof(this.mx1), SourceLevels.All);
var mx2TraceSource = new TraceSource(nameof(this.mx2), SourceLevels.All);

mx1TraceSource.Listeners.Add(new XunitTraceListener(this.Logger, this.TestId, this.TestTimer));
mx2TraceSource.Listeners.Add(new XunitTraceListener(this.Logger, this.TestId, this.TestTimer));

Func<string, MultiplexingStream.QualifiedChannelId, string, TraceSource> traceSourceFactory = (string mxInstanceName, MultiplexingStream.QualifiedChannelId id, string name) =>
{
var traceSource = new TraceSource(mxInstanceName + " channel " + id, SourceLevels.All);
traceSource.Listeners.Clear(); // remove DefaultTraceListener
traceSource.Listeners.Add(new XunitTraceListener(this.Logger, this.TestId, this.TestTimer));
return traceSource;
};

Func<MultiplexingStream.QualifiedChannelId, string, TraceSource> mx1TraceSourceFactory = (MultiplexingStream.QualifiedChannelId id, string name) => traceSourceFactory(nameof(this.mx1), id, name);
Func<MultiplexingStream.QualifiedChannelId, string, TraceSource> mx2TraceSourceFactory = (MultiplexingStream.QualifiedChannelId id, string name) => traceSourceFactory(nameof(this.mx2), id, name);

optionsTemplate = new(optionsTemplate) { ProtocolMajorVersion = this.ProtocolMajorVersion };

var mx1Options = new MultiplexingStream.Options(optionsTemplate) { TraceSource = mx1TraceSource, DefaultChannelTraceSourceFactoryWithQualifier = mx1TraceSourceFactory };
var mx2Options = new MultiplexingStream.Options(optionsTemplate) { TraceSource = mx2TraceSource, DefaultChannelTraceSourceFactoryWithQualifier = mx2TraceSourceFactory };

(this.transport1, this.transport2) = FullDuplexStream.CreatePair(new PipeOptions(pauseWriterThreshold: 2 * 1024 * 1024));
Task<MultiplexingStream>? mx1 = MultiplexingStream.CreateAsync(this.transport1, mx1Options, this.TimeoutToken);
Task<MultiplexingStream>? mx2 = MultiplexingStream.CreateAsync(this.transport2, mx2Options, this.TimeoutToken);
this.mx1 = await mx1;
this.mx2 = await mx2;
}

protected class SlowPipeWriter : PipeWriter
{
private readonly Sequence<byte> writtenBytes = new Sequence<byte>();
Expand Down

0 comments on commit 0ee2425

Please sign in to comment.