Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): .subscribe() for GraphQL #1915

Merged
merged 37 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a68b67a
chore!(api): migrate API category type definitions (#1640)
Jun 15, 2022
6f4c84f
chore(api): API Native Bridge for .addPlugin() (#1756)
Equartey Jun 23, 2022
7b4e0ca
chore(api): API Pigeon update (#1813)
Equartey Jun 27, 2022
26b23b6
feat(api): REST methods in dart with auth mode none (#1783)
Jun 27, 2022
e3c9cf6
feat!(api): GraphQL API key auth mode (#1858)
Equartey Jul 13, 2022
18e4272
feat!(core,auth): auth providers definition and CognitoIamAuthProvide…
Jul 19, 2022
7a98c5a
feat(core,api): IAM auth mode for HTTP requests (REST and GQL) (#1893)
Jul 21, 2022
5f2783b
feat(api): .subscribe() for GraphQL
Jul 21, 2022
55931aa
change name
Jul 21, 2022
b6ebdab
add some unit tests
Jul 26, 2022
dcaeba1
refactor a little
Jul 26, 2022
5715cb5
some renames
Jul 27, 2022
db28cab
feat(api): GraphQL Custom Request Headers (#1938)
Equartey Jul 29, 2022
c8297a6
feat(auth,api): cognito user pools auth provider & auth mode for API …
Aug 8, 2022
a62dc21
Merge branch 'feat/api-next' into feat/api-next-subscription
Aug 9, 2022
c464e6e
change disconnect
Aug 10, 2022
d93e00c
add logger
Aug 10, 2022
ac5348a
throw error during connection
Aug 11, 2022
2bc9024
expand unit tests
Aug 15, 2022
bdf706f
chore!(api): migrate API category type definitions (#1640)
Jun 15, 2022
ef1b223
chore(api): API Native Bridge for .addPlugin() (#1756)
Equartey Jun 23, 2022
4621a66
chore(api): API Pigeon update (#1813)
Equartey Jun 27, 2022
438c236
feat(api): REST methods in dart with auth mode none (#1783)
Jun 27, 2022
c99ca0c
feat!(api): GraphQL API key auth mode (#1858)
Equartey Jul 13, 2022
98891d7
feat!(core,auth): auth providers definition and CognitoIamAuthProvide…
Jul 19, 2022
d0a254e
feat(core,api): IAM auth mode for HTTP requests (REST and GQL) (#1893)
Jul 21, 2022
c63ddd6
feat(api): GraphQL Custom Request Headers (#1938)
Equartey Jul 29, 2022
64a50f6
feat(auth,api): cognito user pools auth provider & auth mode for API …
Aug 8, 2022
02c51fe
Merge branch 'feat/api-next' into feat/api-next-subscription
Aug 16, 2022
8b125c6
correct error handling
Aug 18, 2022
7300024
fix(auth): correct auth providers imports from rebase (#2042)
Aug 22, 2022
8eebbe4
Merge branch 'feat/api-next' into feat/api-next-subscription
Aug 22, 2022
e1ac0ce
improve test syntax
Aug 22, 2022
b6358fc
change null safety
Aug 22, 2022
a7a9339
Merge branch 'feat/api-next' into feat/api-next-subscription
Aug 23, 2022
d09da9a
undo plist change
Aug 23, 2022
6e384c3
make enum messageType
Aug 23, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 120 additions & 114 deletions packages/api/amplify_api/example/integration_test/graphql_tests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -529,136 +529,142 @@ void main() {
});
});

group('subscriptions', () {
// Some local helper methods to help with establishing subscriptions and such.

// Wait for subscription established for given request.
Future<StreamSubscription<GraphQLResponse<T>>>
_getEstablishedSubscriptionOperation<T>(
GraphQLRequest<T> subscriptionRequest,
void Function(GraphQLResponse<T>) onData) async {
Completer<void> establishedCompleter = Completer();
final stream =
Amplify.API.subscribe<T>(subscriptionRequest, onEstablished: () {
establishedCompleter.complete();
});
final subscription = stream.listen(
onData,
onError: (Object e) => fail('Error in subscription stream: $e'),
);

await establishedCompleter.future
.timeout(const Duration(seconds: _subscriptionTimeoutInterval));
return subscription;
}
group(
'subscriptions',
() {
// Some local helper methods to help with establishing subscriptions and such.

// Wait for subscription established for given request.
Future<StreamSubscription<GraphQLResponse<T>>>
_getEstablishedSubscriptionOperation<T>(
GraphQLRequest<T> subscriptionRequest,
void Function(GraphQLResponse<T>) onData) async {
Completer<void> establishedCompleter = Completer();
final stream =
Amplify.API.subscribe<T>(subscriptionRequest, onEstablished: () {
establishedCompleter.complete();
});
final subscription = stream.listen(
onData,
onError: (Object e) => fail('Error in subscription stream: $e'),
);

await establishedCompleter.future
.timeout(const Duration(seconds: _subscriptionTimeoutInterval));
return subscription;
}

// Establish subscription for request, do the mutationFunction, then wait
// for the stream event, cancel the operation, return response from event.
Future<GraphQLResponse<T?>> _establishSubscriptionAndMutate<T>(
GraphQLRequest<T> subscriptionRequest,
Future<void> Function() mutationFunction) async {
Completer<GraphQLResponse<T?>> dataCompleter = Completer();
// With stream established, exec callback with stream events.
final subscription = await _getEstablishedSubscriptionOperation<T>(
subscriptionRequest, (event) {
if (event.hasErrors) {
fail('subscription errors: ${event.errors}');
}
dataCompleter.complete(event);
});
await mutationFunction();
final response = await dataCompleter.future
.timeout((const Duration(seconds: _subscriptionTimeoutInterval)));
// Establish subscription for request, do the mutationFunction, then wait
// for the stream event, cancel the operation, return response from event.
Future<GraphQLResponse<T?>> _establishSubscriptionAndMutate<T>(
GraphQLRequest<T> subscriptionRequest,
Future<void> Function() mutationFunction) async {
Completer<GraphQLResponse<T?>> dataCompleter = Completer();
// With stream established, exec callback with stream events.
final subscription = await _getEstablishedSubscriptionOperation<T>(
subscriptionRequest, (event) {
if (event.hasErrors) {
fail('subscription errors: ${event.errors}');
}
dataCompleter.complete(event);
});
await mutationFunction();
final response = await dataCompleter.future
.timeout((const Duration(seconds: _subscriptionTimeoutInterval)));

await subscription.cancel();
return response;
}

await subscription.cancel();
return response;
}
testWidgets(
'should emit event when onCreate subscription made with model helper',
(WidgetTester tester) async {
String name =
'Integration Test Blog - subscription create ${UUID.getUUID()}';
final subscriptionRequest =
ModelSubscriptions.onCreate(Blog.classType);

testWidgets(
'should emit event when onCreate subscription made with model helper',
(WidgetTester tester) async {
String name =
'Integration Test Blog - subscription create ${UUID.getUUID()}';
final subscriptionRequest = ModelSubscriptions.onCreate(Blog.classType);
final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest, () => addBlog(name));
Blog? blogFromEvent = eventResponse.data;

final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest, () => addBlog(name));
Blog? blogFromEvent = eventResponse.data;
expect(blogFromEvent?.name, equals(name));
});

expect(blogFromEvent?.name, equals(name));
});
testWidgets(
'should emit event when onUpdate subscription made with model helper',
(WidgetTester tester) async {
const originalName = 'Integration Test Blog - subscription update';
final updatedName =
'Integration Test Blog - subscription update, name now ${UUID.getUUID()}';
Blog blogToUpdate = await addBlog(originalName);

final subscriptionRequest =
ModelSubscriptions.onUpdate(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
blogToUpdate = blogToUpdate.copyWith(name: updatedName);
final updateReq = ModelMutations.update(blogToUpdate);
return Amplify.API.mutate(request: updateReq).response;
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(updatedName));
});

testWidgets(
'should emit event when onUpdate subscription made with model helper',
(WidgetTester tester) async {
const originalName = 'Integration Test Blog - subscription update';
final updatedName =
'Integration Test Blog - subscription update, name now ${UUID.getUUID()}';
Blog blogToUpdate = await addBlog(originalName);

final subscriptionRequest = ModelSubscriptions.onUpdate(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
blogToUpdate = blogToUpdate.copyWith(name: updatedName);
final updateReq = ModelMutations.update(blogToUpdate);
return Amplify.API.mutate(request: updateReq).response;
testWidgets(
'should emit event when onDelete subscription made with model helper',
(WidgetTester tester) async {
const name = 'Integration Test Blog - subscription delete';
Blog blogToDelete = await addBlog(name);

final subscriptionRequest =
ModelSubscriptions.onDelete(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
final deleteReq =
ModelMutations.deleteById(Blog.classType, blogToDelete.id);
return Amplify.API.mutate(request: deleteReq).response;
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(name));
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(updatedName));
});
testWidgets('should cancel subscription', (WidgetTester tester) async {
const name = 'Integration Test Blog - subscription to cancel';
Blog blogToDelete = await addBlog(name);

testWidgets(
'should emit event when onDelete subscription made with model helper',
(WidgetTester tester) async {
const name = 'Integration Test Blog - subscription delete';
Blog blogToDelete = await addBlog(name);
final subReq = ModelSubscriptions.onDelete(Blog.classType);
final subscription =
await _getEstablishedSubscriptionOperation<Blog>(subReq, (_) {
fail('Subscription event triggered. Should be canceled.');
});
await subscription.cancel();

final subscriptionRequest = ModelSubscriptions.onDelete(Blog.classType);
final eventResponse =
await _establishSubscriptionAndMutate(subscriptionRequest, () {
// delete the blog, wait for update
final deleteReq =
ModelMutations.deleteById(Blog.classType, blogToDelete.id);
return Amplify.API.mutate(request: deleteReq).response;
await Amplify.API.mutate(request: deleteReq).response;
await Future<dynamic>.delayed(const Duration(seconds: 5));
});
Blog? blogFromEvent = eventResponse.data;

expect(blogFromEvent?.name, equals(name));
});
testWidgets(
'should emit event when onCreate subscription made with model helper for post (model with parent).',
(WidgetTester tester) async {
String title =
'Integration Test post - subscription create ${UUID.getUUID()}';
final subscriptionRequest =
ModelSubscriptions.onCreate(Post.classType);

testWidgets('should cancel subscription', (WidgetTester tester) async {
const name = 'Integration Test Blog - subscription to cancel';
Blog blogToDelete = await addBlog(name);
final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest,
() => addPostAndBlogWithModelHelper(title, 0));
Post? postFromEvent = eventResponse.data;

final subReq = ModelSubscriptions.onDelete(Blog.classType);
final subscription =
await _getEstablishedSubscriptionOperation<Blog>(subReq, (_) {
fail('Subscription event triggered. Should be canceled.');
expect(postFromEvent?.title, equals(title));
});
await subscription.cancel();

// delete the blog, wait for update
final deleteReq =
ModelMutations.deleteById(Blog.classType, blogToDelete.id);
await Amplify.API.mutate(request: deleteReq).response;
await Future<dynamic>.delayed(const Duration(seconds: 5));
});

testWidgets(
'should emit event when onCreate subscription made with model helper for post (model with parent).',
(WidgetTester tester) async {
String title =
'Integration Test post - subscription create ${UUID.getUUID()}';
final subscriptionRequest = ModelSubscriptions.onCreate(Post.classType);

final eventResponse = await _establishSubscriptionAndMutate(
subscriptionRequest, () => addPostAndBlogWithModelHelper(title, 0));
Post? postFromEvent = eventResponse.data;

expect(postFromEvent?.title, equals(title));
});
},
skip:
'TODO(ragingsquirrel3): re-enable tests once subscriptions are implemented.');
},
);
});
}
20 changes: 13 additions & 7 deletions packages/api/amplify_api/example/lib/graphql_api_view.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ class _GraphQLApiViewState extends State<GraphQLApiView> {
onEstablished: () => print('Subscription established'),
);

try {
await for (var event in operation) {
print('Subscription event data received: ${event.data}');
}
} on Exception catch (e) {
print('Error in subscription stream: $e');
}
final streamSubscription = operation.listen(
(event) {
final result = 'Subscription event data received: ${event.data}';
print(result);
setState(() {
ragingsquirrel3 marked this conversation as resolved.
Show resolved Hide resolved
_result = result;
});
},
onError: (Object error) => print(
'Error in GraphQL subscription: $error',
),
);
_unsubscribe = streamSubscription.cancel;
}

Future<void> query() async {
Expand Down
33 changes: 33 additions & 0 deletions packages/api/amplify_api/lib/src/api_plugin_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ library amplify_api;
import 'dart:io';

import 'package:amplify_api/amplify_api.dart';
import 'package:amplify_api/src/graphql/ws/web_socket_connection.dart';
import 'package:amplify_api/src/native_api_plugin.dart';
import 'package:amplify_core/amplify_core.dart';
import 'package:async/async.dart';
Expand All @@ -37,11 +38,16 @@ class AmplifyAPIDart extends AmplifyAPI {
late final AWSApiPluginConfig _apiConfig;
final http.Client? _baseHttpClient;
late final AmplifyAuthProviderRepository _authProviderRepo;
final _logger = AmplifyLogger.category(Category.api);

/// A map of the keys from the Amplify API config to HTTP clients to use for
/// requests to that endpoint.
final Map<String, http.Client> _clientPool = {};

/// A map of the keys from the Amplify API config websocket connections to use
/// for that endpoint.
final Map<String, WebSocketConnection> _webSocketConnectionPool = {};

/// The registered [APIAuthProvider] instances.
final Map<APIAuthorizationType, APIAuthProvider> _authProviders = {};

Expand Down Expand Up @@ -123,6 +129,24 @@ class AmplifyAPIDart extends AmplifyAPI {
));
}

/// Returns the websocket connection to use for a given endpoint.
///
/// Use [apiName] if there are multiple endpoints.
@visibleForTesting
WebSocketConnection getWebSocketConnection({String? apiName}) {
final endpoint = _apiConfig.getEndpoint(
type: EndpointType.graphQL,
apiName: apiName,
);
return _webSocketConnectionPool[endpoint.name] ??= WebSocketConnection(
endpoint.config,
_authProviderRepo,
logger: _logger.createChild(
'webSocketConnection${endpoint.name}',
),
);
}

Uri _getGraphQLUri(String? apiName) {
final endpoint = _apiConfig.getEndpoint(
type: EndpointType.graphQL,
Expand Down Expand Up @@ -187,6 +211,15 @@ class AmplifyAPIDart extends AmplifyAPI {
return _makeCancelable<GraphQLResponse<T>>(responseFuture);
}

@override
Stream<GraphQLResponse<T>> subscribe<T>(
GraphQLRequest<T> request, {
void Function()? onEstablished,
}) {
return getWebSocketConnection(apiName: request.apiName)
.subscribe(request, onEstablished);
}

// ====== REST =======

@override
Expand Down
Loading