Skip to content

Commit

Permalink
Avoid an open Peer with a closed Client (flutter#65)
Browse files Browse the repository at this point in the history
May fix dart-lang/sdk#43012

If a `Peer` is created with a `StreamChannel` that does not follow the
stated contract it's possible that the `sink` gets closed without
receiving a done event from the `channel` which leaves the `Peer`
instance in a state that's inconsistent with the underlying `Client`.
The result is that it's possible to get a bad state trying to send a
message even with `isClosed` returns `false`.

- Make `isClosed` and `done` forward to the `_client` and `_peer` fields
  so that they can't be inconsistent.
- Forward errors to the `_server` so that it can forward them through
  `done` without an extra `Completer` to manage.
- Avoid closing the `sink` in the `Peer`. It will end up being closed by
  the server when it is handling the error, and it's the same `sink`
  instance in both places.
- Add a test that ensures that `isClosed` behaves as expected following
  a call to `close()` even when the `StreamChannel` does not follow it's
  contract.
  • Loading branch information
natebosch authored Aug 11, 2020
1 parent 413ee9d commit 995611c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
20 changes: 10 additions & 10 deletions lib/src/peer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class Peer implements Client, Server {
/// they're responses.
final _clientIncomingForwarder = StreamController(sync: true);

final _done = Completer<void>();
Future<void> _done;
@override
Future get done => _done.future;
Future get done => _done ??= Future.wait([_client.done, _server.done]);
@override
bool get isClosed => _done.isCompleted;
bool get isClosed => _client.isClosed || _server.isClosed;

@override
ErrorCallback get onUnhandledError => _server?.onUnhandledError;
Expand Down Expand Up @@ -142,15 +142,15 @@ class Peer implements Client, Server {
_serverIncomingForwarder.add(message);
}
}, onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
}, onDone: () {
if (!_done.isCompleted) _done.complete();
close();
});
_serverIncomingForwarder.addError(error, stackTrace);
}, onDone: close);
return done;
}

@override
Future close() => Future.wait([_client.close(), _server.close()]);
Future close() {
_client.close();
_server.close();
return done;
}
}
15 changes: 15 additions & 0 deletions test/peer_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ void main() {
await peer.close();
});

test('considered closed with misbehaving StreamChannel', () async {
// If a StreamChannel does not enforce the guarantees stated in it's
// contract - specifically that "Closing the sink causes the stream to close
// before it emits any more events." - The `Peer` should still understand
// when it has been closed manually.
var channel = StreamChannel(
StreamController().stream,
StreamController(),
);
var peer = json_rpc.Peer.withoutJson(channel);
unawaited(peer.listen());
unawaited(peer.close());
expect(peer.isClosed, true);
});

group('like a server,', () {
test('can receive a call and return a response', () {
expect(outgoing.first,
Expand Down

0 comments on commit 995611c

Please sign in to comment.