Skip to content

Commit

Permalink
Flexible sync subscribe/unsubscribe API (#1354)
Browse files Browse the repository at this point in the history
Co-authored-by: Kasper Overgård Nielsen <kasper.nielsen@mongodb.com>
Co-authored-by: Nikola Irinchev <irinchev@me.com>
  • Loading branch information
3 people authored Nov 14, 2023
1 parent 6cb6822 commit b6eb23b
Show file tree
Hide file tree
Showing 14 changed files with 495 additions and 123 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -673,24 +673,23 @@ jobs:

slack-on-failure:
name: Report failure in main branch
needs:
needs:
- cleanup-dart-matrix
- cleanup-flutter-matrix
runs-on: ubuntu-latest
if: always()
if: always() && github.ref == 'refs/heads/main'
steps:
# Run this action to set env.WORKFLOW_CONCLUSION
- uses: technote-space/workflow-conclusion-action@45ce8e0eb155657ab8ccf346ade734257fd196a5

- uses: act10ns/slack@ed1309ab9862e57e9e583e51c7889486b9a00b0f
if: ${{ github.ref == 'refs/heads/main' && env.WORKFLOW_CONCLUSION == 'FAILURE' }}
if: ${{ github.ref == 'refs/heads/main' && env.WORKFLOW_CONCLUSION == 'FAILURE' }}
# Statuses: neutral, success, skipped, cancelled, timed_out, action_required, failure
with:
status: ${{ env.WORKFLOW_CONCLUSION }}
status: ${{ env.WORKFLOW_CONCLUSION }}
webhook-url: ${{ secrets.SLACK_DART_WEBHOOK }}
channel: '#realm-github-dart'
message: |
*<https://github.com/realm/realm-dart/actions/runs/${{ github.run_id }}/attempts/${{ github.run_attempt }}|_{{workflow}}_ run id: ${{ github.run_id }} has status _{{jobStatus}}_ >*
<{{refUrl}}|`{{ref}}` - {{description}}>
{{#if description}}<{{diffUrl}}|branch: `{{diffRef}}`>{{/if}}
6 changes: 5 additions & 1 deletion .github/workflows/dart-desktop-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ jobs:
- name : Setup Dart SDK
uses: dart-lang/setup-dart@main
with:
sdk: ${{ contains(inputs.os, 'linux') && '3.0.6' || 'stable'}}
sdk: 'stable'
architecture: ${{ inputs.architecture == 'arm' && 'arm64' || 'x64'}}

- name: Install dependencies
run: dart pub get

- name: Bump ulimit
run: ulimit -n 10240
if: ${{ contains(inputs.os, 'macos') }}

# This will be a no-op under normal circumstances since the cluster would have been deployed
# in deploy-cluster. It is needed in case we want to re-run the job after the cluster has been reaped.
- name: Create cluster
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/flutter-desktop-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ jobs:
- name: Install dependencies
run: dart pub get

- name: Bump ulimit
run: ulimit -n 10240
if: ${{ contains(inputs.os, 'macos') }}

# This will be a no-op under normal circumstances since the cluster would have been deployed
# in deploy-cluster. It is needed in case we want to re-run the job after the cluster has been reaped.
- name: Create cluster
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* Throw an exception if `File::unlock` has failed, in order to inform the SDK that we are likely hitting some limitation on the OS filesystem, instead of crashing the application and use the same file locking logic for all the platforms. (Core upgrade)
* Lift a restriction that prevents asymmetric objects from linking to non-embedded objects. ([#1403](https://github.com/realm/realm-dart/issues/1403))
* Add ISRG X1 Root certificate (used by lets-encrypt and hence MongoDB) to `SecurityContext` of the default `HttpClient`. This ensure we work out-of-the-box on older devices (in particular Android 7 and earlier), as well as some Windows machines. ([#1187](https://github.com/realm/realm-dart/issues/1187), [#1370](https://github.com/realm/realm-dart/issues/1370))
* Added new flexible sync API `RealmResults.subscribe()` and `RealmResults.unsubscribe()` as an easy way to create subscriptions and download data in background. Added named parameter to `MutableSubscriptionSet.clear({bool unnamedOnly = false})` for removing all the unnamed subscriptions. ([#1354](https://github.com/realm/realm-dart/pull/1354))
* Added `cancellationToken` parameter to `Session.waitForDownload()`, `Session.waitForUpload()` and `SubscriptionSet.waitForSynchronization()`. ([#1354](https://github.com/realm/realm-dart/pull/1354))

### Fixed
* Fixed iteration after `skip` bug ([#1409](https://github.com/realm/realm-dart/issues/1409))
Expand Down Expand Up @@ -55,7 +57,6 @@
* Added support for query on `RealmSet`. ([#1346](https://github.com/realm/realm-dart/pull/1346))
* Support for passing `List`, `Set` or `Iterable` arguments to queries with `IN`-operators. ([#1346](https://github.com/realm/realm-dart/pull/1346))


### Fixed
* Fixed an early unlock race condition during client reset callbacks. ([#1335](https://github.com/realm/realm-dart/pull/1335))
* Rare corruption of files on streaming format (often following compact, convert or copying to a new file). (Core upgrade, since v12.12.0)
Expand Down
7 changes: 6 additions & 1 deletion lib/src/app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ class AppException extends RealmException {

@override
String toString() {
return "AppException: $message, link to server logs: $linkToServerLogs";
var errorString = "AppException: $message, status code: $statusCode";
if (linkToServerLogs != null) {
errorString += ", link to server logs: $linkToServerLogs";
}

return errorString;
}
}
89 changes: 54 additions & 35 deletions lib/src/native/realm_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -413,17 +413,21 @@ class _RealmCore {
}

static void _stateChangeCallback(Object userdata, int state) {
final completer = userdata as Completer<SubscriptionSetState>;

completer.complete(SubscriptionSetState.values[state]);
final completer = userdata as CancellableCompleter<SubscriptionSetState>;
if (!completer.isCancelled) {
completer.complete(SubscriptionSetState.values[state]);
}
}

Future<SubscriptionSetState> waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen) {
final completer = Completer<SubscriptionSetState>();
final callback = Pointer.fromFunction<Void Function(Handle, Int32)>(_stateChangeCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_on_subscription_set_state_change_async(subscriptions.handle._pointer, notifyWhen.index,
_realmLib.addresses.realm_dart_sync_on_subscription_state_changed_callback, userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
Future<SubscriptionSetState> waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen,
[CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<SubscriptionSetState>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Int32)>(_stateChangeCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_on_subscription_set_state_change_async(subscriptions.handle._pointer, notifyWhen.index,
_realmLib.addresses.realm_dart_sync_on_subscription_state_changed_callback, userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
}
return completer.future;
}

Expand Down Expand Up @@ -666,23 +670,26 @@ class _RealmCore {

Future<RealmHandle> openRealmAsync(RealmAsyncOpenTaskHandle handle, CancellationToken? cancellationToken) {
final completer = CancellableCompleter<RealmHandle>(cancellationToken);
final callback =
Pointer.fromFunction<Void Function(Handle, Pointer<realm_thread_safe_reference> realm, Pointer<realm_async_error_t> error)>(_openRealmAsyncCallback);
final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_async_open_task_start(
handle._pointer,
_realmLib.addresses.realm_dart_async_open_task_callback,
userData.cast(),
_realmLib.addresses.realm_dart_userdata_async_free,
);

if (!completer.isCancelled) {
final callback =
Pointer.fromFunction<Void Function(Handle, Pointer<realm_thread_safe_reference> realm, Pointer<realm_async_error_t> error)>(_openRealmAsyncCallback);
final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_async_open_task_start(
handle._pointer,
_realmLib.addresses.realm_dart_async_open_task_callback,
userData.cast(),
_realmLib.addresses.realm_dart_userdata_async_free,
);
}
return completer.future;
}

static void _openRealmAsyncCallback(Object userData, Pointer<realm_thread_safe_reference> realmSafePtr, Pointer<realm_async_error_t> error) {
return using((Arena arena) {
final completer = userData as Completer<RealmHandle>;

final completer = userData as CancellableCompleter<RealmHandle>;
if (completer.isCancelled) {
return;
}
if (error != nullptr) {
final err = arena<realm_error>();
bool success = _realmLib.realm_get_async_error(error, err);
Expand Down Expand Up @@ -1755,13 +1762,12 @@ class _RealmCore {
await using((arena) async {
final response_pointer = arena<realm_http_response>();
final responseRef = response_pointer.ref;
final method = _HttpMethod.values[requestMethod];

try {
// Build request
late HttpClientRequest request;

// this throws if requestMethod is unknown _HttpMethod
final method = _HttpMethod.values[requestMethod];

switch (method) {
case _HttpMethod.delete:
request = await client.deleteUrl(url);
Expand All @@ -1788,8 +1794,16 @@ class _RealmCore {
request.add(utf8.encode(body));
}

Realm.logger.log(RealmLogLevel.debug, "HTTP Transport: Executing ${method.name} $url");

final stopwatch = Stopwatch()..start();

// Do the call..
final response = await request.close();

stopwatch.stop();
Realm.logger.log(RealmLogLevel.debug, "HTTP Transport: Executed ${method.name} $url: ${response.statusCode} in ${stopwatch.elapsedMilliseconds} ms");

final responseBody = await response.fold<List<int>>([], (acc, l) => acc..addAll(l)); // gather response

// Report back to core
Expand All @@ -1816,11 +1830,14 @@ class _RealmCore {
});

responseRef.custom_status_code = _CustomErrorCode.noError.code;
} on SocketException catch (_) {
} on SocketException catch (socketEx) {
Realm.logger.log(RealmLogLevel.warn, "HTTP Transport: SocketException executing ${method.name} $url: $socketEx");
responseRef.custom_status_code = _CustomErrorCode.timeout.code;
} on HttpException catch (_) {
} on HttpException catch (httpEx) {
Realm.logger.log(RealmLogLevel.warn, "HTTP Transport: HttpException executing ${method.name} $url: $httpEx");
responseRef.custom_status_code = _CustomErrorCode.unknownHttp.code;
} catch (_) {
} catch (ex) {
Realm.logger.log(RealmLogLevel.error, "HTTP Transport: Exception executing ${method.name} $url: $ex");
responseRef.custom_status_code = _CustomErrorCode.unknown.code;
} finally {
_realmLib.realm_http_transport_complete_request(request_context, response_pointer);
Expand Down Expand Up @@ -2320,12 +2337,14 @@ class _RealmCore {
controller.onConnectionStateChange(ConnectionState.values[oldState], ConnectionState.values[newState]);
}

Future<void> sessionWaitForUpload(Session session) {
final completer = Completer<void>();
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_error_t>)>(_sessionWaitCompletionCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_session_wait_for_upload_completion(session.handle._pointer, _realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
Future<void> sessionWaitForUpload(Session session, [CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<void>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_error_t>)>(_sessionWaitCompletionCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_session_wait_for_upload_completion(session.handle._pointer, _realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
}
return completer.future;
}

Expand All @@ -2341,8 +2360,8 @@ class _RealmCore {
}

static void _sessionWaitCompletionCallback(Object userdata, Pointer<realm_error_t> errorCode) {
final completer = userdata as Completer<void>;
if (completer.isCompleted) {
final completer = userdata as CancellableCompleter<void>;
if (completer.isCancelled) {
return;
}
if (errorCode != nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import 'session.dart';
import 'subscription.dart';
import 'set.dart';

export 'package:cancellation_token/cancellation_token.dart' show CancellationToken, CancelledException;
export 'package:cancellation_token/cancellation_token.dart' show CancellationToken, TimeoutCancellationToken, CancelledException;
export 'package:realm_common/realm_common.dart'
show
Backlink,
Expand Down Expand Up @@ -122,7 +122,7 @@ export 'realm_object.dart'
RealmObjectChanges,
UserCallbackException;
export 'realm_property.dart';
export 'results.dart' show RealmResultsOfObject, RealmResultsChanges, RealmResults;
export 'results.dart' show RealmResultsOfObject, RealmResultsChanges, RealmResults, WaitForSyncMode, RealmResultsOfRealmObject;
export 'session.dart'
show
ConnectionStateChange,
Expand Down
Loading

0 comments on commit b6eb23b

Please sign in to comment.