From f7c0c1b57841e47f0cb48d7851c9a571330c2787 Mon Sep 17 00:00:00 2001 From: Tyler-Larkin Date: Fri, 18 Oct 2024 15:49:49 -0700 Subject: [PATCH] fix(api): Reconnect WebSocket when resuming app from a paused state (#5567) * fix(api): Reconnect WebSocket when resuming app from a paused state --- packages/api/amplify_api/lib/amplify_api.dart | 6 +- .../amplify_api/lib/src/api_plugin_impl.dart | 2 + .../lib/src/flutter_life_cycle.dart | 42 ++++ .../lib/amplify_api_dart.dart | 1 + .../lib/src/api_plugin_impl.dart | 9 +- .../web_socket/blocs/web_socket_bloc.dart | 56 ++++++ .../web_socket/types/process_life_cycle.dart | 33 +++ .../web_socket/types/web_socket_event.dart | 13 ++ .../amplify_api_dart/test/graphql_test.dart | 4 + packages/api/amplify_api_dart/test/util.dart | 11 + .../test/web_socket/web_socket_bloc_test.dart | 190 ++++++++++++++++++ 11 files changed, 365 insertions(+), 2 deletions(-) create mode 100644 packages/api/amplify_api/lib/src/flutter_life_cycle.dart create mode 100644 packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/process_life_cycle.dart diff --git a/packages/api/amplify_api/lib/amplify_api.dart b/packages/api/amplify_api/lib/amplify_api.dart index d27a2141f7..c7dbfa3f0a 100644 --- a/packages/api/amplify_api/lib/amplify_api.dart +++ b/packages/api/amplify_api/lib/amplify_api.dart @@ -5,4 +5,8 @@ library amplify_api; export 'package:amplify_api/src/api_plugin_impl.dart'; export 'package:amplify_api_dart/amplify_api_dart.dart' - hide AmplifyAPIDart, ConnectivityPlatform, ConnectivityStatus; + hide + AmplifyAPIDart, + ConnectivityPlatform, + ProcessLifeCycle, + ConnectivityStatus; diff --git a/packages/api/amplify_api/lib/src/api_plugin_impl.dart b/packages/api/amplify_api/lib/src/api_plugin_impl.dart index abd05fe314..dd23b97739 100644 --- a/packages/api/amplify_api/lib/src/api_plugin_impl.dart +++ b/packages/api/amplify_api/lib/src/api_plugin_impl.dart @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import 'package:amplify_api/src/connectivity_plus_platform.dart'; +import 'package:amplify_api/src/flutter_life_cycle.dart'; import 'package:amplify_api_dart/amplify_api_dart.dart'; import 'package:amplify_core/amplify_core.dart'; @@ -14,6 +15,7 @@ class AmplifyAPI extends AmplifyAPIDart with AWSDebuggable { super.options, }) : super( connectivity: const ConnectivityPlusPlatform(), + processLifeCycle: FlutterLifeCycle(), ); @override diff --git a/packages/api/amplify_api/lib/src/flutter_life_cycle.dart b/packages/api/amplify_api/lib/src/flutter_life_cycle.dart new file mode 100644 index 0000000000..92405745eb --- /dev/null +++ b/packages/api/amplify_api/lib/src/flutter_life_cycle.dart @@ -0,0 +1,42 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import 'dart:async'; + +import 'package:amplify_api_dart/amplify_api_dart.dart'; +import 'package:flutter/widgets.dart'; +import 'package:meta/meta.dart'; + +/// {@template amplify_api.flutter_life_cycle} +/// Creates a stream of [ProcessStatus] mapped from [AppLifecycleListener](https://api.flutter.dev/flutter/widgets/AppLifecycleListener-class.html). +/// {@endtemplate} +@internal +class FlutterLifeCycle extends ProcessLifeCycle { + /// {@macro amplify_api.flutter_life_cycle} + FlutterLifeCycle() { + AppLifecycleListener( + onStateChange: _onStateChange, + ); + } + + final _stateController = + StreamController.broadcast(sync: true); + + @override + Stream get onStateChanged => _stateController.stream; + + void _onStateChange(AppLifecycleState state) { + switch (state) { + case AppLifecycleState.detached: + _stateController.add(ProcessStatus.detached); + case AppLifecycleState.paused: + _stateController.add(ProcessStatus.paused); + case AppLifecycleState.hidden: + _stateController.add(ProcessStatus.hidden); + case AppLifecycleState.inactive: + _stateController.add(ProcessStatus.inactive); + case AppLifecycleState.resumed: + _stateController.add(ProcessStatus.resumed); + } + } +} diff --git a/packages/api/amplify_api_dart/lib/amplify_api_dart.dart b/packages/api/amplify_api_dart/lib/amplify_api_dart.dart index e25c15fb25..d689d1ccde 100644 --- a/packages/api/amplify_api_dart/lib/amplify_api_dart.dart +++ b/packages/api/amplify_api_dart/lib/amplify_api_dart.dart @@ -19,3 +19,4 @@ export 'src/graphql/model_helpers/model_subscriptions.dart'; /// Network connectivity util not needed by consumers of Flutter package amplify_api. export 'src/graphql/web_socket/types/connectivity_platform.dart'; +export 'src/graphql/web_socket/types/process_life_cycle.dart'; diff --git a/packages/api/amplify_api_dart/lib/src/api_plugin_impl.dart b/packages/api/amplify_api_dart/lib/src/api_plugin_impl.dart index 1cda540419..c4248bbc45 100644 --- a/packages/api/amplify_api_dart/lib/src/api_plugin_impl.dart +++ b/packages/api/amplify_api_dart/lib/src/api_plugin_impl.dart @@ -11,6 +11,7 @@ import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.da import 'package:amplify_api_dart/src/graphql/web_socket/services/web_socket_service.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart'; +import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart'; import 'package:amplify_api_dart/src/util/amplify_api_config.dart'; import 'package:amplify_api_dart/src/util/amplify_authorization_rest_client.dart'; import 'package:amplify_core/amplify_core.dart'; @@ -30,8 +31,10 @@ class AmplifyAPIDart extends APIPluginInterface with AWSDebuggable { AmplifyAPIDart({ APIPluginOptions options = const APIPluginOptions(), ConnectivityPlatform connectivity = const ConnectivityPlatform(), + ProcessLifeCycle processLifeCycle = const ProcessLifeCycle(), }) : _options = options, - _connectivity = connectivity { + _connectivity = connectivity, + _processLifeCycle = processLifeCycle { _options.authProviders.forEach(registerAuthProvider); } @@ -43,6 +46,9 @@ class AmplifyAPIDart extends APIPluginInterface with AWSDebuggable { /// Creates a stream representing network connectivity at the hardware level. final ConnectivityPlatform _connectivity; + /// Creates a stream representing the process life cycle state. + final ProcessLifeCycle _processLifeCycle; + /// A map of the keys from the Amplify API config with auth modes to HTTP clients /// to use for requests to that endpoint/auth mode. e.g. { "myEndpoint.AWS_IAM": AWSHttpClient} final Map _clientPool = {}; @@ -277,6 +283,7 @@ class AmplifyAPIDart extends APIPluginInterface with AWSDebuggable { wsService: AmplifyWebSocketService(), subscriptionOptions: _options.subscriptionOptions, connectivity: _connectivity, + processLifeCycle: _processLifeCycle, ); } diff --git a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart index 0903db904b..b3a3d90089 100644 --- a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart +++ b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart @@ -9,6 +9,7 @@ import 'package:amplify_api_dart/src/graphql/web_socket/services/web_socket_serv import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/state/ws_subscriptions_state.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart'; +import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/subscriptions_event.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart'; import 'package:amplify_core/amplify_core.dart' hide SubscriptionEvent; @@ -33,8 +34,10 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { required WebSocketService wsService, required GraphQLSubscriptionOptions subscriptionOptions, required ConnectivityPlatform connectivity, + required ProcessLifeCycle processLifeCycle, AWSHttpClient? pollClientOverride, }) : _connectivity = connectivity, + _processLifeCycle = processLifeCycle, _pollClient = pollClientOverride ?? AWSHttpClient() { final subBlocs = >{}; @@ -49,6 +52,7 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { ); final blocStream = _wsEventStream.asyncExpand(_eventTransformer); _networkSubscription = _getConnectivityStream(); + _processLifeCycleSubscription = _getProcessLifecycleStream(); _stateSubscription = blocStream.listen(_emit); add(const InitEvent()); } @@ -81,10 +85,14 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { late final Stream _wsEventStream = _wsEventController.stream; late final StreamSubscription _stateSubscription; late final StreamSubscription _networkSubscription; + late final StreamSubscription _processLifeCycleSubscription; /// Creates a stream representing network connectivity at the hardware level. final ConnectivityPlatform _connectivity; + /// Creates a stream representing the process life cycle state. + final ProcessLifeCycle _processLifeCycle; + /// The underlying event stream, used only in testing. @visibleForTesting Stream get wsEventStream => _wsEventStream; @@ -164,6 +172,8 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { yield* _networkLoss(); } else if (event is NetworkFoundEvent) { yield* _networkFound(); + } else if (event is ProcessResumeEvent) { + yield* _processResumed(); } else if (event is PollSuccessEvent) { yield* _pollSuccess(); } else if (event is PollFailedEvent) { @@ -328,6 +338,16 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { yield* const Stream.empty(); } + Stream _processResumed() async* { + final state = _currentState; + if (state is ConnectedState) { + yield state.reconnecting(networkState: NetworkState.disconnected); + add(const ReconnectEvent()); + } + // TODO(dnys1): Yield broken on web debug build. + yield* const Stream.empty(); + } + /// Handle successful polls Stream _pollSuccess() async* { // TODO(dnys1): Yield broken on web debug build. @@ -467,6 +487,7 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { await Future.wait([ // TODO(equartey): https://github.com/fluttercommunity/plus_plugins/issues/1382 if (!isWindows()) _networkSubscription.cancel(), + _processLifeCycleSubscription.cancel(), Future.value(_pollClient.close()), _stateSubscription.cancel(), _wsEventController.close(), @@ -507,6 +528,41 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { ); } + /// Process life cycle stream monitors when the process resumes from a paused state. + StreamSubscription _getProcessLifecycleStream() { + var prev = ProcessStatus.detached; + return _processLifeCycle.onStateChanged.listen( + (state) { + if (_isResuming(state, prev)) { + // ignore: invalid_use_of_internal_member + if (!WebSocketOptions.autoReconnect) { + _shutdownWithException( + const NetworkException( + 'Unable to recover network connection, web socket will close.', + recoverySuggestion: 'Avoid pausing the process.', + ), + StackTrace.current, + ); + } else { + add(const ProcessResumeEvent()); + } + } + + prev = state; + }, + onError: (Object e, StackTrace st) => + logger.error('Error in process life cycle stream $e, $st'), + ); + } + + bool _isResuming(ProcessStatus current, ProcessStatus previous) { + if (previous != ProcessStatus.paused) return false; + + return current == ProcessStatus.hidden || + current == ProcessStatus.inactive || + current == ProcessStatus.resumed; + } + Future _poll() async { try { final res = await _sendPollRequest(); diff --git a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/process_life_cycle.dart b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/process_life_cycle.dart new file mode 100644 index 0000000000..ca9ed6e085 --- /dev/null +++ b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/process_life_cycle.dart @@ -0,0 +1,33 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/// Possible process life cycle states +enum ProcessStatus { + /// Engine is running without a view. + detached, + + /// Application is not visible to the user or responding to user input. + paused, + + /// All views of an application are hidden. + hidden, + + /// A view of the application is visible, but none have input. + inactive, + + /// Default running mode. + resumed, +} + +/// {@template amplify_api_dart.process_life_cycle} +/// Used to create a stream representing the process life cycle state. +/// +/// The generated stream is empty. +/// {@endtemplate} +class ProcessLifeCycle { + /// {@macro amplify_api_dart.process_life_cycle} + const ProcessLifeCycle(); + + /// Generates a new stream of [ProcessStatus]. + Stream get onStateChanged => const Stream.empty(); +} diff --git a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/web_socket_event.dart b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/web_socket_event.dart index 907cd6f28d..a4f5d31b36 100644 --- a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/web_socket_event.dart +++ b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/types/web_socket_event.dart @@ -88,6 +88,19 @@ class NetworkLossEvent extends NetworkEvent { String get runtimeTypeName => 'NetworkLossEvent'; } +/// Discrete class for when the process is resumed +/// Triggers when AppLifecycleListener detects the process has been resumed. +class ProcessResumeEvent extends WebSocketEvent { + /// Create a process resumed event + const ProcessResumeEvent(); + + @override + String get runtimeTypeName => 'ProcessResumeEvent'; + + @override + Map toJson() => const {}; +} + /// Triggers when a successful ping to AppSync is made class PollSuccessEvent extends WebSocketEvent { /// Create a successful Poll event diff --git a/packages/api/amplify_api_dart/test/graphql_test.dart b/packages/api/amplify_api_dart/test/graphql_test.dart index c1399a0025..c2dd679594 100644 --- a/packages/api/amplify_api_dart/test/graphql_test.dart +++ b/packages/api/amplify_api_dart/test/graphql_test.dart @@ -280,6 +280,7 @@ void main() { 'payload': {'data': mockSubscriptionData}, }; + mockProcessLifeCycleController = StreamController(); mockWebSocketService = MockWebSocketService(); const subscriptionOptions = GraphQLSubscriptionOptions( pollInterval: Duration(seconds: 1), @@ -292,6 +293,7 @@ void main() { subscriptionOptions: subscriptionOptions, pollClientOverride: mockClient.client, connectivity: const ConnectivityPlatform(), + processLifeCycle: const MockProcessLifeCycle(), ); sendMockConnectionAck(mockWebSocketBloc!, mockWebSocketService!); @@ -599,6 +601,7 @@ void main() { }); test('should have correct state flow during a failure', () async { + mockProcessLifeCycleController = StreamController(); mockWebSocketService = MockWebSocketService(); const subscriptionOptions = GraphQLSubscriptionOptions( pollInterval: Duration(seconds: 1), @@ -613,6 +616,7 @@ void main() { subscriptionOptions: subscriptionOptions, pollClientOverride: mockClient.client, connectivity: const ConnectivityPlatform(), + processLifeCycle: const MockProcessLifeCycle(), ); final blocReady = Completer(); diff --git a/packages/api/amplify_api_dart/test/util.dart b/packages/api/amplify_api_dart/test/util.dart index f4d7e9e390..0d65f7ee93 100644 --- a/packages/api/amplify_api_dart/test/util.dart +++ b/packages/api/amplify_api_dart/test/util.dart @@ -10,6 +10,7 @@ import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.da import 'package:amplify_api_dart/src/graphql/web_socket/services/web_socket_service.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart'; +import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart'; import 'package:amplify_core/amplify_core.dart'; import 'package:amplify_core/src/config/amplify_outputs/data/data_outputs.dart'; @@ -329,6 +330,16 @@ class MockConnectivity extends ConnectivityPlatform { mockNetworkStreamController.stream; } +late StreamController mockProcessLifeCycleController; + +class MockProcessLifeCycle extends ProcessLifeCycle { + const MockProcessLifeCycle(); + + @override + Stream get onStateChanged => + mockProcessLifeCycleController.stream; +} + /// Ensures a query predicate converts to JSON correctly. void testQueryPredicateTranslation( QueryPredicate? queryPredicate, diff --git a/packages/api/amplify_api_dart/test/web_socket/web_socket_bloc_test.dart b/packages/api/amplify_api_dart/test/web_socket/web_socket_bloc_test.dart index cf4ca8f769..297970268f 100644 --- a/packages/api/amplify_api_dart/test/web_socket/web_socket_bloc_test.dart +++ b/packages/api/amplify_api_dart/test/web_socket/web_socket_bloc_test.dart @@ -7,6 +7,7 @@ import 'dart:convert'; import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart'; +import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart'; import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart'; import 'package:amplify_core/amplify_core.dart'; import 'package:test/test.dart'; @@ -54,6 +55,7 @@ void main() { if (!noConnectivity) { mockNetworkStreamController = StreamController(); } + mockProcessLifeCycleController = StreamController(); mockPollClient = MockPollClient(); service = MockWebSocketService(); @@ -66,6 +68,7 @@ void main() { connectivity: noConnectivity ? const ConnectivityPlatform() : const MockConnectivity(), + processLifeCycle: const MockProcessLifeCycle(), ); sendMockConnectionAck(bloc!, service!); @@ -307,6 +310,149 @@ void main() { mockPollClient.induceTimeout = false; }); + test('should reconnect when process resumes', () async { + var dataCompleter = Completer(); + final subscribeEvent = SubscribeEvent( + subscriptionRequest, + () { + service!.channel.sink.add(mockDataString); + }, + ); + + final bloc = getWebSocketBloc(); + + expect( + bloc.stream, + emitsInOrder( + [ + isA(), + isA(), + isA(), + isA(), + isA(), + isA(), + ], + ), + ); + + bloc.subscribe(subscribeEvent).listen( + expectAsync1( + (event) { + expect(event.data, json.encode(mockSubscriptionData)); + dataCompleter.complete(event.data); + }, + count: 2, + ), + ); + + await dataCompleter.future; + dataCompleter = Completer(); + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + + service!.channel.sink.add(mockDataString); + await dataCompleter.future; + }); + + test('should throttle reconnect after repeatedly resuming', () async { + final blocReady = Completer(); + final subscribeEvent = SubscribeEvent( + subscriptionRequest, + blocReady.complete, + ); + + final bloc = getWebSocketBloc(); + + expect( + bloc.stream, + emitsInOrder( + [ + isA(), + isA(), + isA(), + isA(), + isA(), + isA(), + ], + ), + reason: + 'Bloc should debounce multiple reconnection triggers while resuming.', + ); + + bloc.subscribe( + subscribeEvent, + ); + + await blocReady.future; + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed) + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed) + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed) + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed) + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + }); + + test('should reconnect multiple times after resuming', () async { + final blocReady = Completer(); + final subscribeEvent = SubscribeEvent( + subscriptionRequest, + blocReady.complete, + ); + + final bloc = getWebSocketBloc() + ..subscribe( + subscribeEvent, + ); + + await blocReady.future; + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + + await expectLater(bloc.stream, emitsThrough(isA())); + }); + test( 'subscribe() ignores a WebSocket message that comes while the bloc is disconnected', () async { @@ -348,6 +494,7 @@ void main() { final badService = MockWebSocketService(badInit: true); mockNetworkStreamController = StreamController(); + mockProcessLifeCycleController = StreamController(); final bloc = WebSocketBloc( config: testApiKeyConfig, authProviderRepo: getTestAuthProviderRepo(), @@ -355,6 +502,7 @@ void main() { subscriptionOptions: subscriptionOptions, pollClientOverride: mockPollClient.client, connectivity: const MockConnectivity(), + processLifeCycle: const MockProcessLifeCycle(), ); expect( @@ -491,6 +639,48 @@ void main() { await bloc.done.future; }); + + test('a process resumes with autoReconnect disabled', () async { + final blocReady = Completer(); + final subscribeEvent = SubscribeEvent( + subscriptionRequest, + blocReady.complete, + ); + final bloc = getWebSocketBloc(); + + expect( + bloc.stream, + emitsInOrder( + [ + isA(), + isA(), + isA(), + isA(), + isA(), + isA(), + ], + ), + ); + + // ignore: invalid_use_of_internal_member + WebSocketOptions.autoReconnect = false; + + bloc.subscribe(subscribeEvent).listen( + null, + onError: expectAsync1((event) { + expect( + event, + isA(), + ); + }), + ); + + await blocReady.future; + + mockProcessLifeCycleController + ..add(ProcessStatus.paused) + ..add(ProcessStatus.resumed); + }); }); }); }