@@ -93,10 +93,18 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
9393 /// The controller for this channel.
9494 final _mainController = new StreamChannelController (sync : true );
9595
96- /// A map from virtual channel ids to [StreamChannelController] s that should
97- /// be used to communicate over those channels.
96+ /// A map from input IDs to [StreamChannelController] s that should be used to
97+ /// communicate over those channels.
9898 final _controllers = < int , StreamChannelController > {};
9999
100+ /// Input IDs of controllers in [_controllers] that we've received messages
101+ /// for but that have not yet had a local [virtualChannel] created.
102+ final _pendingIds = new Set <int >();
103+
104+ /// Input IDs of virtual channels that used to exist but have since been
105+ /// closed.
106+ final _closedIds = new Set <int >();
107+
100108 /// The next id to use for a local virtual channel.
101109 ///
102110 /// Ids are used to identify virtual channels. Each message is tagged with an
@@ -114,8 +122,9 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
114122 /// The trick is that each endpoint only uses odd ids for its own channels.
115123 /// When sending a message over a channel that was created by the remote
116124 /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
117- /// knows that if an incoming message has an odd id, it's using the local id
118- /// scheme, but if it has an even id, it's using the remote id scheme.
125+ /// knows that if an incoming message has an odd id, it's coming from a
126+ /// channel that was originally created remotely, but if it has an even id,
127+ /// it's coming from a channel that was originally created locally.
119128 var _nextId = 1 ;
120129
121130 _MultiChannel (this ._inner) {
@@ -128,21 +137,28 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
128137
129138 _innerStreamSubscription = _inner.stream.listen ((message) {
130139 var id = message[0 ];
131- var controller = _controllers[id];
132140
133- // A controller might not exist if the channel was closed before an
134- // incoming message was processed.
135- if (controller == null ) return ;
141+ // If the channel was closed before an incoming message was processed,
142+ // ignore that message.
143+ if (_closedIds.contains (id)) return ;
144+
145+ var controller = _controllers.putIfAbsent (id, () {
146+ // If we receive a message for a controller that doesn't have a local
147+ // counterpart yet, create a controller for it to buffer incoming
148+ // messages for when a local connection is created.
149+ _pendingIds.add (id);
150+ return new StreamChannelController (sync : true );
151+ });
152+
136153 if (message.length > 1 ) {
137154 controller.local.sink.add (message[1 ]);
138- return ;
155+ } else {
156+ // A message without data indicates that the channel has been closed. We
157+ // can just close the sink here without doing any more cleanup, because
158+ // the sink closing will cause the stream to emit a done event which
159+ // will trigger more cleanup.
160+ controller.local.sink.close ();
139161 }
140-
141- // A message without data indicates that the channel has been closed. We
142- // can only close the sink here without doing any more cleanup, because
143- // the sink closing will cause the stream to emit a done event which will
144- // trigger more cleanup.
145- controller.local.sink.close ();
146162 },
147163 onDone: _closeInnerChannel,
148164 onError: _mainController.local.sink.addError);
@@ -173,23 +189,30 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
173189 this , inputId, new Stream .empty (), new NullStreamSink ());
174190 }
175191
176- if (_controllers.containsKey (inputId)) {
192+ StreamChannelController controller;
193+ if (_pendingIds.remove (inputId)) {
194+ // If we've already received messages for this channel, use the controller
195+ // where those messages are buffered.
196+ controller = _controllers[inputId];
197+ } else if (_controllers.containsKey (inputId) ||
198+ _closedIds.contains (inputId)) {
177199 throw new ArgumentError ("A virtual channel with id $id already exists." );
200+ } else {
201+ controller = new StreamChannelController (sync : true );
202+ _controllers[inputId] = controller;
178203 }
179204
180- var controller = new StreamChannelController (sync : true );
181- _controllers[inputId] = controller;
182205 controller.local.stream.listen (
183206 (message) => _inner.sink.add ([outputId, message]),
184207 onDone: () => _closeChannel (inputId, outputId));
185-
186208 return new VirtualChannel ._(
187209 this , outputId, controller.foreign.stream, controller.foreign.sink);
188210 }
189211
190212 /// Closes the virtual channel for which incoming messages have [inputId] and
191213 /// outgoing messages have [outputId] .
192214 void _closeChannel (int inputId, int outputId) {
215+ _closedIds.add (inputId);
193216 var controller = _controllers.remove (inputId);
194217 controller.local.sink.close ();
195218
0 commit comments