diff --git a/.travis.yml b/.travis.yml index e0371f64fc9bd..d6bce8f492b82 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ addons: dart: - dev - - 2.1.0 + - 2.2.0 with_content_shell: false diff --git a/CHANGELOG.md b/CHANGELOG.md index f46643e1947a0..022f12028de6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 3.1.0 + +- Add optional `keepAlive` parameter to the `SseHandler`. If `keepAlive` is + supplied, the connection will remain active for this period after a + disconnect and can be reconnected transparently. If there is no reconnect + within that period, the connection will be closed normally. + ## 3.0.0 - Add retry logic. diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 1a5ef068170ca..bfed9350b48c1 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -2,151 +2,4 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'dart:async'; -import 'dart:convert'; - -import 'package:async/async.dart'; -import 'package:logging/logging.dart'; -import 'package:pedantic/pedantic.dart'; -import 'package:shelf/shelf.dart' as shelf; -import 'package:stream_channel/stream_channel.dart'; - -// RFC 2616 requires carriage return delimiters. -String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n' - 'Content-Type: text/event-stream\r\n' - 'Cache-Control: no-cache\r\n' - 'Connection: keep-alive\r\n' - 'Access-Control-Allow-Credentials: true\r\n' - 'Access-Control-Allow-Origin: $origin\r\n' - '\r\n\r\n'; - -/// A bi-directional SSE connection between server and browser. -class SseConnection extends StreamChannelMixin { - /// Incoming messages from the Browser client. - final _incomingController = StreamController(); - - /// Outgoing messages to the Browser client. - final _outgoingController = StreamController(); - - final Sink _sink; - - final _closedCompleter = Completer(); - - SseConnection(this._sink) { - _outgoingController.stream.listen((data) { - if (!_closedCompleter.isCompleted) { - // JSON encode the message to escape new lines. - _sink.add('data: ${json.encode(data)}\n'); - _sink.add('\n'); - } - }); - _outgoingController.onCancel = _close; - _incomingController.onCancel = _close; - } - - /// The message added to the sink has to be JSON encodable. - @override - StreamSink get sink => _outgoingController.sink; - - // Add messages to this [StreamSink] to send them to the server. - /// [Stream] of messages sent from the server to this client. - /// - /// A message is a decoded JSON object. - @override - Stream get stream => _incomingController.stream; - - void _close() { - if (!_closedCompleter.isCompleted) { - _closedCompleter.complete(); - _sink.close(); - if (!_outgoingController.isClosed) _outgoingController.close(); - if (!_incomingController.isClosed) _incomingController.close(); - } - } -} - -/// [SseHandler] handles requests on a user defined path to create -/// two-way communications of JSON encodable data between server and clients. -/// -/// A server sends messages to a client through an SSE channel, while -/// a client sends message to a server through HTTP POST requests. -class SseHandler { - final _logger = Logger('SseHandler'); - final Uri _uri; - final _connections = {}; - final _connectionController = StreamController(); - - StreamQueue _connectionsStream; - - SseHandler(this._uri); - - StreamQueue get connections => - _connectionsStream ??= StreamQueue(_connectionController.stream); - - shelf.Handler get handler => _handle; - - int get numberOfClients => _connections.length; - - shelf.Response _createSseConnection(shelf.Request req, String path) { - req.hijack((channel) async { - var sink = utf8.encoder.startChunkedConversion(channel.sink); - sink.add(_sseHeaders(req.headers['origin'])); - var clientId = req.url.queryParameters['sseClientId']; - var connection = SseConnection(sink); - _connections[clientId] = connection; - unawaited(connection._closedCompleter.future.then((_) { - _connections.remove(clientId); - })); - // Remove connection when it is remotely closed or the stream is - // cancelled. - channel.stream.listen((_) { - // SSE is unidirectional. Responses are handled through POST requests. - }, onDone: () { - connection._close(); - }); - - _connectionController.add(connection); - }); - return shelf.Response.notFound(''); - } - - String _getOriginalPath(shelf.Request req) => req.requestedUri.path; - - Future _handle(shelf.Request req) async { - var path = _getOriginalPath(req); - if (_uri.path != path) { - return shelf.Response.notFound(''); - } - - if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') { - return _createSseConnection(req, path); - } - - if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') { - return _handleIncomingMessage(req, path); - } - - return shelf.Response.notFound(''); - } - - Future _handleIncomingMessage( - shelf.Request req, String path) async { - try { - var clientId = req.url.queryParameters['sseClientId']; - var message = await req.readAsString(); - var jsonObject = json.decode(message) as String; - _connections[clientId]?._incomingController?.add(jsonObject); - } catch (e, st) { - _logger.fine('Failed to handle incoming message. $e $st'); - } - return shelf.Response.ok('', headers: { - 'access-control-allow-credentials': 'true', - 'access-control-allow-origin': _originFor(req), - }); - } - - String _originFor(shelf.Request req) => - // Firefox does not set header "origin". - // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661 - req.headers['origin'] ?? req.headers['host']; -} +export 'package:sse/src/server/sse_handler.dart' show SseConnection, SseHandler; diff --git a/lib/src/server/sse_handler.dart b/lib/src/server/sse_handler.dart new file mode 100644 index 0000000000000..e8cd523b93277 --- /dev/null +++ b/lib/src/server/sse_handler.dart @@ -0,0 +1,221 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:async/async.dart'; +import 'package:logging/logging.dart'; +import 'package:pedantic/pedantic.dart'; +import 'package:shelf/shelf.dart' as shelf; +import 'package:stream_channel/stream_channel.dart'; + +// RFC 2616 requires carriage return delimiters. +String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n' + 'Content-Type: text/event-stream\r\n' + 'Cache-Control: no-cache\r\n' + 'Connection: keep-alive\r\n' + 'Access-Control-Allow-Credentials: true\r\n' + 'Access-Control-Allow-Origin: $origin\r\n' + '\r\n\r\n'; + +/// A bi-directional SSE connection between server and browser. +class SseConnection extends StreamChannelMixin { + /// Incoming messages from the Browser client. + final _incomingController = StreamController(); + + /// Outgoing messages to the Browser client. + final _outgoingController = StreamController(); + + Sink _sink; + + /// How long to wait after a connection drops before considering it closed. + final Duration _keepAlive; + + /// A timer counting down the KeepAlive period (null if hasn't disconnected). + Timer _keepAliveTimer; + + /// Whether this connection is currently in the KeepAlive timeout period. + bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false; + + final _closedCompleter = Completer(); + + /// Creates an [SseConnection] for the supplied [_sink]. + /// + /// If [keepAlive] is supplied, the connection will remain active for this + /// period after a disconnect and can be reconnected transparently. If there + /// is no reconnect within that period, the connection will be closed normally. + /// + /// If [keepAlive] is not supplied, the connection will be closed immediately + /// after a disconnect. + SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { + unawaited(_setUpListener()); + _outgoingController.onCancel = _close; + _incomingController.onCancel = _close; + } + + Future _setUpListener() async { + var outgoingStreamQueue = StreamQueue(_outgoingController.stream); + while (await outgoingStreamQueue.hasNext) { + // If we're in a KeepAlive timeout, there's nowhere to send messages so + // wait a short period and check again. + if (isInKeepAlivePeriod) { + await Future.delayed(const Duration(milliseconds: 200)); + continue; + } + + // Peek the data so we don't remove it from the stream if we're unable to + // send it. + final data = await outgoingStreamQueue.peek; + try { + // JSON encode the message to escape new lines. + _sink.add('data: ${json.encode(data)}\n'); + _sink.add('\n'); + await outgoingStreamQueue.next; // Consume from stream if no errors. + } catch (StateError) { + if (_keepAlive == null || _closedCompleter.isCompleted) { + rethrow; + } + // If we got here then the sink may have closed but the stream.onDone + // hasn't fired yet, so pause the subscription and skip calling + // `next` so the message remains in the queue to try again. + _handleDisconnect(); + } + } + } + + /// The message added to the sink has to be JSON encodable. + @override + StreamSink get sink => _outgoingController.sink; + + // Add messages to this [StreamSink] to send them to the server. + /// [Stream] of messages sent from the server to this client. + /// + /// A message is a decoded JSON object. + @override + Stream get stream => _incomingController.stream; + + void _acceptReconnection(Sink sink) { + _keepAliveTimer?.cancel(); + _sink = sink; + } + + void _handleDisconnect() { + if (_keepAlive == null) { + // Close immediately if we're not keeping alive. + _close(); + } else if (!isInKeepAlivePeriod) { + // Otherwise if we didn't already have an active timer, set a timer to + // close after the timeout period. If the connection comes back, this will + // be cancelled and all messages left in the queue tried again. + _keepAliveTimer = Timer(_keepAlive, _close); + } + } + + void _close() { + if (!_closedCompleter.isCompleted) { + _closedCompleter.complete(); + _sink.close(); + if (!_outgoingController.isClosed) _outgoingController.close(); + if (!_incomingController.isClosed) _incomingController.close(); + } + } +} + +/// [SseHandler] handles requests on a user defined path to create +/// two-way communications of JSON encodable data between server and clients. +/// +/// A server sends messages to a client through an SSE channel, while +/// a client sends message to a server through HTTP POST requests. +class SseHandler { + final _logger = Logger('SseHandler'); + final Uri _uri; + final Duration _keepAlive; + final _connections = {}; + final _connectionController = StreamController(); + + StreamQueue _connectionsStream; + + SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive; + + StreamQueue get connections => + _connectionsStream ??= StreamQueue(_connectionController.stream); + + shelf.Handler get handler => _handle; + + int get numberOfClients => _connections.length; + + shelf.Response _createSseConnection(shelf.Request req, String path) { + req.hijack((channel) async { + var sink = utf8.encoder.startChunkedConversion(channel.sink); + sink.add(_sseHeaders(req.headers['origin'])); + var clientId = req.url.queryParameters['sseClientId']; + + // Check if we already have a connection for this ID that is in the process + // of timing out (in which case we can reconnect it transparently). + if (_connections[clientId] != null && + _connections[clientId].isInKeepAlivePeriod) { + _connections[clientId]._acceptReconnection(sink); + } else { + var connection = SseConnection(sink, keepAlive: _keepAlive); + _connections[clientId] = connection; + unawaited(connection._closedCompleter.future.then((_) { + _connections.remove(clientId); + })); + // Remove connection when it is remotely closed or the stream is + // cancelled. + channel.stream.listen((_) { + // SSE is unidirectional. Responses are handled through POST requests. + }, onDone: () { + connection._handleDisconnect(); + }); + + _connectionController.add(connection); + } + }); + return shelf.Response.notFound(''); + } + + String _getOriginalPath(shelf.Request req) => req.requestedUri.path; + + Future _handle(shelf.Request req) async { + var path = _getOriginalPath(req); + if (_uri.path != path) { + return shelf.Response.notFound(''); + } + + if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') { + return _createSseConnection(req, path); + } + + if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') { + return _handleIncomingMessage(req, path); + } + + return shelf.Response.notFound(''); + } + + Future _handleIncomingMessage( + shelf.Request req, String path) async { + try { + var clientId = req.url.queryParameters['sseClientId']; + var message = await req.readAsString(); + var jsonObject = json.decode(message) as String; + _connections[clientId]?._incomingController?.add(jsonObject); + } catch (e, st) { + _logger.fine('Failed to handle incoming message. $e $st'); + } + return shelf.Response.ok('', headers: { + 'access-control-allow-credentials': 'true', + 'access-control-allow-origin': _originFor(req), + }); + } + + String _originFor(shelf.Request req) => + // Firefox does not set header "origin". + // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661 + req.headers['origin'] ?? req.headers['host']; +} + +void closeSink(SseConnection connection) => connection._sink.close(); diff --git a/pubspec.yaml b/pubspec.yaml index 33fc5fb427ed8..9c164f79e7ce8 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: sse -version: 3.0.0 +version: 3.1.0 author: Dart Team homepage: https://github.com/dart-lang/sse description: >- @@ -8,7 +8,7 @@ description: >- requests. environment: - sdk: ">=2.1.0 <3.0.0" + sdk: ">=2.2.0 <3.0.0" dependencies: async: ^2.0.8 diff --git a/test/sse_test.dart b/test/sse_test.dart index 62fe4c6cb3272..0d4040f2897c6 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -10,6 +10,7 @@ import 'package:shelf/shelf.dart' as shelf; import 'package:shelf/shelf_io.dart' as io; import 'package:shelf_static/shelf_static.dart'; import 'package:sse/server/sse_handler.dart'; +import 'package:sse/src/server/sse_handler.dart' show closeSink; import 'package:test/test.dart'; import 'package:webdriver/io.dart'; @@ -33,99 +34,180 @@ void main() { chromeDriver.kill(); }); - setUp(() async { - handler = SseHandler(Uri.parse('/test')); - - var cascade = shelf.Cascade().add(handler.handler).add(_faviconHandler).add( - createStaticHandler('test/web', - listDirectories: true, defaultDocument: 'index.html')); - - server = await io.serve(cascade.handler, 'localhost', 0); - var capabilities = Capabilities.chrome - ..addAll({ - Capabilities.chromeOptions: { - 'args': ['--headless'] - } - }); - webdriver = await createDriver(desired: capabilities); + group('SSE', () { + setUp(() async { + handler = SseHandler(Uri.parse('/test')); + + var cascade = shelf.Cascade() + .add(handler.handler) + .add(_faviconHandler) + .add(createStaticHandler('test/web', + listDirectories: true, defaultDocument: 'index.html')); + + server = await io.serve(cascade.handler, 'localhost', 0); + var capabilities = Capabilities.chrome + ..addAll({ + Capabilities.chromeOptions: { + 'args': ['--headless'] + } + }); + webdriver = await createDriver(desired: capabilities); + }); + + tearDown(() async { + await webdriver.quit(); + await server.close(); + }); + + test('Can round trip messages', () async { + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + connection.sink.add('blah'); + expect(await connection.stream.first, 'blah'); + }); + + test('Multiple clients can connect', () async { + var connections = handler.connections; + await webdriver.get('http://localhost:${server.port}'); + await connections.next; + await webdriver.get('http://localhost:${server.port}'); + await connections.next; + }); + + test('Routes data correctly', () async { + var connections = handler.connections; + await webdriver.get('http://localhost:${server.port}'); + var connectionA = await connections.next; + connectionA.sink.add('foo'); + expect(await connectionA.stream.first, 'foo'); + + await webdriver.get('http://localhost:${server.port}'); + var connectionB = await connections.next; + connectionB.sink.add('bar'); + expect(await connectionB.stream.first, 'bar'); + }); + + test('Can close from the server', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + await connection.sink.close(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + }); + + test('Client reconnects after being disconnected', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + await connection.sink.close(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + + // Ensure the client reconnects + await handler.connections.next; + }); + + test('Can close from the client-side', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + var closeButton = await webdriver.findElement(const By.tagName('button')); + await closeButton.click(); + + // Should complete since the connection is closed. + await connection.stream.toList(); + expect(handler.numberOfClients, 0); + }); + + test('Cancelling the listener closes the connection', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + var sub = connection.stream.listen((_) {}); + await sub.cancel(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + }); + + test('Disconnects when navigating away', () async { + await webdriver.get('http://localhost:${server.port}'); + expect(handler.numberOfClients, 1); + + await webdriver.get('chrome://version/'); + expect(handler.numberOfClients, 0); + }); }); - tearDown(() async { - await webdriver.quit(); - await server.close(); - }); - - test('Can round trip messages', () async { - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - connection.sink.add('blah'); - expect(await connection.stream.first, 'blah'); - }); - - test('Multiple clients can connect', () async { - var connections = handler.connections; - await webdriver.get('http://localhost:${server.port}'); - await connections.next; - await webdriver.get('http://localhost:${server.port}'); - await connections.next; - }); - - test('Routes data correctly', () async { - var connections = handler.connections; - await webdriver.get('http://localhost:${server.port}'); - var connectionA = await connections.next; - connectionA.sink.add('foo'); - expect(await connectionA.stream.first, 'foo'); - - await webdriver.get('http://localhost:${server.port}'); - var connectionB = await connections.next; - connectionB.sink.add('bar'); - expect(await connectionB.stream.first, 'bar'); - }); - - test('Can close from the server', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - await connection.sink.close(); - await pumpEventQueue(); - expect(handler.numberOfClients, 0); - }); - - test('Can close from the client-side', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - - var closeButton = await webdriver.findElement(const By.tagName('button')); - await closeButton.click(); - - // Should complete since the connection is closed. - await connection.stream.toList(); - expect(handler.numberOfClients, 0); - }); - - test('Cancelling the listener closes the connection', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - - var sub = connection.stream.listen((_) {}); - await sub.cancel(); - await pumpEventQueue(); - expect(handler.numberOfClients, 0); - }); - - test('Disconnects when navigating away', () async { - await webdriver.get('http://localhost:${server.port}'); - expect(handler.numberOfClients, 1); - - await webdriver.get('chrome://version/'); - expect(handler.numberOfClients, 0); - }); + group('SSE with server keep-alive', () { + setUp(() async { + handler = + SseHandler(Uri.parse('/test'), keepAlive: const Duration(seconds: 5)); + + var cascade = shelf.Cascade() + .add(handler.handler) + .add(_faviconHandler) + .add(createStaticHandler('test/web', + listDirectories: true, defaultDocument: 'index.html')); + + server = await io.serve(cascade.handler, 'localhost', 0); + var capabilities = Capabilities.chrome + ..addAll({ + Capabilities.chromeOptions: { + 'args': ['--headless'] + } + }); + webdriver = await createDriver(desired: capabilities); + }); + + tearDown(() async { + await webdriver.quit(); + await server.close(); + }); + + test('Client reconnect use the same connection', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + // Close the underlying connection. + closeSink(connection); + await pumpEventQueue(); + + // Ensure there's still a connection. + expect(handler.numberOfClients, 1); + + // Ensure we can still round-trip data on the original connection. + connection.sink.add('bar'); + expect(await connection.stream.first, 'bar'); + }); + + test('Messages sent during disconnect arrive in-order', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + // Close the underlying connection. + closeSink(connection); + connection.sink.add('one'); + connection.sink.add('two'); + await pumpEventQueue(); + + // Ensure there's still a connection. + expect(handler.numberOfClients, 1); + + // Ensure messages arrive in the same order + expect(await connection.stream.take(2).toList(), equals(['one', 'two'])); + }); + }, timeout: const Timeout(Duration(seconds: 120))); } FutureOr _faviconHandler(shelf.Request request) {