From 491a828248d443dce34e47e53c94ccb976fd25f6 Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Tue, 22 Nov 2022 14:00:56 -0600 Subject: [PATCH 1/7] added response error handler --- CHANGELOG.md | 1 + lib/src/server/handler.dart | 28 ++++++++++++++-------------- lib/src/server/server.dart | 7 ++++++- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9338303a..916e148b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 3.1.1-dev * Require Dart 2.17 or greater. +* Fix issue [#51](https://github.com/grpc/grpc-dart/issues/51), add support for response error handling. ## 3.1.0 diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 052be92d..904df2a4 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -35,6 +35,7 @@ class ServerHandlerImpl extends ServiceCall { final Service? Function(String service) _serviceLookup; final List _interceptors; final CodecRegistry? _codecRegistry; + final Function? _responseErrorHandler; // ignore: cancel_subscriptions StreamSubscription? _incomingSubscription; @@ -61,9 +62,14 @@ class ServerHandlerImpl extends ServiceCall { Timer? _timeoutTimer; final X509Certificate? _clientCertificate; - ServerHandlerImpl(this._serviceLookup, this._stream, this._interceptors, - this._codecRegistry, - [this._clientCertificate]); + ServerHandlerImpl( + this._serviceLookup, + this._stream, + this._interceptors, + this._codecRegistry, [ + this._clientCertificate, + this._responseErrorHandler, + ]); @override DateTime? get deadline => _deadline; @@ -293,7 +299,11 @@ class ServerHandlerImpl extends ServiceCall { sendTrailers(); } - void _onResponseError(error) { + void _onResponseError(error, trace) { + if (_responseErrorHandler != null) { + _responseErrorHandler!(error, trace); + } + if (error is GrpcError) { _sendError(error); } else { @@ -424,13 +434,3 @@ class ServerHandlerImpl extends ServiceCall { _cancelResponseSubscription(); } } - -class ServerHandler extends ServerHandlerImpl { - // ignore: use_super_parameters - ServerHandler(Service Function(String service) serviceLookup, stream, - [List interceptors = const [], - CodecRegistry? codecRegistry, - X509Certificate? clientCertificate]) - : super(serviceLookup, stream, interceptors, codecRegistry, - clientCertificate); -} diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 9fbf8712..a4cb2ece 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -87,6 +87,7 @@ class ConnectionServer { final Map _services = {}; final List _interceptors; final CodecRegistry? _codecRegistry; + final Function? _responseErrorHandler; final _connections = []; @@ -95,8 +96,10 @@ class ConnectionServer { List services, [ List interceptors = const [], CodecRegistry? codecRegistry, + Function? responseErrorHandler, ]) : _codecRegistry = codecRegistry, - _interceptors = interceptors { + _interceptors = interceptors, + _responseErrorHandler = responseErrorHandler { for (final service in services) { _services[service.$name] = service; } @@ -133,6 +136,7 @@ class ConnectionServer { lookupService, stream, _interceptors, _codecRegistry, // ignore: unnecessary_cast clientCertificate as io_bits.X509Certificate?, + _responseErrorHandler, )..handle(); } } @@ -232,6 +236,7 @@ class Server extends ConnectionServer { _codecRegistry, // ignore: unnecessary_cast clientCertificate as io_bits.X509Certificate?, + _responseErrorHandler, )..handle(); } From 1ab0717d6fda24dbf498349ba159cbdeb13b6663 Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Tue, 22 Nov 2022 14:05:06 -0600 Subject: [PATCH 2/7] added response error handler --- lib/src/server/server.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index a4cb2ece..ef6dabd4 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -153,6 +153,7 @@ class Server extends ConnectionServer { super.services, [ super.interceptors, super.codecRegistry, + super.responseErrorHandler, ]); /// The port that the server is listening on, or `null` if the server is not From f1423adf6b7001c04d17a96aa304ca15359a1ff8 Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Tue, 22 Nov 2022 14:14:57 -0600 Subject: [PATCH 3/7] added response error handler --- lib/src/server/server.dart | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index ef6dabd4..2e27a9f9 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -149,6 +149,7 @@ class Server extends ConnectionServer { SecureServerSocket? _secureServer; /// Create a server for the given [services]. + @Deprecated('use Server.create() instead') Server( super.services, [ super.interceptors, @@ -156,6 +157,14 @@ class Server extends ConnectionServer { super.responseErrorHandler, ]); + /// Create a server for the given [services]. + Server.create({ + required List services, + List interceptors = const [], + CodecRegistry? codecRegistry, + Function? responseErrorHandler, + }) : super(services, interceptors, codecRegistry, responseErrorHandler); + /// The port that the server is listening on, or `null` if the server is not /// active. int? get port { From 04f15c8fdd47656d76b458918e89ce8d45739cc9 Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Tue, 22 Nov 2022 14:52:20 -0600 Subject: [PATCH 4/7] support for custom error handling --- CHANGELOG.md | 2 +- lib/src/server/handler.dart | 28 ++++++++++++++-------------- lib/src/server/server.dart | 16 ++++++++-------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 916e148b..79d92ef5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## 3.1.1-dev * Require Dart 2.17 or greater. -* Fix issue [#51](https://github.com/grpc/grpc-dart/issues/51), add support for response error handling. +* Fix issue [#51](https://github.com/grpc/grpc-dart/issues/51), add support for custom error handling. ## 3.1.0 diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 904df2a4..29f25e95 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -35,7 +35,7 @@ class ServerHandlerImpl extends ServiceCall { final Service? Function(String service) _serviceLookup; final List _interceptors; final CodecRegistry? _codecRegistry; - final Function? _responseErrorHandler; + final Function? _errorHandler; // ignore: cancel_subscriptions StreamSubscription? _incomingSubscription; @@ -68,7 +68,7 @@ class ServerHandlerImpl extends ServiceCall { this._interceptors, this._codecRegistry, [ this._clientCertificate, - this._responseErrorHandler, + this._errorHandler, ]); @override @@ -260,12 +260,12 @@ class ServerHandlerImpl extends ServiceCall { Object? request; try { request = _descriptor.deserialize(data.data); - } catch (error) { + } catch (error, trace) { final grpcError = GrpcError.internal('Error deserializing request: $error'); - _sendError(grpcError); + _sendError(grpcError, trace); _requests! - ..addError(grpcError) + ..addError(grpcError, trace) ..close(); return; } @@ -282,7 +282,7 @@ class ServerHandlerImpl extends ServiceCall { sendHeaders(); } _stream.sendData(frame(bytes, _callEncodingCodec)); - } catch (error) { + } catch (error, trace) { final grpcError = GrpcError.internal('Error sending response: $error'); if (!_requests!.isClosed) { // If we can, alert the handler that things are going wrong. @@ -290,7 +290,7 @@ class ServerHandlerImpl extends ServiceCall { ..addError(grpcError) ..close(); } - _sendError(grpcError); + _sendError(grpcError, trace); _cancelResponseSubscription(); } } @@ -300,14 +300,10 @@ class ServerHandlerImpl extends ServiceCall { } void _onResponseError(error, trace) { - if (_responseErrorHandler != null) { - _responseErrorHandler!(error, trace); - } - if (error is GrpcError) { - _sendError(error); + _sendError(error, trace); } else { - _sendError(GrpcError.unknown(error.toString())); + _sendError(GrpcError.unknown(error.toString()), trace); } } @@ -420,7 +416,11 @@ class ServerHandlerImpl extends ServiceCall { ..onDone(_onDone); } - void _sendError(GrpcError error) { + void _sendError(GrpcError error, [StackTrace? trace]) { + if (_errorHandler != null) { + _errorHandler!(error, trace); + } + sendTrailers( status: error.code, message: error.message, diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 2e27a9f9..56fbccfd 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -87,7 +87,7 @@ class ConnectionServer { final Map _services = {}; final List _interceptors; final CodecRegistry? _codecRegistry; - final Function? _responseErrorHandler; + final Function? _errorHandler; final _connections = []; @@ -96,10 +96,10 @@ class ConnectionServer { List services, [ List interceptors = const [], CodecRegistry? codecRegistry, - Function? responseErrorHandler, + Function? errorHandler, ]) : _codecRegistry = codecRegistry, _interceptors = interceptors, - _responseErrorHandler = responseErrorHandler { + _errorHandler = errorHandler { for (final service in services) { _services[service.$name] = service; } @@ -136,7 +136,7 @@ class ConnectionServer { lookupService, stream, _interceptors, _codecRegistry, // ignore: unnecessary_cast clientCertificate as io_bits.X509Certificate?, - _responseErrorHandler, + _errorHandler, )..handle(); } } @@ -154,7 +154,7 @@ class Server extends ConnectionServer { super.services, [ super.interceptors, super.codecRegistry, - super.responseErrorHandler, + super.errorHandler, ]); /// Create a server for the given [services]. @@ -162,8 +162,8 @@ class Server extends ConnectionServer { required List services, List interceptors = const [], CodecRegistry? codecRegistry, - Function? responseErrorHandler, - }) : super(services, interceptors, codecRegistry, responseErrorHandler); + Function? errorHandler, + }) : super(services, interceptors, codecRegistry, errorHandler); /// The port that the server is listening on, or `null` if the server is not /// active. @@ -246,7 +246,7 @@ class Server extends ConnectionServer { _codecRegistry, // ignore: unnecessary_cast clientCertificate as io_bits.X509Certificate?, - _responseErrorHandler, + _errorHandler, )..handle(); } From 7e03d0617a5f22d5e940f9a778dbe2058feff91b Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Mon, 28 Nov 2022 11:01:42 -0600 Subject: [PATCH 5/7] updated ServerHandler constructor signature per the related changes to Server.create updated examples, tests, and interop to use new Server.create constructor misc. code style/formatting tweaks --- example/helloworld/bin/server.dart | 7 ++- example/helloworld/bin/unix_server.dart | 2 +- example/metadata/lib/src/server.dart | 2 +- example/route_guide/lib/src/server.dart | 2 +- interop/bin/server.dart | 2 +- lib/src/server/handler.dart | 27 ++++++----- lib/src/server/server.dart | 45 +++++++++++-------- test/client_certificate_test.dart | 6 ++- test/client_handles_bad_connections_test.dart | 7 ++- test/grpc_web_server.dart | 3 +- test/round_trip_test.dart | 23 ++++++---- ...server_handles_broken_connection_test.dart | 13 ++++-- test/src/server_utils.dart | 6 ++- test/timeline_test.dart | 2 +- 14 files changed, 92 insertions(+), 55 deletions(-) diff --git a/example/helloworld/bin/server.dart b/example/helloworld/bin/server.dart index 90516677..0f2b2878 100644 --- a/example/helloworld/bin/server.dart +++ b/example/helloworld/bin/server.dart @@ -25,10 +25,9 @@ class GreeterService extends GreeterServiceBase { } Future main(List args) async { - final server = Server( - [GreeterService()], - const [], - CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), + final server = Server.create( + services: [GreeterService()], + codecRegistry: CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); diff --git a/example/helloworld/bin/unix_server.dart b/example/helloworld/bin/unix_server.dart index 0639d17d..82fbf022 100644 --- a/example/helloworld/bin/unix_server.dart +++ b/example/helloworld/bin/unix_server.dart @@ -29,7 +29,7 @@ class GreeterService extends GreeterServiceBase { Future main(List args) async { final udsAddress = InternetAddress('localhost', type: InternetAddressType.unix); - final server = Server([GreeterService()]); + final server = Server.create(services: [GreeterService()]); await server.serve(address: udsAddress); print('Start UNIX Server @localhost...'); } diff --git a/example/metadata/lib/src/server.dart b/example/metadata/lib/src/server.dart index 26f95b79..27c701f7 100644 --- a/example/metadata/lib/src/server.dart +++ b/example/metadata/lib/src/server.dart @@ -78,7 +78,7 @@ class MetadataService extends MetadataServiceBase { class Server { Future main(List args) async { - final server = grpc.Server([MetadataService()]); + final server = grpc.Server.create(services: [MetadataService()]); await server.serve(port: 8080); print('Server listening on port ${server.port}...'); } diff --git a/example/route_guide/lib/src/server.dart b/example/route_guide/lib/src/server.dart index 23484644..8f7da679 100644 --- a/example/route_guide/lib/src/server.dart +++ b/example/route_guide/lib/src/server.dart @@ -144,7 +144,7 @@ class RouteGuideService extends RouteGuideServiceBase { class Server { Future main(List args) async { - final server = grpc.Server([RouteGuideService()]); + final server = grpc.Server.create(services: [RouteGuideService()]); await server.serve(port: 8080); print('Server listening on port ${server.port}...'); } diff --git a/interop/bin/server.dart b/interop/bin/server.dart index a4cf86ea..7374ed08 100644 --- a/interop/bin/server.dart +++ b/interop/bin/server.dart @@ -131,7 +131,7 @@ Future main(List args) async { final services = [TestService()]; - final server = Server(services); + final server = Server.create(services: services); ServerTlsCredentials? tlsCredentials; if (arguments['use_tls'] == 'true') { diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 29f25e95..bf135129 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -29,10 +29,12 @@ import 'call.dart'; import 'interceptor.dart'; import 'service.dart'; +typedef ServiceLookup = Service? Function(String service); + /// Handles an incoming gRPC call. -class ServerHandlerImpl extends ServiceCall { +class ServerHandler extends ServiceCall { final ServerTransportStream _stream; - final Service? Function(String service) _serviceLookup; + final ServiceLookup _serviceLookup; final List _interceptors; final CodecRegistry? _codecRegistry; final Function? _errorHandler; @@ -62,14 +64,19 @@ class ServerHandlerImpl extends ServiceCall { Timer? _timeoutTimer; final X509Certificate? _clientCertificate; - ServerHandlerImpl( - this._serviceLookup, - this._stream, - this._interceptors, - this._codecRegistry, [ - this._clientCertificate, - this._errorHandler, - ]); + ServerHandler({ + required ServerTransportStream stream, + required ServiceLookup serviceLookup, + required List interceptors, + required CodecRegistry? codecRegistry, + X509Certificate? clientCertificate, + Function? errorHandler, + }) : _stream = stream, + _serviceLookup = serviceLookup, + _interceptors = interceptors, + _codecRegistry = codecRegistry, + _clientCertificate = clientCertificate, + _errorHandler = errorHandler; @override DateTime? get deadline => _deadline; diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 56fbccfd..0e91ddf7 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -107,10 +107,12 @@ class ConnectionServer { Service? lookupService(String service) => _services[service]; - Future serveConnection(ServerTransportConnection connection, - [X509Certificate? clientCertificate]) async { + Future serveConnection( + ServerTransportConnection connection, [ + X509Certificate? clientCertificate, + ]) async { _connections.add(connection); - ServerHandlerImpl? handler; + ServerHandler? handler; // TODO(jakobr): Set active state handlers, close connection after idle // timeout. connection.incomingStreams.listen((stream) { @@ -130,13 +132,18 @@ class ConnectionServer { } @visibleForTesting - ServerHandlerImpl serveStream_(ServerTransportStream stream, - [X509Certificate? clientCertificate]) { - return ServerHandlerImpl( - lookupService, stream, _interceptors, _codecRegistry, + ServerHandler serveStream_( + ServerTransportStream stream, [ + X509Certificate? clientCertificate, + ]) { + return ServerHandler( + stream: stream, + serviceLookup: lookupService, + interceptors: _interceptors, + codecRegistry: _codecRegistry, // ignore: unnecessary_cast - clientCertificate as io_bits.X509Certificate?, - _errorHandler, + clientCertificate: clientCertificate as io_bits.X509Certificate?, + errorHandler: _errorHandler, )..handle(); } } @@ -237,16 +244,18 @@ class Server extends ConnectionServer { @override @visibleForTesting - ServerHandlerImpl serveStream_(ServerTransportStream stream, - [X509Certificate? clientCertificate]) { - return ServerHandlerImpl( - lookupService, - stream, - _interceptors, - _codecRegistry, + ServerHandler serveStream_( + ServerTransportStream stream, [ + X509Certificate? clientCertificate, + ]) { + return ServerHandler( + stream: stream, + serviceLookup: lookupService, + interceptors: _interceptors, + codecRegistry: _codecRegistry, // ignore: unnecessary_cast - clientCertificate as io_bits.X509Certificate?, - _errorHandler, + clientCertificate: clientCertificate as io_bits.X509Certificate?, + errorHandler: _errorHandler, )..handle(); } diff --git a/test/client_certificate_test.dart b/test/client_certificate_test.dart index 3652874f..238b932e 100644 --- a/test/client_certificate_test.dart +++ b/test/client_certificate_test.dart @@ -25,6 +25,7 @@ class EchoService extends EchoServiceBase { } const String address = 'localhost'; + Future main() async { test('Client certificate required', () async { // Server @@ -80,7 +81,7 @@ Future main() async { } Future _setUpServer([bool requireClientCertificate = false]) async { - final server = Server([EchoService()]); + final server = Server.create(services: [EchoService()]); final serverContext = SecurityContextServerCredentials.baseSecurityContext(); serverContext.useCertificateChain('test/data/localhost.crt'); serverContext.usePrivateKey('test/data/localhost.key'); @@ -102,6 +103,7 @@ class SecurityContextChannelCredentials extends ChannelCredentials { {super.authority, super.onBadCertificate}) : _securityContext = securityContext, super.secure(); + @override SecurityContext get securityContext => _securityContext; @@ -116,8 +118,10 @@ class SecurityContextServerCredentials extends ServerTlsCredentials { SecurityContextServerCredentials(SecurityContext securityContext) : _securityContext = securityContext, super(); + @override SecurityContext get securityContext => _securityContext; + static SecurityContext baseSecurityContext() { return createSecurityContext(true); } diff --git a/test/client_handles_bad_connections_test.dart b/test/client_handles_bad_connections_test.dart index 7b40f435..986a7508 100644 --- a/test/client_handles_bad_connections_test.dart +++ b/test/client_handles_bad_connections_test.dart @@ -17,6 +17,7 @@ class TestClient extends grpc.Client { (List value) => value[0]); TestClient(super.channel); + grpc.ResponseStream stream(int request, {grpc.CallOptions? options}) { return $createStreamingCall(_$stream, Stream.value(request), options: options); @@ -42,11 +43,13 @@ class TestService extends grpc.Service { class FixedConnectionClientChannel extends ClientChannelBase { final Http2ClientConnection clientConnection; List states = []; + FixedConnectionClientChannel(this.clientConnection) { onConnectionStateChanged.listen((state) { states.add(state); }); } + @override ClientConnection createConnection() => clientConnection; } @@ -55,7 +58,7 @@ Future main() async { testTcpAndUds('client reconnects after the connection gets old', (address) async { // client reconnect after a short delay. - final server = grpc.Server([TestService()]); + final server = grpc.Server.create(services: [TestService()]); await server.serve(address: address, port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( @@ -80,7 +83,7 @@ Future main() async { testTcpAndUds('client reconnects when stream limit is used', (address) async { // client reconnect after setting stream limit. - final server = grpc.Server([TestService()]); + final server = grpc.Server.create(services: [TestService()]); await server.serve( address: address, port: 0, diff --git a/test/grpc_web_server.dart b/test/grpc_web_server.dart index d9354887..8f875553 100644 --- a/test/grpc_web_server.dart +++ b/test/grpc_web_server.dart @@ -89,6 +89,7 @@ static_resources: // with an error. Otherwise if verbose is specified it will be dumped // to stdout unconditionally. final output = []; + void _info(String line) { if (!verbose) { output.add(line); @@ -99,7 +100,7 @@ void _info(String line) { Future hybridMain(StreamChannel channel) async { // Spawn a gRPC server. - final server = Server([EchoService()]); + final server = Server.create(services: [EchoService()]); await server.serve(port: 0); _info('grpc server listening on ${server.port}'); diff --git a/test/round_trip_test.dart b/test/round_trip_test.dart index 12d34b25..4a452248 100644 --- a/test/round_trip_test.dart +++ b/test/round_trip_test.dart @@ -15,6 +15,7 @@ class TestClient extends Client { (int value) => [value], (List value) => value[0]); TestClient(super.channel); + ResponseStream stream(int request, {CallOptions? options}) { return $createStreamingCall(_$stream, Stream.value(request), options: options); @@ -78,9 +79,11 @@ class TestServiceWithGrpcError extends TestService { class FixedConnectionClientChannel extends ClientChannelBase { final Http2ClientConnection clientConnection; List states = []; + FixedConnectionClientChannel(this.clientConnection) { onConnectionStateChanged.listen((state) => states.add(state)); } + @override ClientConnection createConnection() => clientConnection; } @@ -88,7 +91,7 @@ class FixedConnectionClientChannel extends ClientChannelBase { Future main() async { testTcpAndUds('round trip insecure connection', (address) async { // round trip test of insecure connection. - final server = Server([TestService()]); + final server = Server.create(services: [TestService()]); await server.serve(address: address, port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( @@ -104,7 +107,8 @@ Future main() async { testUds('UDS provides valid default authority', (address) async { // round trip test of insecure connection. - final server = Server([TestService(expectedAuthority: 'localhost')]); + final server = + Server.create(services: [TestService(expectedAuthority: 'localhost')]); await server.serve(address: address, port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( @@ -120,8 +124,10 @@ Future main() async { testTcpAndUds('round trip with outgoing and incoming compression', (address) async { - final server = Server( - [TestService()], const [], CodecRegistry(codecs: const [GzipCodec()])); + final server = Server.create( + services: [TestService()], + codecRegistry: CodecRegistry(codecs: const [GzipCodec()]), + ); await server.serve(address: address, port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( @@ -144,7 +150,7 @@ Future main() async { testTcpAndUds('round trip secure connection', (address) async { // round trip test of secure connection. - final server = Server([TestService()]); + final server = Server.create(services: [TestService()]); await server.serve( address: address, port: 0, @@ -167,7 +173,8 @@ Future main() async { }); test('exception in onMetadataException', () async { - final server = Server([TestServiceWithOnMetadataException()]); + final server = + Server.create(services: [TestServiceWithOnMetadataException()]); await server.serve(address: 'localhost', port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( @@ -183,7 +190,7 @@ Future main() async { }); test('cancellation of streaming subscription propagates properly', () async { - final server = Server([TestService()]); + final server = Server.create(services: [TestService()]); await server.serve(address: 'localhost', port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( @@ -198,7 +205,7 @@ Future main() async { }); test('trailers on server GrpcError', () async { - final server = Server([TestServiceWithGrpcError()]); + final server = Server.create(services: [TestServiceWithGrpcError()]); await server.serve(address: 'localhost', port: 0); final channel = FixedConnectionClientChannel(Http2ClientConnection( diff --git a/test/server_handles_broken_connection_test.dart b/test/server_handles_broken_connection_test.dart index 799c0908..cd6d1fd6 100644 --- a/test/server_handles_broken_connection_test.dart +++ b/test/server_handles_broken_connection_test.dart @@ -2,8 +2,10 @@ import 'dart:async'; import 'dart:io'; import 'dart:isolate'; + import 'package:grpc/grpc.dart' as grpc; import 'package:test/test.dart'; + import 'common.dart'; class TestClient extends grpc.Client { @@ -13,6 +15,7 @@ class TestClient extends grpc.Client { (List value) => value[0]); TestClient(grpc.ClientChannel super.channel); + grpc.ResponseStream infiniteStream(int request, {grpc.CallOptions? options}) { return $createStreamingCall(_$infiniteStream, Stream.value(request), @@ -52,6 +55,7 @@ class ClientData { final InternetAddress address; final int port; final SendPort sendPort; + ClientData( {required this.address, required this.port, required this.sendPort}); } @@ -77,11 +81,12 @@ Future main() async { (address) async { // interrrupt the connect of client, the server does not crash. late grpc.Server server; - server = grpc.Server([ + server = grpc.Server.create(services: [ TestService( - finallyCallback: expectAsync0(() { - expect(server.shutdown(), completes); - }, reason: 'the producer should get cancelled')) + finallyCallback: expectAsync0(() { + expect(server.shutdown(), completes); + }, reason: 'the producer should get cancelled'), + ) ]); await server.serve(address: address, port: 0); final receivePort = ReceivePort(); diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index 8e92c2e5..3fca1788 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -120,8 +120,10 @@ class TestServerStream extends ServerTransportStream { class ServerHarness extends _Harness { @override - ConnectionServer createServer() => - Server([service], [interceptor]); + ConnectionServer createServer() => Server.create( + services: [service], + interceptors: [interceptor], + ); static ServiceMethod createMethod(String name, Function methodHandler, bool clientStreaming, bool serverStreaming) { diff --git a/test/timeline_test.dart b/test/timeline_test.dart index f90681d1..c4123a10 100644 --- a/test/timeline_test.dart +++ b/test/timeline_test.dart @@ -132,7 +132,7 @@ TimelineTask fakeTimelineTaskFactory( FakeTimelineTask(filterKey: filterKey, parent: parent); Future testee() async { - final server = Server([TestService()]); + final server = Server.create(services: [TestService()]); await server.serve(address: 'localhost', port: 0); isTimelineLoggingEnabled = true; timelineTaskFactory = fakeTimelineTaskFactory; From 10f96c50f0d6df250efcfd6bf96317edbfa8e0bf Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Wed, 30 Nov 2022 09:26:42 -0600 Subject: [PATCH 6/7] added typedef for server error handler --- lib/src/server/handler.dart | 9 ++++----- lib/src/server/server.dart | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index bf135129..408317e2 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -30,6 +30,7 @@ import 'interceptor.dart'; import 'service.dart'; typedef ServiceLookup = Service? Function(String service); +typedef GrpcErrorHandler = void Function(GrpcError error, [StackTrace? trace]); /// Handles an incoming gRPC call. class ServerHandler extends ServiceCall { @@ -37,7 +38,7 @@ class ServerHandler extends ServiceCall { final ServiceLookup _serviceLookup; final List _interceptors; final CodecRegistry? _codecRegistry; - final Function? _errorHandler; + final GrpcErrorHandler? _errorHandler; // ignore: cancel_subscriptions StreamSubscription? _incomingSubscription; @@ -70,7 +71,7 @@ class ServerHandler extends ServiceCall { required List interceptors, required CodecRegistry? codecRegistry, X509Certificate? clientCertificate, - Function? errorHandler, + GrpcErrorHandler? errorHandler, }) : _stream = stream, _serviceLookup = serviceLookup, _interceptors = interceptors, @@ -424,9 +425,7 @@ class ServerHandler extends ServiceCall { } void _sendError(GrpcError error, [StackTrace? trace]) { - if (_errorHandler != null) { - _errorHandler!(error, trace); - } + _errorHandler?.call(error, trace); sendTrailers( status: error.code, diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 0e91ddf7..200319f1 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -87,7 +87,7 @@ class ConnectionServer { final Map _services = {}; final List _interceptors; final CodecRegistry? _codecRegistry; - final Function? _errorHandler; + final GrpcErrorHandler? _errorHandler; final _connections = []; @@ -96,7 +96,7 @@ class ConnectionServer { List services, [ List interceptors = const [], CodecRegistry? codecRegistry, - Function? errorHandler, + GrpcErrorHandler? errorHandler, ]) : _codecRegistry = codecRegistry, _interceptors = interceptors, _errorHandler = errorHandler { @@ -169,7 +169,7 @@ class Server extends ConnectionServer { required List services, List interceptors = const [], CodecRegistry? codecRegistry, - Function? errorHandler, + GrpcErrorHandler? errorHandler, }) : super(services, interceptors, codecRegistry, errorHandler); /// The port that the server is listening on, or `null` if the server is not From 504d1dee623d4b0ef595b4d14f514e878cb78ba5 Mon Sep 17 00:00:00 2001 From: Ben Getsug Date: Wed, 30 Nov 2022 09:45:25 -0600 Subject: [PATCH 7/7] added typedef for server error handler --- lib/src/server/handler.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 408317e2..c1dd4064 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -30,7 +30,7 @@ import 'interceptor.dart'; import 'service.dart'; typedef ServiceLookup = Service? Function(String service); -typedef GrpcErrorHandler = void Function(GrpcError error, [StackTrace? trace]); +typedef GrpcErrorHandler = void Function(GrpcError error, StackTrace? trace); /// Handles an incoming gRPC call. class ServerHandler extends ServiceCall {