diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f97d840a7..b31de514b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -673,24 +673,23 @@ jobs: slack-on-failure: name: Report failure in main branch - needs: + needs: - cleanup-dart-matrix - cleanup-flutter-matrix runs-on: ubuntu-latest - if: always() + if: always() && github.ref == 'refs/heads/main' steps: # Run this action to set env.WORKFLOW_CONCLUSION - uses: technote-space/workflow-conclusion-action@45ce8e0eb155657ab8ccf346ade734257fd196a5 - uses: act10ns/slack@ed1309ab9862e57e9e583e51c7889486b9a00b0f - if: ${{ github.ref == 'refs/heads/main' && env.WORKFLOW_CONCLUSION == 'FAILURE' }} + if: ${{ github.ref == 'refs/heads/main' && env.WORKFLOW_CONCLUSION == 'FAILURE' }} # Statuses: neutral, success, skipped, cancelled, timed_out, action_required, failure with: - status: ${{ env.WORKFLOW_CONCLUSION }} + status: ${{ env.WORKFLOW_CONCLUSION }} webhook-url: ${{ secrets.SLACK_DART_WEBHOOK }} channel: '#realm-github-dart' message: | ** <{{refUrl}}|`{{ref}}` - {{description}}> {{#if description}}<{{diffUrl}}|branch: `{{diffRef}}`>{{/if}} - \ No newline at end of file diff --git a/.github/workflows/dart-desktop-tests.yml b/.github/workflows/dart-desktop-tests.yml index b3082b716..72a7fe3a2 100644 --- a/.github/workflows/dart-desktop-tests.yml +++ b/.github/workflows/dart-desktop-tests.yml @@ -58,12 +58,16 @@ jobs: - name : Setup Dart SDK uses: dart-lang/setup-dart@main with: - sdk: ${{ contains(inputs.os, 'linux') && '3.0.6' || 'stable'}} + sdk: 'stable' architecture: ${{ inputs.architecture == 'arm' && 'arm64' || 'x64'}} - name: Install dependencies run: dart pub get + - name: Bump ulimit + run: ulimit -n 10240 + if: ${{ contains(inputs.os, 'macos') }} + # This will be a no-op under normal circumstances since the cluster would have been deployed # in deploy-cluster. It is needed in case we want to re-run the job after the cluster has been reaped. - name: Create cluster diff --git a/.github/workflows/flutter-desktop-tests.yml b/.github/workflows/flutter-desktop-tests.yml index 0650c5ccf..8f7568f60 100644 --- a/.github/workflows/flutter-desktop-tests.yml +++ b/.github/workflows/flutter-desktop-tests.yml @@ -74,6 +74,10 @@ jobs: - name: Install dependencies run: dart pub get + - name: Bump ulimit + run: ulimit -n 10240 + if: ${{ contains(inputs.os, 'macos') }} + # This will be a no-op under normal circumstances since the cluster would have been deployed # in deploy-cluster. It is needed in case we want to re-run the job after the cluster has been reaped. - name: Create cluster diff --git a/CHANGELOG.md b/CHANGELOG.md index e54013b59..7ac28006b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ * Throw an exception if `File::unlock` has failed, in order to inform the SDK that we are likely hitting some limitation on the OS filesystem, instead of crashing the application and use the same file locking logic for all the platforms. (Core upgrade) * Lift a restriction that prevents asymmetric objects from linking to non-embedded objects. ([#1403](https://github.com/realm/realm-dart/issues/1403)) * Add ISRG X1 Root certificate (used by lets-encrypt and hence MongoDB) to `SecurityContext` of the default `HttpClient`. This ensure we work out-of-the-box on older devices (in particular Android 7 and earlier), as well as some Windows machines. ([#1187](https://github.com/realm/realm-dart/issues/1187), [#1370](https://github.com/realm/realm-dart/issues/1370)) +* Added new flexible sync API `RealmResults.subscribe()` and `RealmResults.unsubscribe()` as an easy way to create subscriptions and download data in background. Added named parameter to `MutableSubscriptionSet.clear({bool unnamedOnly = false})` for removing all the unnamed subscriptions. ([#1354](https://github.com/realm/realm-dart/pull/1354)) +* Added `cancellationToken` parameter to `Session.waitForDownload()`, `Session.waitForUpload()` and `SubscriptionSet.waitForSynchronization()`. ([#1354](https://github.com/realm/realm-dart/pull/1354)) ### Fixed * Fixed iteration after `skip` bug ([#1409](https://github.com/realm/realm-dart/issues/1409)) @@ -55,7 +57,6 @@ * Added support for query on `RealmSet`. ([#1346](https://github.com/realm/realm-dart/pull/1346)) * Support for passing `List`, `Set` or `Iterable` arguments to queries with `IN`-operators. ([#1346](https://github.com/realm/realm-dart/pull/1346)) - ### Fixed * Fixed an early unlock race condition during client reset callbacks. ([#1335](https://github.com/realm/realm-dart/pull/1335)) * Rare corruption of files on streaming format (often following compact, convert or copying to a new file). (Core upgrade, since v12.12.0) diff --git a/lib/src/app.dart b/lib/src/app.dart index 69ec3369a..95b359a23 100644 --- a/lib/src/app.dart +++ b/lib/src/app.dart @@ -266,6 +266,11 @@ class AppException extends RealmException { @override String toString() { - return "AppException: $message, link to server logs: $linkToServerLogs"; + var errorString = "AppException: $message, status code: $statusCode"; + if (linkToServerLogs != null) { + errorString += ", link to server logs: $linkToServerLogs"; + } + + return errorString; } } diff --git a/lib/src/native/realm_core.dart b/lib/src/native/realm_core.dart index 4752f99a5..5f02efb8c 100644 --- a/lib/src/native/realm_core.dart +++ b/lib/src/native/realm_core.dart @@ -413,17 +413,21 @@ class _RealmCore { } static void _stateChangeCallback(Object userdata, int state) { - final completer = userdata as Completer; - - completer.complete(SubscriptionSetState.values[state]); + final completer = userdata as CancellableCompleter; + if (!completer.isCancelled) { + completer.complete(SubscriptionSetState.values[state]); + } } - Future waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen) { - final completer = Completer(); - final callback = Pointer.fromFunction(_stateChangeCallback); - final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); - _realmLib.realm_sync_on_subscription_set_state_change_async(subscriptions.handle._pointer, notifyWhen.index, - _realmLib.addresses.realm_dart_sync_on_subscription_state_changed_callback, userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free); + Future waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen, + [CancellationToken? cancellationToken]) { + final completer = CancellableCompleter(cancellationToken); + if (!completer.isCancelled) { + final callback = Pointer.fromFunction(_stateChangeCallback); + final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); + _realmLib.realm_sync_on_subscription_set_state_change_async(subscriptions.handle._pointer, notifyWhen.index, + _realmLib.addresses.realm_dart_sync_on_subscription_state_changed_callback, userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free); + } return completer.future; } @@ -666,23 +670,26 @@ class _RealmCore { Future openRealmAsync(RealmAsyncOpenTaskHandle handle, CancellationToken? cancellationToken) { final completer = CancellableCompleter(cancellationToken); - final callback = - Pointer.fromFunction realm, Pointer error)>(_openRealmAsyncCallback); - final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); - _realmLib.realm_async_open_task_start( - handle._pointer, - _realmLib.addresses.realm_dart_async_open_task_callback, - userData.cast(), - _realmLib.addresses.realm_dart_userdata_async_free, - ); - + if (!completer.isCancelled) { + final callback = + Pointer.fromFunction realm, Pointer error)>(_openRealmAsyncCallback); + final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); + _realmLib.realm_async_open_task_start( + handle._pointer, + _realmLib.addresses.realm_dart_async_open_task_callback, + userData.cast(), + _realmLib.addresses.realm_dart_userdata_async_free, + ); + } return completer.future; } static void _openRealmAsyncCallback(Object userData, Pointer realmSafePtr, Pointer error) { return using((Arena arena) { - final completer = userData as Completer; - + final completer = userData as CancellableCompleter; + if (completer.isCancelled) { + return; + } if (error != nullptr) { final err = arena(); bool success = _realmLib.realm_get_async_error(error, err); @@ -1755,13 +1762,12 @@ class _RealmCore { await using((arena) async { final response_pointer = arena(); final responseRef = response_pointer.ref; + final method = _HttpMethod.values[requestMethod]; + try { // Build request late HttpClientRequest request; - // this throws if requestMethod is unknown _HttpMethod - final method = _HttpMethod.values[requestMethod]; - switch (method) { case _HttpMethod.delete: request = await client.deleteUrl(url); @@ -1788,8 +1794,16 @@ class _RealmCore { request.add(utf8.encode(body)); } + Realm.logger.log(RealmLogLevel.debug, "HTTP Transport: Executing ${method.name} $url"); + + final stopwatch = Stopwatch()..start(); + // Do the call.. final response = await request.close(); + + stopwatch.stop(); + Realm.logger.log(RealmLogLevel.debug, "HTTP Transport: Executed ${method.name} $url: ${response.statusCode} in ${stopwatch.elapsedMilliseconds} ms"); + final responseBody = await response.fold>([], (acc, l) => acc..addAll(l)); // gather response // Report back to core @@ -1816,11 +1830,14 @@ class _RealmCore { }); responseRef.custom_status_code = _CustomErrorCode.noError.code; - } on SocketException catch (_) { + } on SocketException catch (socketEx) { + Realm.logger.log(RealmLogLevel.warn, "HTTP Transport: SocketException executing ${method.name} $url: $socketEx"); responseRef.custom_status_code = _CustomErrorCode.timeout.code; - } on HttpException catch (_) { + } on HttpException catch (httpEx) { + Realm.logger.log(RealmLogLevel.warn, "HTTP Transport: HttpException executing ${method.name} $url: $httpEx"); responseRef.custom_status_code = _CustomErrorCode.unknownHttp.code; - } catch (_) { + } catch (ex) { + Realm.logger.log(RealmLogLevel.error, "HTTP Transport: Exception executing ${method.name} $url: $ex"); responseRef.custom_status_code = _CustomErrorCode.unknown.code; } finally { _realmLib.realm_http_transport_complete_request(request_context, response_pointer); @@ -2320,12 +2337,14 @@ class _RealmCore { controller.onConnectionStateChange(ConnectionState.values[oldState], ConnectionState.values[newState]); } - Future sessionWaitForUpload(Session session) { - final completer = Completer(); - final callback = Pointer.fromFunction)>(_sessionWaitCompletionCallback); - final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); - _realmLib.realm_sync_session_wait_for_upload_completion(session.handle._pointer, _realmLib.addresses.realm_dart_sync_wait_for_completion_callback, - userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free); + Future sessionWaitForUpload(Session session, [CancellationToken? cancellationToken]) { + final completer = CancellableCompleter(cancellationToken); + if (!completer.isCancelled) { + final callback = Pointer.fromFunction)>(_sessionWaitCompletionCallback); + final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); + _realmLib.realm_sync_session_wait_for_upload_completion(session.handle._pointer, _realmLib.addresses.realm_dart_sync_wait_for_completion_callback, + userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free); + } return completer.future; } @@ -2341,8 +2360,8 @@ class _RealmCore { } static void _sessionWaitCompletionCallback(Object userdata, Pointer errorCode) { - final completer = userdata as Completer; - if (completer.isCompleted) { + final completer = userdata as CancellableCompleter; + if (completer.isCancelled) { return; } if (errorCode != nullptr) { diff --git a/lib/src/realm_class.dart b/lib/src/realm_class.dart index 0411b837d..1845cd5bc 100644 --- a/lib/src/realm_class.dart +++ b/lib/src/realm_class.dart @@ -35,7 +35,7 @@ import 'session.dart'; import 'subscription.dart'; import 'set.dart'; -export 'package:cancellation_token/cancellation_token.dart' show CancellationToken, CancelledException; +export 'package:cancellation_token/cancellation_token.dart' show CancellationToken, TimeoutCancellationToken, CancelledException; export 'package:realm_common/realm_common.dart' show Backlink, @@ -122,7 +122,7 @@ export 'realm_object.dart' RealmObjectChanges, UserCallbackException; export 'realm_property.dart'; -export 'results.dart' show RealmResultsOfObject, RealmResultsChanges, RealmResults; +export 'results.dart' show RealmResultsOfObject, RealmResultsChanges, RealmResults, WaitForSyncMode, RealmResultsOfRealmObject; export 'session.dart' show ConnectionStateChange, diff --git a/lib/src/results.dart b/lib/src/results.dart index 160c49a92..84e53de46 100644 --- a/lib/src/results.dart +++ b/lib/src/results.dart @@ -18,6 +18,8 @@ import 'dart:async'; import 'dart:ffi'; +import 'package:cancellation_token/cancellation_token.dart'; + import 'collections.dart'; import 'native/realm_core.dart'; import 'realm_class.dart'; @@ -162,6 +164,98 @@ extension RealmResultsOfObject on RealmResults { } } +class _SubscribedRealmResult extends RealmResults { + final String? subscriptionName; + + _SubscribedRealmResult._(RealmResults results, {this.subscriptionName}) + : super._( + results.handle, + results.realm, + results.metadata, + ); +} + +extension RealmResultsOfRealmObject on RealmResults { + /// Adds this [RealmResults] query to the set of active subscriptions. + /// The query will be joined via an OR statement with any existing queries for the same type. + /// + /// If a [name] is given this allows you to later refer to the subscription by name, + /// e.g. when calling [MutableSubscriptionSet.removeByName]. + /// + /// If [update] is specified to `true`, then any existing query + /// with the same name will be replaced. + /// Otherwise a [RealmException] is thrown, in case of duplicates. + /// + /// [WaitForSyncMode] specifies how to wait or not wait for subscribed objects to be downloaded. + /// The default value is [WaitForSyncMode.firstTime]. + /// + /// The [cancellationToken] is optional and can be used to cancel + /// the waiting for objects to be downloaded. + /// If the operation is cancelled, a [CancelledException] is thrown and the download + /// continues in the background. + /// In case of using [TimeoutCancellationToken] and the time limit is exceeded, + /// a [TimeoutException] is thrown and the download continues in the background. + /// + /// {@category Sync} + Future> subscribe({ + String? name, + WaitForSyncMode waitForSyncMode = WaitForSyncMode.firstTime, + CancellationToken? cancellationToken, + bool update = false, + }) async { + Subscription? existingSubscription = name == null ? realm.subscriptions.find(this) : realm.subscriptions.findByName(name); + late Subscription updatedSubscription; + realm.subscriptions.update((mutableSubscriptions) { + updatedSubscription = mutableSubscriptions.add(this, name: name, update: update); + }); + bool shouldWait = waitForSyncMode == WaitForSyncMode.always || + (waitForSyncMode == WaitForSyncMode.firstTime && subscriptionIsChanged(existingSubscription, updatedSubscription)); + + return await CancellableFuture.from>(() async { + if (cancellationToken != null && cancellationToken.isCancelled) { + throw cancellationToken.exception!; + } + if (shouldWait) { + await realm.subscriptions.waitForSynchronization(cancellationToken); + await realm.syncSession.waitForDownload(cancellationToken); + } + return _SubscribedRealmResult._(this, subscriptionName: name); + }, cancellationToken); + } + + /// Unsubscribe from this query result. It returns immediately + /// without waiting for synchronization. + /// + /// If the subscription is unnamed, the subscription matching + /// the query will be removed. + /// Return `false` if the [RealmResults] is not created by [subscribe]. + /// + /// {@category Sync} + bool unsubscribe() { + bool unsubscribed = false; + if (realm.config is! FlexibleSyncConfiguration) { + throw RealmError('unsubscribe is only allowed on Realms opened with a FlexibleSyncConfiguration'); + } + if (this is _SubscribedRealmResult) { + final subscriptionName = (this as _SubscribedRealmResult).subscriptionName; + realm.subscriptions.update((mutableSubscriptions) { + if (subscriptionName != null) { + unsubscribed = mutableSubscriptions.removeByName(subscriptionName); + } else { + unsubscribed = mutableSubscriptions.removeByQuery(this); + } + }); + } + return unsubscribed; + } + + bool subscriptionIsChanged(Subscription? existingSubscription, Subscription updatedSubscription) { + return existingSubscription == null || + existingSubscription.objectClassName != updatedSubscription.objectClassName || + existingSubscription.queryString != updatedSubscription.queryString; + } +} + /// @nodoc //RealmResults package internal members extension RealmResultsInternal on RealmResults { @@ -180,12 +274,7 @@ extension RealmResultsInternal on RealmResults { RealmObjectMetadata get metadata => _metadata!; - static RealmResults create( - RealmResultsHandle handle, - Realm realm, - RealmObjectMetadata? metadata, - [int skip = 0] - ) => + static RealmResults create(RealmResultsHandle handle, Realm realm, RealmObjectMetadata? metadata, [int skip = 0]) => RealmResults._(handle, realm, metadata, skip); } @@ -257,3 +346,20 @@ class _RealmResultsIterator implements Iterator { return true; } } + +/// +/// Behavior when waiting for subscribed objects to be synchronized/downloaded. +/// +enum WaitForSyncMode { + /// Waits until the objects have been downloaded from the server + /// the first time the subscription is created. If the subscription + /// already exists, the [RealmResults] is returned immediately. + firstTime, + + /// Always waits until the objects have been downloaded from the server. + always, + + /// Never waits for the download to complete, but keeps downloading the + /// objects in the background. + never, +} diff --git a/lib/src/session.dart b/lib/src/session.dart index 91c8fef07..07058181e 100644 --- a/lib/src/session.dart +++ b/lib/src/session.dart @@ -59,9 +59,11 @@ class Session implements Finalizable { void resume() => realmCore.sessionResume(this); /// Waits for the [Session] to finish all pending uploads. - Future waitForUpload() => realmCore.sessionWaitForUpload(this); + /// An optional [cancellationToken] can be used to cancel the wait operation. + Future waitForUpload([CancellationToken? cancellationToken]) => realmCore.sessionWaitForUpload(this, cancellationToken); /// Waits for the [Session] to finish all pending downloads. + /// An optional [cancellationToken] can be used to cancel the wait operation. Future waitForDownload([CancellationToken? cancellationToken]) => realmCore.sessionWaitForDownload(this, cancellationToken); /// Gets a [Stream] of [SyncProgress] that can be used to track upload or download progress. diff --git a/lib/src/subscription.dart b/lib/src/subscription.dart index 66fdea4c0..2f095c32e 100644 --- a/lib/src/subscription.dart +++ b/lib/src/subscription.dart @@ -147,8 +147,8 @@ abstract class SubscriptionSet with IterableMixin implements Final return result == null ? null : Subscription._(result); } - Future _waitForStateChange(SubscriptionSetState state) async { - final result = await realmCore.waitForSubscriptionSetStateChange(this, state); + Future _waitForStateChange(SubscriptionSetState state, [CancellationToken? cancellationToken]) async { + final result = await realmCore.waitForSubscriptionSetStateChange(this, state, cancellationToken); realmCore.refreshSubscriptionSet(this); return result; } @@ -159,8 +159,9 @@ abstract class SubscriptionSet with IterableMixin implements Final /// the returned [Future] will complete immediately. If the state is /// [SubscriptionSetState.error], the returned future will throw an /// error. - Future waitForSynchronization() async { - final result = await _waitForStateChange(SubscriptionSetState.complete); + /// An optional [cancellationToken] can be used to cancel the wait operation. + Future waitForSynchronization([CancellationToken? cancellationToken]) async { + final result = await _waitForStateChange(SubscriptionSetState.complete, cancellationToken); if (result == SubscriptionSetState.error) { throw error!; } @@ -184,7 +185,7 @@ abstract class SubscriptionSet with IterableMixin implements Final @override Iterator get iterator => _SubscriptionIterator._(this); - /// Update the subscription set and send the request to the server in the background. + /// Updates the subscription set and send the request to the server in the background. /// /// Calling [update] is a prerequisite for mutating the subscription set, /// using a [MutableSubscriptionSet] passed to the [action]. @@ -272,21 +273,22 @@ class MutableSubscriptionSet extends SubscriptionSet { return Subscription._(realmCore.insertOrAssignSubscription(this, query, name, update)); } - /// Remove the [subscription] from the set, if it exists. + /// Removes the [subscription] from the set, if it exists. bool remove(Subscription subscription) { return realmCore.eraseSubscriptionById(this, subscription); } - /// Remove the [query] from the set, if it exists. + /// Removes the [query] from the set, if it exists. bool removeByQuery(RealmResults query) { return realmCore.eraseSubscriptionByResults(this, query); } - /// Remove the [query] from the set that matches by [name], if it exists. + /// Removes the subscription from the set that matches by [name], if it exists. bool removeByName(String name) { return realmCore.eraseSubscriptionByName(this, name); } + /// Removes the subscriptions from the set that matches by type, if it exists. bool removeByType() { final name = realm.schema.singleWhere((e) => e.type == T).name; var result = false; @@ -300,9 +302,19 @@ class MutableSubscriptionSet extends SubscriptionSet { return result; } - /// Clear the subscription set. - void clear() { - realmCore.clearSubscriptionSet(this); + /// Clears the subscription set. + /// If [unnamedOnly] is `true`, then only unnamed subscriptions will be removed. + void clear({bool unnamedOnly = false}) { + if (unnamedOnly) { + for (var i = length - 1; i >= 0; i--) { + final subscription = this[i]; + if (subscription.name == null) { + remove(subscription); + } + } + } else { + realmCore.clearSubscriptionSet(this); + } } } diff --git a/test/realm_test.dart b/test/realm_test.dart index ebd184552..3315345fe 100644 --- a/test/realm_test.dart +++ b/test/realm_test.dart @@ -26,7 +26,6 @@ import 'package:test/test.dart' hide test, throws; import 'package:timezone/timezone.dart' as tz; import 'package:timezone/data/latest.dart' as tz; import 'package:path/path.dart' as p; -import 'package:cancellation_token/cancellation_token.dart'; import '../lib/realm.dart'; import 'test.dart'; import '../lib/src/native/realm_core.dart'; @@ -1971,8 +1970,8 @@ Future _addDataToAtlas(Realm realm, String productNamePrefix, {int itemsCo await realm.syncSession.waitForDownload(); } -Future _addSubscriptions(Realm realm, String searchByPreffix) async { - final query = realm.query(r'name BEGINSWITH $0', [searchByPreffix]); +Future _addSubscriptions(Realm realm, String searchByPrefix) async { + final query = realm.query(r'name BEGINSWITH $0', [searchByPrefix]); if (realm.subscriptions.find(query) == null) { realm.subscriptions.update((mutableSubscriptions) => mutableSubscriptions.add(query)); } diff --git a/test/session_test.dart b/test/session_test.dart index 2c60938a2..5423e2671 100644 --- a/test/session_test.dart +++ b/test/session_test.dart @@ -135,6 +135,19 @@ Future main([List? args]) async { await realm.syncSession.waitForDownload(); }); + baasTest('SyncSession.waitForDownload/waitForUpload canceled', (configuration) async { + final realm = await getIntegrationRealm(); + final cancellationDownloadToken = CancellationToken(); + final waitForDownloadFuture = realm.syncSession.waitForDownload(cancellationDownloadToken); + cancellationDownloadToken.cancel(); + expect(() async => await waitForDownloadFuture, throwsA(isA())); + + final cancellationUploadToken = CancellationToken(); + final waitForUploadFuture = realm.syncSession.waitForUpload(cancellationUploadToken); + cancellationUploadToken.cancel(); + expect(() async => await waitForUploadFuture, throwsA(isA())); + }); + baasTest('SyncSesison.waitForUpload with changes', (configuration) async { final differentiator = ObjectId(); diff --git a/test/subscription_test.dart b/test/subscription_test.dart index 75cf7441b..294cda083 100644 --- a/test/subscription_test.dart +++ b/test/subscription_test.dart @@ -20,27 +20,13 @@ import 'dart:io'; import 'dart:math'; import 'dart:typed_data'; -import 'package:meta/meta.dart'; import 'package:test/expect.dart' hide throws; import '../lib/realm.dart'; -import '../lib/src/configuration.dart'; import '../lib/src/native/realm_core.dart'; import '../lib/src/subscription.dart'; import 'test.dart'; -@isTest -void testSubscriptions(String name, FutureOr Function(Realm) testFunc) async { - baasTest(name, (appConfiguration) async { - final app = App(appConfiguration); - final credentials = Credentials.anonymous(); - final user = await app.logIn(credentials); - final configuration = Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately; - final realm = getRealm(configuration); - await testFunc(realm); - }); -} - Future main([List? args]) async { await setupTests(args); @@ -50,13 +36,24 @@ Future main([List? args]) async { expect(() => realm.subscriptions, throws()); }); - testSubscriptions('SubscriptionSet.state/waitForSynchronization', (realm) async { + baasTest('SubscriptionSet.state/waitForSynchronization', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; await subscriptions.waitForSynchronization(); expect(subscriptions.state, SubscriptionSetState.complete); }); - testSubscriptions('SubscriptionSet.version', (realm) async { + baasTest('SubscriptionSet.state/waitForSynchronization canceled', (config) async { + final realm = await getIntegrationRealm(appConfig: config); + final subscriptions = realm.subscriptions; + final cancellationToken = CancellationToken(); + final waitFuture = subscriptions.waitForSynchronization(cancellationToken); + cancellationToken.cancel(); + expect(() async => await waitFuture, throwsA(isA())); + }); + + baasTest('SubscriptionSet.version', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; expect(subscriptions.version, 0); @@ -75,7 +72,8 @@ Future main([List? args]) async { expect(subscriptions.version, 2); }); - testSubscriptions('MutableSubscriptionSet.add', (realm) { + baasTest('MutableSubscriptionSet.add', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final query = realm.all(); @@ -86,7 +84,8 @@ Future main([List? args]) async { expect(subscriptions.find(query), isNotNull); }); - testSubscriptions('MutableSubscriptionSet.add named', (realm) { + baasTest('MutableSubscriptionSet.add named', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; const name = 'some name'; @@ -98,7 +97,8 @@ Future main([List? args]) async { expect(subscriptions.findByName(name), s); }); - testSubscriptions('SubscriptionSet.find', (realm) { + baasTest('SubscriptionSet.find', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final query = realm.all(); @@ -110,7 +110,8 @@ Future main([List? args]) async { expect(subscriptions.find(query), isNotNull); }); - testSubscriptions('SubscriptionSet.find return match, even if named', (realm) { + baasTest('SubscriptionSet.find return match, even if named', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final query = realm.all(); @@ -123,7 +124,8 @@ Future main([List? args]) async { expect(subscriptions.find(query), s); }); - testSubscriptions('SubscriptionSet.findByName', (realm) { + baasTest('SubscriptionSet.findByName', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; const name = 'some name'; @@ -135,7 +137,8 @@ Future main([List? args]) async { expect(subscriptions.findByName(name), isNotNull); }); - testSubscriptions('MutableSubscriptionSet.remove', (realm) { + baasTest('MutableSubscriptionSet.remove', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final query = realm.all(); @@ -152,7 +155,8 @@ Future main([List? args]) async { expect(subscriptions, isEmpty); }); - testSubscriptions('MutableSubscriptionSet.removeByQuery', (realm) { + baasTest('MutableSubscriptionSet.removeByQuery', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final query = realm.all(); @@ -167,7 +171,8 @@ Future main([List? args]) async { expect(subscriptions, isEmpty); }); - testSubscriptions('MutableSubscriptionSet.removeByName', (realm) { + baasTest('MutableSubscriptionSet.removeByName', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; const name = 'some name'; @@ -182,7 +187,8 @@ Future main([List? args]) async { expect(subscriptions, isEmpty); }); - testSubscriptions('MutableSubscriptionSet.removeAll', (realm) { + baasTest('MutableSubscriptionSet.removeAll', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; subscriptions.update((mutableSubscriptions) { @@ -197,7 +203,8 @@ Future main([List? args]) async { expect(subscriptions, isEmpty); }); - testSubscriptions('SubscriptionSet.elementAt', (realm) { + baasTest('SubscriptionSet.elementAt', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; subscriptions.update((mutableSubscriptions) { @@ -219,7 +226,8 @@ Future main([List? args]) async { expect(() => subscriptions[1000], throws()); }); - testSubscriptions('MutableSubscriptionSet.elementAt', (realm) { + baasTest('MutableSubscriptionSet.elementAt', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; subscriptions.update((mutableSubscriptions) { @@ -231,7 +239,8 @@ Future main([List? args]) async { }); }); - testSubscriptions('MutableSubscriptionSet.add double-add throws', (realm) { + baasTest('MutableSubscriptionSet.add double-add throws', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; // Adding same unnamed query twice without requesting an update will just de-duplicate @@ -258,7 +267,8 @@ Future main([List? args]) async { }, throws('Duplicate subscription')); }); - testSubscriptions('MutableSubscriptionSet.add with update flag', (realm) { + baasTest('MutableSubscriptionSet.add with update flag', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; subscriptions.update((mutableSubscriptions) { @@ -276,7 +286,8 @@ Future main([List? args]) async { expect(subscriptions.length, 2); }); - testSubscriptions('MutableSubscriptionSet.add multiple queries for same class', (realm) { + baasTest('MutableSubscriptionSet.add multiple queries for same class', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final random = Random.secure(); @@ -305,7 +316,8 @@ Future main([List? args]) async { } }); - testSubscriptions('MutableSubscriptionSet.add same name, different classes', (realm) { + baasTest('MutableSubscriptionSet.add same name, different classes', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; expect( @@ -316,7 +328,8 @@ Future main([List? args]) async { throws()); }); - testSubscriptions('MutableSubscriptionSet.add same name, different classes, with update flag', (realm) { + baasTest('MutableSubscriptionSet.add same name, different classes, with update flag', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; late Subscription subscription; @@ -329,7 +342,8 @@ Future main([List? args]) async { expect(subscriptions[0], subscription); // last added wins }); - testSubscriptions('MutableSubscriptionSet.add same query, different classes', (realm) { + baasTest('MutableSubscriptionSet.add same query, different classes', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; subscriptions.update((mutableSubscriptions) { @@ -343,7 +357,8 @@ Future main([List? args]) async { } }); - testSubscriptions('MutableSubscriptionSet.add illegal query', (realm) async { + baasTest('MutableSubscriptionSet.add illegal query', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; // Illegal query for subscription: @@ -356,7 +371,8 @@ Future main([List? args]) async { expect(() async => await subscriptions.waitForSynchronization(), throws("invalid RQL")); }); - testSubscriptions('MutableSubscriptionSet.remove same query, different classes', (realm) { + baasTest('MutableSubscriptionSet.remove same query, different classes', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; late Subscription s; @@ -374,7 +390,8 @@ Future main([List? args]) async { expect(subscriptions, [s]); }); - testSubscriptions('MutableSubscriptionSet.removeByType', (realm) { + baasTest('MutableSubscriptionSet.removeByType', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; late Subscription s; @@ -394,7 +411,8 @@ Future main([List? args]) async { expect(subscriptions, [s]); }); - testSubscriptions('Get subscriptions', (realm) async { + baasTest('Get subscriptions', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; expect(subscriptions, isEmpty); @@ -430,7 +448,8 @@ Future main([List? args]) async { await subscriptions.waitForSynchronization(); }); - testSubscriptions('Subscription properties roundtrip', (realm) async { + baasTest('Subscription properties roundtrip', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; final before = DateTime.now().toUtc(); @@ -507,7 +526,8 @@ Future main([List? args]) async { expect(() => realm.write(() => realm.add(Task(ObjectId()))), throws("no flexible sync subscription has been created")); }); - testSubscriptions('Filter realm data using query subscription', (realm) async { + baasTest('Filter realm data using query subscription', (config) async { + final realm = await getIntegrationRealm(appConfig: config); realm.subscriptions.update((mutableSubscriptions) { mutableSubscriptions.add(realm.all()); }); @@ -537,12 +557,8 @@ Future main([List? args]) async { expect(filtered.length, all.length); }); - baasTest('Subscriptions when realm is closed gets closed as well', (configuration) async { - final app = App(configuration); - final user = await getIntegrationUser(app); - - final config = Configuration.flexibleSync(user, getSyncSchema()); - final realm = getRealm(config); + baasTest('Subscriptions when realm is closed gets closed as well', (config) async { + final realm = await getIntegrationRealm(appConfig: config); final subscriptions = realm.subscriptions; expect(() => subscriptions.state, returnsNormally); @@ -578,4 +594,192 @@ Future main([List? args]) async { expect(writeReason.reason, 'write to ObjectID("$productId") in table "${writeReason.objectType}" not allowed; object is outside of the current query view'); expect(writeReason.primaryKey.value, productId); }); + + baasTest('Flexible sync subscribe/unsubscribe API', (config) async { + final realm = await getIntegrationRealm(appConfig: config); + final prefix = generateRandomString(4); + final byTestRun = "name BEGINSWITH '$prefix'"; + final query = realm.query(byTestRun); + await query.subscribe(); + + // Write new data and upload + realm.write(() { + realm.addAll([ + Event(ObjectId(), name: "$prefix NPM Event", isCompleted: true, durationInMinutes: 30), + Event(ObjectId(), name: "$prefix NPM Meeting", isCompleted: false, durationInMinutes: 10), + Event(ObjectId(), name: "$prefix Some other event", isCompleted: true, durationInMinutes: 15), + ]); + }); + expect(query.length, 3); + await realm.syncSession.waitForUpload(); + + // Remove all the data from realm file after synchronization completes + realm.subscriptions.update((mutableSubscriptions) => mutableSubscriptions.clear()); + await realm.subscriptions.waitForSynchronization(); + expect(query.length, 0); + + // Subscribing will download only the objects with names containing 'NPM' + final subscribedByName = await realm.query('$byTestRun AND name CONTAINS \$0', ["NPM"]).subscribe(); + expect(subscribedByName.length, 2); + + // Adding subscription by duration on top of downloaded objects by name + // will remove the objects, which don't match duration < 20, from the local realm + final subscribedByNameAndDuration = await subscribedByName.query(r'durationInMinutes < $0', [20]).subscribe(); + expect(subscribedByNameAndDuration.length, 1); + expect(subscribedByNameAndDuration[0].durationInMinutes, 10); + expect(subscribedByNameAndDuration[0].name, contains("NPM")); + + // Query local realm by duration + final filteredByDuration = realm.query("$byTestRun AND durationInMinutes < \$0", [20]); + expect(filteredByDuration.length, 1); // duration 10 only, because there is subscription by name containing 'NPM' and duration < 20 + + // Subscribing only by duration will download all objects with duration < 20 independent on the name + final subscribedByDuration = await filteredByDuration.subscribe(); + expect(subscribedByDuration.length, 2); // duration 10 and 15, because all objects with durations < 20 are downloaded + }); + + test("Using flexible sync subscribe API for local realm throws", () async { + final config = Configuration.local([Event.schema]); + final realm = getRealm(config); + await expectLater( + () => realm.all().subscribe(), throws("subscriptions is only valid on Realms opened with a FlexibleSyncConfiguration")); + expect(() => realm.all().unsubscribe(), throws("unsubscribe is only allowed on Realms opened with a FlexibleSyncConfiguration")); + }); + + baasTest('Flexible sync subscribe API - duplicated subscription', (config) async { + final realm = await getIntegrationRealm(appConfig: config); + final subscriptionName1 = "sub1"; + final subscriptionName2 = "sub2"; + final query1 = realm.all(); + final query2 = realm.query("name = \$0", ["some name"]); + + await query1.subscribe(name: subscriptionName1); + expect(realm.subscriptions.length, 1); + + //Replace subscription with query2 using the same name and update flag + await query2.subscribe(name: subscriptionName1, update: true); + expect(realm.subscriptions.length, 1); + expect(realm.subscriptions.findByName(subscriptionName1), isNotNull); + + //Subscribe for the same query2 with different name + await query2.subscribe(name: subscriptionName2); + expect(realm.subscriptions.length, 2); + expect(realm.subscriptions.findByName(subscriptionName1), isNotNull); + expect(realm.subscriptions.findByName(subscriptionName2), isNotNull); + + //Add query subscription with the same name and update=false throws + await expectLater(() => query1.subscribe(name: subscriptionName2), throws("Duplicate subscription with name: $subscriptionName2")); + }); + + baasTest('Flexible sync subscribe/unsubscribe and removeAllUnnamed', (config) async { + final realm = await getIntegrationRealm(appConfig: config); + final subscriptionName1 = "sub1"; + final subscriptionName2 = "sub2"; + final query = realm.all(); + final queryFiltered = realm.query("name='x'"); + + final unnamedResults = await query.subscribe(); // +1 unnamed subscription + await query.subscribe(); // +0 subscription already exists + await realm.all().subscribe(); // +0 subscription already exists + await queryFiltered.subscribe(); // +1 unnamed subscription + final namedResults1 = await query.subscribe(name: subscriptionName1); // +1 named subscription + final namedResults2 = await query.subscribe(name: subscriptionName2); // +1 named subscription + expect(realm.subscriptions.length, 4); + + expect(query.unsubscribe(), isFalse); // -0 (query is not a subscription) + expect(realm.subscriptions.length, 4); + + expect(unnamedResults.unsubscribe(), isTrue); // -1 unnamed subscription on query + expect(realm.subscriptions.length, 3); + expect(realm.subscriptions.find(queryFiltered), isNotNull); + expect(realm.subscriptions.findByName(subscriptionName1), isNotNull); + expect(realm.subscriptions.findByName(subscriptionName2), isNotNull); + + realm.subscriptions.update((mutableSubscriptions) => mutableSubscriptions.clear(unnamedOnly: true)); // -1 unnamed subscription on queryFiltered + + expect(realm.subscriptions.length, 2); + expect(realm.subscriptions.findByName(subscriptionName1), isNotNull); + expect(realm.subscriptions.findByName(subscriptionName2), isNotNull); + + expect(namedResults1.unsubscribe(), isTrue); // -1 named subscription sub1 + expect(realm.subscriptions.length, 1); + expect(realm.subscriptions.findByName(subscriptionName2), isNotNull); + + expect(namedResults2.unsubscribe(), isTrue); // -1 named subscription + expect(realm.subscriptions.length, 0); + }); + + baasTest('Flexible sync subscribe/unsubscribe API wait for download', (configuration) async { + int count = 2; + RealmResults query = await _getQueryToSubscribeForDownload(configuration, count); + final results = await query.subscribe(waitForSyncMode: WaitForSyncMode.never); + expect(results.length, 0); // didn't wait for downloading because of WaitForSyncMode.never + + final second = await query.subscribe(waitForSyncMode: WaitForSyncMode.always); + expect(second.length, count); // product_1 and product_21 + }); + + baasTest('Flexible sync subscribe/unsubscribe cancellation token', (configuration) async { + RealmResults query = await _getQueryToSubscribeForDownload(configuration, 3); + + // Wait Always if timeout expired + final timeoutCancellationToken = TimeoutCancellationToken(Duration(microseconds: 0)); + await expectLater( + () async => await query.subscribe(waitForSyncMode: WaitForSyncMode.always, cancellationToken: timeoutCancellationToken), + throwsA(isA()), + ); + + // Wait Always but cancel berfore + final cancellationToken = CancellationToken(); + cancellationToken.cancel(); + await expectLater( + query.subscribe(waitForSyncMode: WaitForSyncMode.always, cancellationToken: cancellationToken), + throwsA(isA()), + ); + + // Wait Never but cancel before + final cancellationToken1 = CancellationToken(); + cancellationToken1.cancel(); + await expectLater( + query.subscribe(waitForSyncMode: WaitForSyncMode.never, cancellationToken: cancellationToken1), + throwsA(isA()), + ); + + // Wait Always but cancel after + final cancellationToken2 = CancellationToken(); + final subFuture = query.subscribe(waitForSyncMode: WaitForSyncMode.always, cancellationToken: cancellationToken2); + cancellationToken2.cancel(); + + expect( + () async => await subFuture, + throwsA(isA()), + ); + }); +} + +Future> _getQueryToSubscribeForDownload(AppConfiguration configuration, int takeCount) async { + final prefix = generateRandomString(4); + final byTestRun = "name BEGINSWITH '$prefix'"; + App app = App(configuration); + final userA = await app.logIn(Credentials.anonymous(reuseCredentials: false)); + final configA = Configuration.flexibleSync(userA, getSyncSchema()); + final realmA = getRealm(configA); + await realmA.query(byTestRun).subscribe(); + List names = []; + realmA.write(() { + for (var i = 0; i < 20; i++) { + final name = "${prefix}_${i + 1}"; + names.add(name); + realmA.add(Product(ObjectId(), name)); + } + }); + await realmA.syncSession.waitForUpload(); + realmA.close(); + + final userB = await app.logIn(Credentials.anonymous(reuseCredentials: false)); + final configB = Configuration.flexibleSync(userB, getSyncSchema()); + final realmB = getRealm(configB); + final query = realmB.query('$byTestRun AND name IN \$0', [names.take(takeCount)]); + + return query; } diff --git a/test/test.dart b/test/test.dart index 2df22d03a..e58b1c4e6 100644 --- a/test/test.dart +++ b/test/test.dart @@ -396,9 +396,12 @@ void test(String name, dynamic Function() testFunction, {dynamic skip, Map setupTests(List? args) async { Realm.logger = Logger.detached('test run') ..level = Level.ALL ..onRecord.listen((record) { + if (record.level.value >= RealmLogLevel.warn.value) { + print('${record.time} ${record.level.name}: ${record.message}'); + } + testing.printOnFailure('${record.time} ${record.level.name}: ${record.message}'); }); @@ -645,13 +652,9 @@ Future baasTest( skip = shouldSkip(baasUri, skip); test(name, () async { - try { - final config = await getAppConfig(appName: appName); - await testFunction(config); - } catch (error) { - printSplunkLogLink(appName, baasUri); - rethrow; - } + printSplunkLogLink(appName, baasUri); + final config = await getAppConfig(appName: appName); + await testFunction(config); }, skip: skip); } @@ -708,7 +711,7 @@ Future getIntegrationRealm({App? app, ObjectId? differentiator, AppConfig app ??= App(appConfig ?? await getAppConfig()); final user = await getIntegrationUser(app); - final config = Configuration.flexibleSync(user, getSyncSchema()); + final config = Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately; final realm = getRealm(config); if (differentiator != null) { realm.subscriptions.update((mutableSubscriptions) { @@ -823,15 +826,16 @@ void printSplunkLogLink(AppNames appName, String? uriVariable) { if (uriVariable == null) { return; } + final app = baasApps[appName.name] ?? baasApps.values.firstWhere((element) => element.name == BaasClient.defaultAppName, orElse: () => throw RealmError("No BAAS apps")); final baasUri = Uri.parse(uriVariable); - print("App service name: ${app.uniqueName}"); + testing.printOnFailure("App service name: ${app.uniqueName}"); final host = baasUri.host.endsWith('-qa.mongodb.com') ? "-qa" : ""; final splunk = Uri.encodeFull( "https://splunk.corp.mongodb.com/en-US/app/search/search?q=search index=baas$host \"${app.uniqueName}-*\" | reverse | top error msg&earliest=-7d&latest=now&display.general.type=visualizations"); - print("Splunk logs: $splunk"); + testing.printOnFailure("Splunk logs: $splunk"); } /// Schema list for default app service