Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Migrate http2 to null safety
Browse files Browse the repository at this point in the history
  • Loading branch information
iinozemtsev committed Jan 13, 2021
1 parent 6084f9d commit 85c49cf
Show file tree
Hide file tree
Showing 37 changed files with 474 additions and 222 deletions.
28 changes: 16 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
language: dart

dart:
- 2.8.4
- dev

dart_task:
- test: --exclude-tags flaky
- test: --tags flaky
- dartanalyzer: --fatal-infos --fatal-warnings .

matrix:
jobs:
include:
# Only validate formatting using the dev release
- dart: dev
dart_task: dartfmt
- name: "Analyzer"
os: linux
script: dart analyze --fatal-warnings --fatal-infos .
- name: "Format"
os: linux
script: dartfmt -n --set-exit-if-changed .
- name: "Tests"
os: linux
script: dart test --exclude-tags flaky
- name: "Flaky Tests"
os: linux
script: dart test --tags flaky
allow_failures:
- dart_task:
test: --tags flaky
- name: "Flaky Tests"
os: linux
script: dart test --tags flaky

# Only building master means that we don't run two builds for each pull request.
branches:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.0.0-nullsafety.0

* Migrate to null safety.

## 1.0.2-dev

* Update minimum Dart SDK to `2.8.4`.
Expand Down
2 changes: 2 additions & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ analyzer:
unused_import: error
unused_local_variable: error
dead_code: error
exclude:
- test/**/*.mocks.dart
linter:
rules:
- await_only_futures
Expand Down
2 changes: 1 addition & 1 deletion example/display_headers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import 'dart:io';
import 'package:http2/transport.dart';

void main(List<String> args) async {
if (args == null || args.length != 1) {
if (args.length != 1) {
print('Usage: dart display_headers.dart <HTTPS_URI>');
exit(1);
}
Expand Down
6 changes: 3 additions & 3 deletions experimental/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void handleClient(SecureSocket socket) {
connection.incomingStreams.listen((ServerTransportStream stream) async {
dumpInfo('main', 'Got new HTTP/2 stream with id: ${stream.id}');

String path;
String? path;
stream.incomingMessages.listen((StreamMessage msg) async {
dumpInfo('${stream.id}', 'Got new incoming message');
if (msg is HeadersStreamMessage) {
Expand All @@ -59,9 +59,9 @@ void handleClient(SecureSocket socket) {
if (path == '/') {
unawaited(sendHtml(stream));
} else if (['/iframe', '/iframe2'].contains(path)) {
unawaited(sendIFrameHtml(stream, path));
unawaited(sendIFrameHtml(stream, path!));
} else {
unawaited(send404(stream, path));
unawaited(send404(stream, path!));
}
}
} else if (msg is DataStreamMessage) {
Expand Down
14 changes: 7 additions & 7 deletions lib/multiprotocol_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import 'transport.dart' as http2;
/// * one handles HTTP/2 clients (called with a [http2.ServerTransportStream])
class MultiProtocolHttpServer {
final SecureServerSocket _serverSocket;
final http2.ServerSettings _settings;
final http2.ServerSettings? _settings;

_ServerSocketController _http11Controller;
HttpServer _http11Server;
late _ServerSocketController _http11Controller;
late HttpServer _http11Server;

StreamController<http2.ServerTransportStream> _http2Controller;
Stream<http2.ServerTransportStream> _http2Server;
late StreamController<http2.ServerTransportStream> _http2Controller;
late Stream<http2.ServerTransportStream> _http2Server;
final _http2Connections = <http2.ServerTransportConnection>{};

MultiProtocolHttpServer._(this._serverSocket, this._settings) {
Expand All @@ -46,7 +46,7 @@ class MultiProtocolHttpServer {
/// See also [startServing].
static Future<MultiProtocolHttpServer> bind(
address, int port, SecurityContext context,
{http2.ServerSettings settings}) async {
{http2.ServerSettings? settings}) async {
context.setAlpnProtocols(['h2', 'h2-14', 'http/1.1'], true);
var secureServer = await SecureServerSocket.bind(address, port, context);
return MultiProtocolHttpServer._(secureServer, settings);
Expand All @@ -65,7 +65,7 @@ class MultiProtocolHttpServer {
/// an exception (i.e. these must take care of error handling themselves).
void startServing(void Function(HttpRequest) callbackHttp11,
void Function(http2.ServerTransportStream) callbackHttp2,
{void Function(dynamic error, StackTrace) onError}) {
{void Function(dynamic error, StackTrace)? onError}) {
// 1. Start listening on the real [SecureServerSocket].
_serverSocket.listen((SecureSocket socket) {
var protocol = socket.selectedProtocol;
Expand Down
4 changes: 2 additions & 2 deletions lib/src/async_utils/async_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ class BufferedSink {

/// A intermediate [StreamController] used to catch pause signals and to
/// propagate the change via [bufferIndicator].
StreamController<List<int>> _controller;
late final StreamController<List<int>> _controller;

/// A future which completes once the sink has been closed.
Future _doneFuture;
late final Future _doneFuture;

BufferedSink(StreamSink<List<int>> dataSink) {
bufferIndicator.markBuffered();
Expand Down
43 changes: 19 additions & 24 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class Connection {
final bool isClientConnection;

/// Active state handler for this connection.
ActiveStateHandler onActiveStateChanged;
ActiveStateHandler? onActiveStateChanged;

final Completer<void> _onInitialPeerSettingsReceived = Completer<void>();

Expand All @@ -117,32 +117,32 @@ abstract class Connection {
final FrameDefragmenter _defragmenter = FrameDefragmenter();

/// The outgoing frames of this connection;
FrameWriter _frameWriter;
late FrameWriter _frameWriter;

/// A subscription of incoming [Frame]s.
StreamSubscription<Frame> _frameReaderSubscription;
late StreamSubscription<Frame> _frameReaderSubscription;

/// The incoming connection-level message queue.
ConnectionMessageQueueIn _incomingQueue;
late ConnectionMessageQueueIn _incomingQueue;

/// The outgoing connection-level message queue.
ConnectionMessageQueueOut _outgoingQueue;
late ConnectionMessageQueueOut _outgoingQueue;

/// The ping handler used for making pings & handling remote pings.
PingHandler _pingHandler;
late PingHandler _pingHandler;

/// The settings handler used for changing settings & for handling remote
/// setting changes.
SettingsHandler _settingsHandler;
late SettingsHandler _settingsHandler;

/// The set of active streams this connection has.
StreamHandler _streams;
late StreamHandler _streams;

/// The connection-level flow control window handler for outgoing messages.
OutgoingConnectionWindowHandler _connectionWindowHandler;
late OutgoingConnectionWindowHandler _connectionWindowHandler;

/// The state of this connection.
ConnectionState _state;
late ConnectionState _state;

Connection(Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
Settings settings,
Expand Down Expand Up @@ -241,19 +241,18 @@ abstract class Connection {
// By default a endpoitn can make an unlimited number of concurrent streams.
if (settings.concurrentStreamLimit != null) {
settingsList.add(Setting(Setting.SETTINGS_MAX_CONCURRENT_STREAMS,
settings.concurrentStreamLimit));
settings.concurrentStreamLimit!));
}

// By default the stream level flow control window is 64 KiB.
if (settings.streamWindowSize != null) {
settingsList.add(Setting(
Setting.SETTINGS_INITIAL_WINDOW_SIZE, settings.streamWindowSize));
Setting.SETTINGS_INITIAL_WINDOW_SIZE, settings.streamWindowSize!));
}

if (settings is ClientSettings) {
// By default the server is allowed to do server pushes.
if (settings.allowServerPushes == null ||
settings.allowServerPushes == false) {
if (!settings.allowServerPushes) {
settingsList.add(Setting(Setting.SETTINGS_ENABLE_PUSH, 0));
}
} else if (settings is ServerSettings) {
Expand All @@ -278,12 +277,8 @@ abstract class Connection {
_finishing(active: true);

// TODO: There is probably more we need to wait for.
return _streams.done.whenComplete(() {
var futures = [_frameWriter.close()];
var f = _frameReaderSubscription.cancel();
if (f != null) futures.add(f);
return Future.wait(futures);
});
return _streams.done.whenComplete(() =>
Future.wait([_frameWriter.close(), _frameReaderSubscription.cancel()]));
}

/// Terminates this connection forcefully.
Expand All @@ -293,7 +288,7 @@ abstract class Connection {

void _activeStateHandler(bool isActive) {
if (onActiveStateChanged != null) {
onActiveStateChanged(isActive);
onActiveStateChanged!(isActive);
}
}

Expand All @@ -319,7 +314,7 @@ abstract class Connection {
}
}

void _handleFrameImpl(Frame frame) {
void _handleFrameImpl(Frame? frame) {
// The first frame from the other side must be a [SettingsFrame], otherwise
// we terminate the connection.
if (_state.isInitialized) {
Expand Down Expand Up @@ -369,7 +364,7 @@ abstract class Connection {
}
}

void _finishing({bool active = true, String message}) {
void _finishing({bool active = true, String? message}) {
// If this connection is already dead, we return.
if (_state.isTerminated) return;

Expand Down Expand Up @@ -407,7 +402,7 @@ abstract class Connection {
///
/// The returned future will never complete with an error.
Future _terminate(int errorCode,
{bool causedByTransportError = false, String message}) {
{bool causedByTransportError = false, String? message}) {
// TODO: When do we complete here?
if (_state.state != ConnectionState.Terminated) {
_state.state = ConnectionState.Terminated;
Expand Down
11 changes: 5 additions & 6 deletions lib/src/connection_preface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ const List<int> CONNECTION_PREFACE = [
/// connection preface. If an error occurs while reading the connection
/// preface, the returned stream will have only an error.
Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
StreamController<List<int>> result;
StreamSubscription subscription;
late StreamController<List<int>> result;
late StreamSubscription subscription;
var connectionPrefaceRead = false;
var prefaceBuffer = <int>[];
var terminated = false;

void terminate(error) {
void terminate(Object error) {
if (!terminated) {
result.addError(error);
result.close();
Expand All @@ -64,7 +64,6 @@ Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
return false;
}
}
prefaceBuffer = null;
connectionPrefaceRead = true;
return true;
}
Expand Down Expand Up @@ -99,9 +98,9 @@ Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
result = StreamController(
onListen: () {
subscription = incoming.listen(onData,
onError: (e, StackTrace s) => result.addError(e, s),
onError: (Object e, StackTrace s) => result.addError(e, s),
onDone: () {
if (prefaceBuffer != null) {
if (!connectionPrefaceRead) {
terminate('EOS before connection preface could be read.');
} else {
result.close();
Expand Down
2 changes: 1 addition & 1 deletion lib/src/error_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TerminatableMixin {

bool get wasTerminated => _terminated;

void onTerminated(error) {
void onTerminated(Object? error) {
// Subclasses can override this method if they want.
}

Expand Down
8 changes: 4 additions & 4 deletions lib/src/flowcontrol/connection_queues.dart
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ class ConnectionMessageQueueIn extends Object

// TODO: Do we need to do a runtime check here and
// raise a protocol error if we cannot find the registered stream?
var streamMQ = _stream2messageQueue[streamId];
var pendingMessages = _stream2pendingMessages[streamId];
var streamMQ = _stream2messageQueue[streamId]!;
var pendingMessages = _stream2pendingMessages[streamId]!;
pendingMessages.addLast(message);
_tryDispatch(streamId, streamMQ, pendingMessages);
}
Expand All @@ -304,7 +304,7 @@ class ConnectionMessageQueueIn extends Object

// TODO: Do we need to do a runtime check here and
// raise a protocol error if we cannot find the registered stream?
var streamMQ = _stream2messageQueue[streamId];
var streamMQ = _stream2messageQueue[streamId]!;
streamMQ.enqueueMessage(message);
}

Expand Down Expand Up @@ -336,7 +336,7 @@ class ConnectionMessageQueueIn extends Object
void forceDispatchIncomingMessages() {
final toBeRemoved = <int>{};
_stream2pendingMessages.forEach((int streamId, Queue<Message> messages) {
final mq = _stream2messageQueue[streamId];
final mq = _stream2messageQueue[streamId]!;
while (messages.isNotEmpty) {
_count--;
final message = messages.removeFirst();
Expand Down
4 changes: 2 additions & 2 deletions lib/src/flowcontrol/stream_queues.dart
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ class StreamMessageQueueIn extends Object
final Queue<Message> _pendingMessages = Queue<Message>();

/// The [StreamController] used for producing the [messages] stream.
StreamController<StreamMessage> _incomingMessagesC;
late StreamController<StreamMessage> _incomingMessagesC;

/// The [StreamController] used for producing the [serverPushes] stream.
StreamController<TransportStreamPush> _serverPushStreamsC;
late StreamController<TransportStreamPush> _serverPushStreamsC;

StreamMessageQueueIn(this.windowHandler) {
// We start by marking it as buffered, since no one is listening yet and
Expand Down
14 changes: 7 additions & 7 deletions lib/src/frames/frame_defragmenter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import 'frames.dart';
// (since we're buffering all of them).
class FrameDefragmenter {
/// The current incomplete [HeadersFrame] fragment.
HeadersFrame _headersFrame;
HeadersFrame? _headersFrame;

/// The current incomplete [PushPromiseFrame] fragment.
PushPromiseFrame _pushPromiseFrame;
PushPromiseFrame? _pushPromiseFrame;

/// Tries to defragment [frame].
///
Expand All @@ -29,14 +29,14 @@ class FrameDefragmenter {
/// All other [Frame] types will be returned.
// TODO: Consider handling continuation frames without preceding
// headers/push-promise frame here instead of the call site?
Frame tryDefragmentFrame(Frame frame) {
Frame? tryDefragmentFrame(Frame? frame) {
if (_headersFrame != null) {
if (frame is ContinuationFrame) {
if (_headersFrame.header.streamId != frame.header.streamId) {
if (_headersFrame!.header.streamId != frame.header.streamId) {
throw ProtocolException(
'Defragmentation: frames have different stream ids.');
}
_headersFrame = _headersFrame.addBlockContinuation(frame);
_headersFrame = _headersFrame!.addBlockContinuation(frame);

if (frame.hasEndHeadersFlag) {
var frame = _headersFrame;
Expand All @@ -52,11 +52,11 @@ class FrameDefragmenter {
}
} else if (_pushPromiseFrame != null) {
if (frame is ContinuationFrame) {
if (_pushPromiseFrame.header.streamId != frame.header.streamId) {
if (_pushPromiseFrame!.header.streamId != frame.header.streamId) {
throw ProtocolException(
'Defragmentation: frames have different stream ids.');
}
_pushPromiseFrame = _pushPromiseFrame.addBlockContinuation(frame);
_pushPromiseFrame = _pushPromiseFrame!.addBlockContinuation(frame);

if (frame.hasEndHeadersFlag) {
var frame = _pushPromiseFrame;
Expand Down
Loading

0 comments on commit 85c49cf

Please sign in to comment.