diff --git a/.travis.yml b/.travis.yml index 21c6fe6a06e7f..3b66d0dc0396b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ language: dart sudo: required dist: trusty addons: + chrome: stable apt: sources: - google-chrome @@ -29,7 +30,7 @@ before_install: - sh -e /etc/init.d/xvfb start before_script: - - wget http://chromedriver.storage.googleapis.com/2.35/chromedriver_linux64.zip + - wget http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip - unzip chromedriver_linux64.zip - export PATH=$PATH:$PWD - ./tool/travis-setup.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 77c5538b82532..ba13a265fa329 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 2.0.0 + +- No longer expose `close` and `onClose` on an `SseConnection`. This is simply + handled by the underlying `stream` / `sink`. +- Fix a bug where resources of the `SseConnection` were not properly closed. + ## 1.0.0 - Internal cleanup. diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index a683a2f632c2f..a3067f6da17fe 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -21,24 +21,28 @@ String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n' /// A bi-directional SSE connection between server and browser. class SseConnection extends StreamChannelMixin { + /// Incoming messages from the Browser client. final _incomingController = StreamController(); + + /// Outgoing messages to the Browser client. final _outgoingController = StreamController(); - final _closeCompleter = Completer(); + final Sink _sink; - final String _clientId; - SseConnection(this._sink, this._clientId) { + final _closedCompleter = Completer(); + + SseConnection(this._sink) { _outgoingController.stream.listen((data) { - if (!_closeCompleter.isCompleted) { + if (!_closedCompleter.isCompleted) { // JSON encode the message to escape new lines. _sink.add('data: ${json.encode(data)}\n'); _sink.add('\n'); } }); + _outgoingController.onCancel = _close; + _incomingController.onCancel = _close; } - Future get onClose => _closeCompleter.future; - /// The message added to the sink has to be JSON encodable. @override StreamSink get sink => _outgoingController.sink; @@ -50,8 +54,13 @@ class SseConnection extends StreamChannelMixin { @override Stream get stream => _incomingController.stream; - void close() { - if (!_closeCompleter.isCompleted) _closeCompleter.complete(); + void _close() { + if (!_closedCompleter.isCompleted) { + _closedCompleter.complete(); + _sink.close(); + if (!_outgoingController.isClosed) _outgoingController.close(); + if (!_incomingController.isClosed) _incomingController.close(); + } } } @@ -63,15 +72,15 @@ class SseConnection extends StreamChannelMixin { class SseHandler { final _logger = Logger('SseHandler'); final Uri _uri; - - final Set _connections = Set(); - + final _connections = {}; final _connectionController = StreamController(); + StreamQueue _connectionsStream; + SseHandler(this._uri); StreamQueue get connections => - StreamQueue(_connectionController.stream); + _connectionsStream ??= StreamQueue(_connectionController.stream); shelf.Handler get handler => _handle; @@ -82,19 +91,22 @@ class SseHandler { var sink = utf8.encoder.startChunkedConversion(channel.sink); sink.add(_sseHeaders(req.headers['origin'])); var clientId = req.url.queryParameters['sseClientId']; - var connection = SseConnection(sink, clientId); - _connections.add(connection); - unawaited(connection.onClose.then((_) { - _connections.remove(connection); + var connection = SseConnection(sink); + _connections[clientId] = connection; + unawaited(connection._closedCompleter.future.then((_) { + _connections.remove(clientId); })); + // Remove connection when it is remotely closed or the stream is + // cancelled. channel.stream.listen((_) { // SSE is unidirectional. Responses are handled through POST requests. }, onDone: () { - connection.close(); + connection._close(); }); + _connectionController.add(connection); }); - return null; + return shelf.Response.notFound(''); } String _getOriginalPath(shelf.Request req) => req.requestedUri.path; @@ -122,11 +134,7 @@ class SseHandler { var clientId = req.url.queryParameters['sseClientId']; var message = await req.readAsString(); var jsonObject = json.decode(message) as String; - for (var connection in _connections) { - if (connection._clientId == clientId) { - connection._incomingController.add(jsonObject); - } - } + _connections[clientId]?._incomingController?.add(jsonObject); } catch (e, st) { _logger.fine('Failed to handle incoming message. $e $st'); } diff --git a/pubspec.yaml b/pubspec.yaml index a86543f758a2e..054332bf34796 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: sse -version: 1.0.0 +version: 2.0.0 author: Dart Team homepage: https://github.com/dart-lang/sse description: >- diff --git a/test/sse_test.dart b/test/sse_test.dart index d6e04451b3719..16a9a0ce80c6d 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -17,6 +17,21 @@ void main() { HttpServer server; WebDriver webdriver; SseHandler handler; + Process chromeDriver; + + setUpAll(() async { + try { + chromeDriver = await Process.start( + 'chromedriver', ['--port=4444', '--url-base=wd/hub']); + } catch (e) { + throw StateError( + 'Could not start ChromeDriver. Is it installed?\nError: $e'); + } + }); + + tearDownAll(() { + chromeDriver.kill(); + }); setUp(() async { handler = SseHandler(Uri.parse('/test')); @@ -28,7 +43,11 @@ void main() { listDirectories: true, defaultDocument: 'index.html')); server = await io.serve(cascade.handler, 'localhost', 0); - webdriver = await createDriver(); + webdriver = await createDriver(desired: { + 'chromeOptions': { + 'args': ['--headless'] + } + }); }); tearDown(() async { @@ -55,12 +74,12 @@ void main() { var connections = handler.connections; await webdriver.get('http://localhost:${server.port}'); var connectionA = await connections.next; + connectionA.sink.add('foo'); + expect(await connectionA.stream.first, 'foo'); + await webdriver.get('http://localhost:${server.port}'); var connectionB = await connections.next; - - connectionA.sink.add('foo'); connectionB.sink.add('bar'); - await connectionA.onClose; expect(await connectionB.stream.first, 'bar'); }); @@ -69,8 +88,8 @@ void main() { await webdriver.get('http://localhost:${server.port}'); var connection = await handler.connections.next; expect(handler.numberOfClients, 1); - connection.close(); - await connection.onClose; + await connection.sink.close(); + await pumpEventQueue(); expect(handler.numberOfClients, 0); }); @@ -83,7 +102,20 @@ void main() { var closeButton = await webdriver.findElement(const By.tagName('button')); await closeButton.click(); - await connection.onClose; + // Should complete since the connection is closed. + await connection.stream.toList(); + expect(handler.numberOfClients, 0); + }); + + test('Cancelling the listener closes the connection', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + var sub = connection.stream.listen((_) {}); + await sub.cancel(); + await pumpEventQueue(); expect(handler.numberOfClients, 0); }); diff --git a/tool/travis.sh b/tool/travis.sh index b9c9c9fd1a32a..9ebd3d425fc43 100755 --- a/tool/travis.sh +++ b/tool/travis.sh @@ -23,10 +23,6 @@ if [[ $ANALYSIS_STATUS -ne 0 ]]; then STATUS=$ANALYSIS_STATUS fi -# Start chromedriver. -chromedriver --port=4444 --url-base=wd/hub & -PIDC=$! - # Run tests. pub run test -r expanded -p vm -j 1 TEST_STATUS=$?