Skip to content

Commit

Permalink
Revert "Switch to callbacks for pings"
Browse files Browse the repository at this point in the history
This reverts commit d90d617.
  • Loading branch information
mosuem committed Jan 8, 2024
1 parent d90d617 commit a2df36c
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 76 deletions.
3 changes: 2 additions & 1 deletion lib/src/client/http2_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class Http2ClientConnection implements connection.ClientConnection {
},
onPingTimeout: () => shutdown(),
);
transport.frameReceived = keepAliveManager?.onFrameReceived;
transport.onFrameReceived
.listen((_) => keepAliveManager?.onFrameReceived());
}
_connectionLifeTimer
..reset()
Expand Down
8 changes: 4 additions & 4 deletions lib/src/server/handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class ServerHandler extends ServiceCall {
final X509Certificate? _clientCertificate;
final InternetAddress? _remoteAddress;

/// Callback everytime data is received
final void Function()? onDataReceived;
/// Emits a ping everytime data is received
final Sink<void>? onDataReceived;

final Completer<void> _isCanceledCompleter = Completer<void>();

Expand Down Expand Up @@ -148,7 +148,7 @@ class ServerHandler extends ServiceCall {
// -- Idle state, incoming data --

void _onDataIdle(GrpcMessage headerMessage) async {
onDataReceived?.call();
onDataReceived?.add(null);
if (headerMessage is! GrpcMetadata) {
_sendError(GrpcError.unimplemented('Expected header frame'));
_sinkIncoming();
Expand Down Expand Up @@ -289,7 +289,7 @@ class ServerHandler extends ServiceCall {
return;
}

onDataReceived?.call();
onDataReceived?.add(null);
final data = message;
Object? request;
try {
Expand Down
24 changes: 12 additions & 12 deletions lib/src/server/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,25 @@ class ConnectionServer {
required ServerTransportConnection connection,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
required ServerKeepAlive serverKeepAlive,
}) async {
_connections.add(connection);
handlers[connection] = [];
// TODO(jakobr): Set active state handlers, close connection after idle
// timeout.

serverKeepAlive.tooManyBadPings =
() async => await connection.terminate(ErrorCode.ENHANCE_YOUR_CALM);
final onDataReceivedController = StreamController<void>();
ServerKeepAlive(
options: _keepAliveOptions,
tooManyBadPings: () async =>
await connection.terminate(ErrorCode.ENHANCE_YOUR_CALM),
pingNotifier: connection.onPingReceived,
dataNotifier: onDataReceivedController.stream,
).handle();
connection.incomingStreams.listen((stream) {
final handler = serveStream_(
stream: stream,
clientCertificate: clientCertificate,
remoteAddress: remoteAddress,
onDataReceived: serverKeepAlive.onDataReceived,
onDataReceived: onDataReceivedController.sink,
);
handler.onCanceled.then((_) => handlers[connection]?.remove(handler));
handlers[connection]!.add(handler);
Expand All @@ -149,6 +153,7 @@ class ConnectionServer {
}
_connections.remove(connection);
handlers.remove(connection);
await onDataReceivedController.close();
});
}

Expand All @@ -157,7 +162,7 @@ class ConnectionServer {
required ServerTransportStream stream,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
void Function()? onDataReceived,
Sink<void>? onDataReceived,
}) {
return ServerHandler(
stream: stream,
Expand Down Expand Up @@ -274,22 +279,17 @@ class Server extends ConnectionServer {
clientCertificate = socket.peerCertificate;
}

final serverKeepAlive = ServerKeepAlive(options: _keepAliveOptions);
final connection = ServerTransportConnection.viaSocket(
socket,
settings: http2ServerSettings,
pingReceived: serverKeepAlive.onPingReceived,
);
connection.pingReceived = serverKeepAlive.onPingReceived;

serveConnection(
connection: connection,
clientCertificate: clientCertificate,
remoteAddress: socket.remoteAddressOrNull,
serverKeepAlive: serverKeepAlive,
);
}, onError: (error, stackTrace) {
print('error');
if (error is Error) {
Zone.current.handleUncaughtError(error, stackTrace);
}
Expand All @@ -302,7 +302,7 @@ class Server extends ConnectionServer {
required ServerTransportStream stream,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
void Function()? onDataReceived,
Sink<void>? onDataReceived,
}) {
return ServerHandler(
stream: stream,
Expand Down
24 changes: 20 additions & 4 deletions lib/src/server/server_keepalive.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,38 @@ class ServerKeepAliveOptions {
class ServerKeepAlive {
/// What to do after receiving too many bad pings, probably shut down the
/// connection to not be DDoSed.
Future<void> Function()? tooManyBadPings;
final Future<void> Function()? tooManyBadPings;

final ServerKeepAliveOptions options;

/// A stream of events for every time the server gets pinged.
final Stream<void> pingNotifier;

/// A stream of events for every time the server receives data.
final Stream<void> dataNotifier;

int _badPings = 0;
Stopwatch? _timeOfLastReceivedPing;

ServerKeepAlive({
this.tooManyBadPings,
required this.options,
required this.pingNotifier,
required this.dataNotifier,
});

void handle() {
// If we don't care about bad pings, there is not point in listening to
// events.
if (_enforcesMaxBadPings) {
pingNotifier.listen((_) => _onPingReceived());
dataNotifier.listen((_) => _onDataReceived());
}
}

bool get _enforcesMaxBadPings => (options.maxBadPings ?? 0) > 0;

Future<void> onPingReceived(int _) async {
Future<void> _onPingReceived() async {
if (_enforcesMaxBadPings) {
if (_timeOfLastReceivedPing == null) {
_timeOfLastReceivedPing = clock.stopwatch()
Expand All @@ -65,13 +82,12 @@ class ServerKeepAlive {
_badPings++;
}
if (_badPings > options.maxBadPings!) {
// print('Call too many bad pings');
await tooManyBadPings?.call();
}
}
}

void onDataReceived() {
void _onDataReceived() {
if (_enforcesMaxBadPings) {
_badPings = 0;
_timeOfLastReceivedPing = null;
Expand Down
3 changes: 0 additions & 3 deletions pubspec_overrides.yaml

This file was deleted.

5 changes: 3 additions & 2 deletions test/client_tests/client_keepalive_manager_test.mocks.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Mocks generated by Mockito 5.4.2 from annotations
// Mocks generated by Mockito 5.4.1 from annotations
// in grpc/test/client_tests/client_keepalive_manager_test.dart.
// Do not manually edit this file.

// @dart=2.19

// ignore_for_file: no_leading_underscores_for_library_prefixes
import 'package:mockito/mockito.dart' as _i1;

Expand Down Expand Up @@ -30,7 +32,6 @@ class MockPinger extends _i1.Mock implements _i2.Pinger {
),
returnValueForMissingStub: null,
);

@override
void onPingTimeout() => super.noSuchMethod(
Invocation.method(
Expand Down
2 changes: 1 addition & 1 deletion test/keepalive_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class FakeEchoService extends EchoServiceBase {
@override
Stream<ServerStreamingEchoResponse> serverStreamingEcho(
ServiceCall call, ServerStreamingEchoRequest request) {
// TODO: implement serverStreamingEcho
// TODO: implement serverStreamingEcho
throw UnimplementedError();
}
}
46 changes: 29 additions & 17 deletions test/server_keepalive_manager_test.dart
Original file line number Diff line number Diff line change
@@ -1,45 +1,57 @@
import 'dart:async';

import 'package:fake_async/fake_async.dart';
import 'package:grpc/src/server/server_keepalive.dart';
import 'package:test/test.dart';

void main() {
late StreamController pingStream;
late StreamController dataStream;
late int maxBadPings;

var goAway = false;

ServerKeepAlive initServer([ServerKeepAliveOptions? options]) =>
ServerKeepAlive(
void initServer([ServerKeepAliveOptions? options]) => ServerKeepAlive(
options: options ??
ServerKeepAliveOptions(
maxBadPings: maxBadPings,
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5),
),
pingNotifier: pingStream.stream,
dataNotifier: dataStream.stream,
tooManyBadPings: () async => goAway = true,
);
).handle();

setUp(() {
pingStream = StreamController();
dataStream = StreamController();
maxBadPings = 10;
goAway = false;
});

tearDown(() {
pingStream.close();
dataStream.close();
});

final timeAfterPing = Duration(milliseconds: 10);

test('Sending too many pings without data kills connection', () async {
FakeAsync().run((async) {
final server = initServer();
initServer();
// Send good ping
server.onPingReceived(0);
pingStream.sink.add(null);
async.elapse(timeAfterPing);

// Send [maxBadPings] bad pings, that's still ok
for (var i = 0; i < maxBadPings; i++) {
server.onPingReceived(0);
pingStream.sink.add(null);
}
async.elapse(timeAfterPing);
expect(goAway, false);

// Send another bad ping; that's one too many!
server.onPingReceived(0);
pingStream.sink.add(null);
async.elapse(timeAfterPing);
expect(goAway, true);
});
Expand All @@ -48,17 +60,17 @@ void main() {
'Sending too many pings without data doesn`t kill connection if the server doesn`t care',
() async {
FakeAsync().run((async) {
final server = initServer(ServerKeepAliveOptions(
initServer(ServerKeepAliveOptions(
maxBadPings: null,
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5),
));
// Send good ping
server.onPingReceived(0);
pingStream.sink.add(null);
async.elapse(timeAfterPing);

// Send a lot of bad pings, that's still ok.
for (var i = 0; i < 50; i++) {
server.onPingReceived(0);
pingStream.sink.add(null);
}
async.elapse(timeAfterPing);
expect(goAway, false);
Expand All @@ -67,36 +79,36 @@ void main() {

test('Sending many pings with data doesn`t kill connection', () async {
FakeAsync().run((async) {
final server = initServer();
initServer();

// Send good ping
server.onPingReceived(0);
pingStream.sink.add(null);
async.elapse(timeAfterPing);

// Send [maxBadPings] bad pings, that's still ok
for (var i = 0; i < maxBadPings; i++) {
server.onPingReceived(0);
pingStream.sink.add(null);
}
async.elapse(timeAfterPing);
expect(goAway, false);

// Sending data resets the bad ping count
server.onDataReceived();
dataStream.add(null);
async.elapse(timeAfterPing);

// Send good ping
server.onPingReceived(0);
pingStream.sink.add(null);
async.elapse(timeAfterPing);

// Send [maxBadPings] bad pings, that's still ok
for (var i = 0; i < maxBadPings; i++) {
server.onPingReceived(0);
pingStream.sink.add(null);
}
async.elapse(timeAfterPing);
expect(goAway, false);

// Send another bad ping; that's one too many!
server.onPingReceived(0);
pingStream.sink.add(null);
async.elapse(timeAfterPing);
expect(goAway, true);
});
Expand Down
Loading

0 comments on commit a2df36c

Please sign in to comment.