Skip to content

Commit

Permalink
feat(api): .subscribe() for GraphQL (#1915)
Browse files Browse the repository at this point in the history
Co-authored-by: Elijah Quartey <Equartey@users.noreply.github.com>
  • Loading branch information
Travis Sheppard and Equartey committed Sep 9, 2022
1 parent f6c1c1b commit 2e05c6f
Show file tree
Hide file tree
Showing 12 changed files with 1,275 additions and 127 deletions.
234 changes: 120 additions & 114 deletions packages/api/amplify_api/example/integration_test/graphql_tests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -529,136 +529,142 @@ void main() {
});
});

group('subscriptions', () {
// Some local helper methods to help with establishing subscriptions and such.

// Wait for subscription established for given request.
Future<StreamSubscription<GraphQLResponse<T>>>
_getEstablishedSubscriptionOperation<T>(
GraphQLRequest<T> subscriptionRequest,
void Function(GraphQLResponse<T>) onData) async {
Completer<void> establishedCompleter = Completer();
final stream =
Amplify.API.subscribe<T>(subscriptionRequest, onEstablished: () {
establishedCompleter.complete();
});
final subscription = stream.listen(
onData,
onError: (Object e) => fail('Error in subscription stream: $e'),
);

await establishedCompleter.future
.timeout(const Duration(seconds: _subscriptionTimeoutInterval));
return subscription;
}
group(
'subscriptions',
() {
// Some local helper methods to help with establishing subscriptions and such.

// Wait for subscription established for given request.
Future<StreamSubscription<GraphQLResponse<T>>>
_getEstablishedSubscriptionOperation<T>(
GraphQLRequest<T> subscriptionRequest,
void Function(GraphQLResponse<T>) onData) async {
Completer<void> establishedCompleter = Completer();
final stream =
Amplify.API.subscribe<T>(subscriptionRequest, onEstablished: () {
establishedCompleter.complete();
});
final subscription = stream.listen(
onData,
onError: (Object e) => fail('Error in subscription stream: $e'),
);

await establishedCompleter.future
.timeout(const Duration(seconds: _subscriptionTimeoutInterval));
return subscription;
}

// Establish subscription for request, do the mutationFunction, then wait
// for the stream event, cancel the operation, return response from event.
Future<GraphQLResponse<T?>> _establishSubscriptionAndMutate<T>(
GraphQLRequest<T> subscriptionRequest,
Future<void> Function() mutationFunction) async {
Completer<GraphQLResponse<T?>> dataCompleter = Completer();
// With stream established, exec callback with stream events.
final subscription = await _getEstablishedSubscriptionOperation<T>(
subscriptionRequest, (event) {
if (event.hasErrors) {
fail('subscription errors: ${event.errors}');
}
dataCompleter.complete(event);
});
await mutationFunction();
final response = await dataCompleter.future
.timeout((const Duration(seconds: _subscriptionTimeoutInterval)));
// Establish subscription for request, do the mutationFunction, then wait
// for the stream event, cancel the operation, return response from event.
Future<GraphQLResponse<T?>> _establishSubscriptionAndMutate<T>(
GraphQLRequest<T> subscriptionRequest,
Future<void> Function() mutationFunction) async {
Completer<GraphQLResponse<T?>> dataCompleter = Completer();
// With stream established, exec callback with stream events.
final subscription = await _getEstablishedSubscriptionOperation<T>(
subscriptionRequest, (event) {
if (event.hasErrors) {
fail('subscription errors: ${event.errors}');
}
dataCompleter.complete(event);
});
await mutationFunction();
final response = await dataCompleter.future
.timeout((const Duration(seconds: _subscriptionTimeoutInterval)));

await subscription.cancel();
return response;
}

await subscription.cancel();
return response;
}
testWidgets(
'should emit event when onCreate subscription made with model helper',
(WidgetTester tester) async {
String name =
'Integration Test Blog - subscription create ${UUID.getUUID()}';
final subscriptionRequest =
ModelSubscriptions.onCreate(Blog.classType);

testWidgets(
'should emit event when onCreate subscription made with model helper',
(WidgetTester tester) async {
String name =
'Integration Test Blog - subscription create ${UUID.getUUID()}';
final subscriptionRequest = ModelSubscriptions.onCreate(Blog.classType);
final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest, () => addBlog(name));
Blog? blogFromEvent = eventResponse.data;

final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest, () => addBlog(name));
Blog? blogFromEvent = eventResponse.data;
expect(blogFromEvent?.name, equals(name));
});

expect(blogFromEvent?.name, equals(name));
});
testWidgets(
'should emit event when onUpdate subscription made with model helper',
(WidgetTester tester) async {
const originalName = 'Integration Test Blog - subscription update';
final updatedName =
'Integration Test Blog - subscription update, name now ${UUID.getUUID()}';
Blog blogToUpdate = await addBlog(originalName);

final subscriptionRequest =
ModelSubscriptions.onUpdate(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
blogToUpdate = blogToUpdate.copyWith(name: updatedName);
final updateReq = ModelMutations.update(blogToUpdate);
return Amplify.API.mutate(request: updateReq).response;
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(updatedName));
});

testWidgets(
'should emit event when onUpdate subscription made with model helper',
(WidgetTester tester) async {
const originalName = 'Integration Test Blog - subscription update';
final updatedName =
'Integration Test Blog - subscription update, name now ${UUID.getUUID()}';
Blog blogToUpdate = await addBlog(originalName);

final subscriptionRequest = ModelSubscriptions.onUpdate(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
blogToUpdate = blogToUpdate.copyWith(name: updatedName);
final updateReq = ModelMutations.update(blogToUpdate);
return Amplify.API.mutate(request: updateReq).response;
testWidgets(
'should emit event when onDelete subscription made with model helper',
(WidgetTester tester) async {
const name = 'Integration Test Blog - subscription delete';
Blog blogToDelete = await addBlog(name);

final subscriptionRequest =
ModelSubscriptions.onDelete(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
final deleteReq =
ModelMutations.deleteById(Blog.classType, blogToDelete.id);
return Amplify.API.mutate(request: deleteReq).response;
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(name));
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(updatedName));
});
testWidgets('should cancel subscription', (WidgetTester tester) async {
const name = 'Integration Test Blog - subscription to cancel';
Blog blogToDelete = await addBlog(name);

testWidgets(
'should emit event when onDelete subscription made with model helper',
(WidgetTester tester) async {
const name = 'Integration Test Blog - subscription delete';
Blog blogToDelete = await addBlog(name);
final subReq = ModelSubscriptions.onDelete(Blog.classType);
final subscription =
await _getEstablishedSubscriptionOperation<Blog>(subReq, (_) {
fail('Subscription event triggered. Should be canceled.');
});
await subscription.cancel();

final subscriptionRequest = ModelSubscriptions.onDelete(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
// delete the blog, wait for update
final deleteReq =
ModelMutations.deleteById(Blog.classType, blogToDelete.id);
return Amplify.API.mutate(request: deleteReq).response;
await Amplify.API.mutate(request: deleteReq).response;
await Future<dynamic>.delayed(const Duration(seconds: 5));
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(name));
});
testWidgets(
'should emit event when onCreate subscription made with model helper for post (model with parent).',
(WidgetTester tester) async {
String title =
'Integration Test post - subscription create ${UUID.getUUID()}';
final subscriptionRequest =
ModelSubscriptions.onCreate(Post.classType);

testWidgets('should cancel subscription', (WidgetTester tester) async {
const name = 'Integration Test Blog - subscription to cancel';
Blog blogToDelete = await addBlog(name);
final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest,
() => addPostAndBlogWithModelHelper(title, 0));
Post? postFromEvent = eventResponse.data;

final subReq = ModelSubscriptions.onDelete(Blog.classType);
final subscription =
await _getEstablishedSubscriptionOperation<Blog>(subReq, (_) {
fail('Subscription event triggered. Should be canceled.');
expect(postFromEvent?.title, equals(title));
});
await subscription.cancel();

// delete the blog, wait for update
final deleteReq =
ModelMutations.deleteById(Blog.classType, blogToDelete.id);
await Amplify.API.mutate(request: deleteReq).response;
await Future<dynamic>.delayed(const Duration(seconds: 5));
});

testWidgets(
'should emit event when onCreate subscription made with model helper for post (model with parent).',
(WidgetTester tester) async {
String title =
'Integration Test post - subscription create ${UUID.getUUID()}';
final subscriptionRequest = ModelSubscriptions.onCreate(Post.classType);

final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest, () => addPostAndBlogWithModelHelper(title, 0));
Post? postFromEvent = eventResponse.data;

expect(postFromEvent?.title, equals(title));
});
},
skip:
'TODO(ragingsquirrel3): re-enable tests once subscriptions are implemented.');
},
);
});
}
20 changes: 13 additions & 7 deletions packages/api/amplify_api/example/lib/graphql_api_view.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ class _GraphQLApiViewState extends State<GraphQLApiView> {
onEstablished: () => print('Subscription established'),
);

try {
await for (var event in operation) {
print('Subscription event data received: ${event.data}');
}
} on Exception catch (e) {
print('Error in subscription stream: $e');
}
final streamSubscription = operation.listen(
(event) {
final result = 'Subscription event data received: ${event.data}';
print(result);
setState(() {
_result = result;
});
},
onError: (Object error) => print(
'Error in GraphQL subscription: $error',
),
);
_unsubscribe = streamSubscription.cancel;
}

Future<void> query() async {
Expand Down
33 changes: 33 additions & 0 deletions packages/api/amplify_api/lib/src/api_plugin_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ library amplify_api;
import 'dart:io';

import 'package:amplify_api/amplify_api.dart';
import 'package:amplify_api/src/graphql/ws/web_socket_connection.dart';
import 'package:amplify_api/src/native_api_plugin.dart';
import 'package:amplify_core/amplify_core.dart';
import 'package:async/async.dart';
Expand All @@ -37,11 +38,16 @@ class AmplifyAPIDart extends AmplifyAPI {
late final AWSApiPluginConfig _apiConfig;
final http.Client? _baseHttpClient;
late final AmplifyAuthProviderRepository _authProviderRepo;
final _logger = AmplifyLogger.category(Category.api);

/// A map of the keys from the Amplify API config to HTTP clients to use for
/// requests to that endpoint.
final Map<String, http.Client> _clientPool = {};

/// A map of the keys from the Amplify API config websocket connections to use
/// for that endpoint.
final Map<String, WebSocketConnection> _webSocketConnectionPool = {};

/// The registered [APIAuthProvider] instances.
final Map<APIAuthorizationType, APIAuthProvider> _authProviders = {};

Expand Down Expand Up @@ -123,6 +129,24 @@ class AmplifyAPIDart extends AmplifyAPI {
));
}

/// Returns the websocket connection to use for a given endpoint.
///
/// Use [apiName] if there are multiple endpoints.
@visibleForTesting
WebSocketConnection getWebSocketConnection({String? apiName}) {
final endpoint = _apiConfig.getEndpoint(
type: EndpointType.graphQL,
apiName: apiName,
);
return _webSocketConnectionPool[endpoint.name] ??= WebSocketConnection(
endpoint.config,
_authProviderRepo,
logger: _logger.createChild(
'webSocketConnection${endpoint.name}',
),
);
}

Uri _getGraphQLUri(String? apiName) {
final endpoint = _apiConfig.getEndpoint(
type: EndpointType.graphQL,
Expand Down Expand Up @@ -187,6 +211,15 @@ class AmplifyAPIDart extends AmplifyAPI {
return _makeCancelable<GraphQLResponse<T>>(responseFuture);
}

@override
Stream<GraphQLResponse<T>> subscribe<T>(
GraphQLRequest<T> request, {
void Function()? onEstablished,
}) {
return getWebSocketConnection(apiName: request.apiName)
.subscribe(request, onEstablished);
}

// ====== REST =======

@override
Expand Down
Loading

0 comments on commit 2e05c6f

Please sign in to comment.