Skip to content

Commit

Permalink
Merge pull request flutter#4 from dart-lang/close-cleanup
Browse files Browse the repository at this point in the history
Close cleanup
  • Loading branch information
grouma authored Mar 11, 2019
2 parents ed65201 + 80b486a commit c7d4ecf
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 36 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ language: dart
sudo: required
dist: trusty
addons:
chrome: stable
apt:
sources:
- google-chrome
Expand All @@ -29,7 +30,7 @@ before_install:
- sh -e /etc/init.d/xvfb start

before_script:
- wget http://chromedriver.storage.googleapis.com/2.35/chromedriver_linux64.zip
- wget http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip
- unzip chromedriver_linux64.zip
- export PATH=$PATH:$PWD
- ./tool/travis-setup.sh
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.0.0

- No longer expose `close` and `onClose` on an `SseConnection`. This is simply
handled by the underlying `stream` / `sink`.
- Fix a bug where resources of the `SseConnection` were not properly closed.

## 1.0.0

- Internal cleanup.
Expand Down
54 changes: 31 additions & 23 deletions lib/server/sse_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'

/// A bi-directional SSE connection between server and browser.
class SseConnection extends StreamChannelMixin<String> {
/// Incoming messages from the Browser client.
final _incomingController = StreamController<String>();

/// Outgoing messages to the Browser client.
final _outgoingController = StreamController<String>();
final _closeCompleter = Completer<Null>();

final Sink _sink;
final String _clientId;

SseConnection(this._sink, this._clientId) {
final _closedCompleter = Completer<void>();

SseConnection(this._sink) {
_outgoingController.stream.listen((data) {
if (!_closeCompleter.isCompleted) {
if (!_closedCompleter.isCompleted) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
}
});
_outgoingController.onCancel = _close;
_incomingController.onCancel = _close;
}

Future get onClose => _closeCompleter.future;

/// The message added to the sink has to be JSON encodable.
@override
StreamSink<String> get sink => _outgoingController.sink;
Expand All @@ -50,8 +54,13 @@ class SseConnection extends StreamChannelMixin<String> {
@override
Stream<String> get stream => _incomingController.stream;

void close() {
if (!_closeCompleter.isCompleted) _closeCompleter.complete();
void _close() {
if (!_closedCompleter.isCompleted) {
_closedCompleter.complete();
_sink.close();
if (!_outgoingController.isClosed) _outgoingController.close();
if (!_incomingController.isClosed) _incomingController.close();
}
}
}

Expand All @@ -63,15 +72,15 @@ class SseConnection extends StreamChannelMixin<String> {
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;

final Set<SseConnection> _connections = Set<SseConnection>();

final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();

StreamQueue<SseConnection> _connectionsStream;

SseHandler(this._uri);

StreamQueue<SseConnection> get connections =>
StreamQueue(_connectionController.stream);
_connectionsStream ??= StreamQueue(_connectionController.stream);

shelf.Handler get handler => _handle;

Expand All @@ -82,19 +91,22 @@ class SseHandler {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
sink.add(_sseHeaders(req.headers['origin']));
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink, clientId);
_connections.add(connection);
unawaited(connection.onClose.then((_) {
_connections.remove(connection);
var connection = SseConnection(sink);
_connections[clientId] = connection;
unawaited(connection._closedCompleter.future.then((_) {
_connections.remove(clientId);
}));
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
connection.close();
connection._close();
});

_connectionController.add(connection);
});
return null;
return shelf.Response.notFound('');
}

String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
Expand Down Expand Up @@ -122,11 +134,7 @@ class SseHandler {
var clientId = req.url.queryParameters['sseClientId'];
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
for (var connection in _connections) {
if (connection._clientId == clientId) {
connection._incomingController.add(jsonObject);
}
}
_connections[clientId]?._incomingController?.add(jsonObject);
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
}
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: sse
version: 1.0.0
version: 2.0.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/sse
description: >-
Expand Down
46 changes: 39 additions & 7 deletions test/sse_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ void main() {
HttpServer server;
WebDriver webdriver;
SseHandler handler;
Process chromeDriver;

setUpAll(() async {
try {
chromeDriver = await Process.start(
'chromedriver', ['--port=4444', '--url-base=wd/hub']);
} catch (e) {
throw StateError(
'Could not start ChromeDriver. Is it installed?\nError: $e');
}
});

tearDownAll(() {
chromeDriver.kill();
});

setUp(() async {
handler = SseHandler(Uri.parse('/test'));
Expand All @@ -28,7 +43,11 @@ void main() {
listDirectories: true, defaultDocument: 'index.html'));

server = await io.serve(cascade.handler, 'localhost', 0);
webdriver = await createDriver();
webdriver = await createDriver(desired: {
'chromeOptions': {
'args': ['--headless']
}
});
});

tearDown(() async {
Expand All @@ -55,12 +74,12 @@ void main() {
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
var connectionA = await connections.next;
connectionA.sink.add('foo');
expect(await connectionA.stream.first, 'foo');

await webdriver.get('http://localhost:${server.port}');
var connectionB = await connections.next;

connectionA.sink.add('foo');
connectionB.sink.add('bar');
await connectionA.onClose;
expect(await connectionB.stream.first, 'bar');
});

Expand All @@ -69,8 +88,8 @@ void main() {
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
connection.close();
await connection.onClose;
await connection.sink.close();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

Expand All @@ -83,7 +102,20 @@ void main() {
var closeButton = await webdriver.findElement(const By.tagName('button'));
await closeButton.click();

await connection.onClose;
// Should complete since the connection is closed.
await connection.stream.toList();
expect(handler.numberOfClients, 0);
});

test('Cancelling the listener closes the connection', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

var sub = connection.stream.listen((_) {});
await sub.cancel();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

Expand Down
4 changes: 0 additions & 4 deletions tool/travis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ if [[ $ANALYSIS_STATUS -ne 0 ]]; then
STATUS=$ANALYSIS_STATUS
fi

# Start chromedriver.
chromedriver --port=4444 --url-base=wd/hub &
PIDC=$!

# Run tests.
pub run test -r expanded -p vm -j 1
TEST_STATUS=$?
Expand Down

0 comments on commit c7d4ecf

Please sign in to comment.