From 995611cf006c927d51cc53cb28f1aa4356d5414f Mon Sep 17 00:00:00 2001 From: Nate Bosch Date: Tue, 11 Aug 2020 16:23:49 -0700 Subject: [PATCH] Avoid an open Peer with a closed Client (#65) May fix https://github.com/dart-lang/sdk/issues/43012 If a `Peer` is created with a `StreamChannel` that does not follow the stated contract it's possible that the `sink` gets closed without receiving a done event from the `channel` which leaves the `Peer` instance in a state that's inconsistent with the underlying `Client`. The result is that it's possible to get a bad state trying to send a message even with `isClosed` returns `false`. - Make `isClosed` and `done` forward to the `_client` and `_peer` fields so that they can't be inconsistent. - Forward errors to the `_server` so that it can forward them through `done` without an extra `Completer` to manage. - Avoid closing the `sink` in the `Peer`. It will end up being closed by the server when it is handling the error, and it's the same `sink` instance in both places. - Add a test that ensures that `isClosed` behaves as expected following a call to `close()` even when the `StreamChannel` does not follow it's contract. --- lib/src/peer.dart | 20 ++++++++++---------- test/peer_test.dart | 15 +++++++++++++++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/lib/src/peer.dart b/lib/src/peer.dart index 31e89365163e9..4cf6aae5b5351 100644 --- a/lib/src/peer.dart +++ b/lib/src/peer.dart @@ -35,11 +35,11 @@ class Peer implements Client, Server { /// they're responses. final _clientIncomingForwarder = StreamController(sync: true); - final _done = Completer(); + Future _done; @override - Future get done => _done.future; + Future get done => _done ??= Future.wait([_client.done, _server.done]); @override - bool get isClosed => _done.isCompleted; + bool get isClosed => _client.isClosed || _server.isClosed; @override ErrorCallback get onUnhandledError => _server?.onUnhandledError; @@ -142,15 +142,15 @@ class Peer implements Client, Server { _serverIncomingForwarder.add(message); } }, onError: (error, stackTrace) { - _done.completeError(error, stackTrace); - _channel.sink.close(); - }, onDone: () { - if (!_done.isCompleted) _done.complete(); - close(); - }); + _serverIncomingForwarder.addError(error, stackTrace); + }, onDone: close); return done; } @override - Future close() => Future.wait([_client.close(), _server.close()]); + Future close() { + _client.close(); + _server.close(); + return done; + } } diff --git a/test/peer_test.dart b/test/peer_test.dart index 7284330f5a022..e20976314adef 100644 --- a/test/peer_test.dart +++ b/test/peer_test.dart @@ -113,6 +113,21 @@ void main() { await peer.close(); }); + test('considered closed with misbehaving StreamChannel', () async { + // If a StreamChannel does not enforce the guarantees stated in it's + // contract - specifically that "Closing the sink causes the stream to close + // before it emits any more events." - The `Peer` should still understand + // when it has been closed manually. + var channel = StreamChannel( + StreamController().stream, + StreamController(), + ); + var peer = json_rpc.Peer.withoutJson(channel); + unawaited(peer.listen()); + unawaited(peer.close()); + expect(peer.isClosed, true); + }); + group('like a server,', () { test('can receive a call and return a response', () { expect(outgoing.first,