From 47772d5788edea881742ed4cd93b8b620f41dd3e Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Wed, 17 Jul 2024 18:49:02 +0200 Subject: [PATCH 01/14] Dart: Introduce abstract Client and Subscription classes Right now their only implementations are the DirectClient and DirectClientSubscription from the original code. The idea is to create other types which will pass the commands to a session running in the native code. --- bindings/dart/lib/client.dart | 100 ++++++++++++++++++++++++--- bindings/dart/lib/ouisync.dart | 79 ++++++--------------- bindings/dart/lib/state_monitor.dart | 2 +- 3 files changed, 115 insertions(+), 66 deletions(-) diff --git a/bindings/dart/lib/client.dart b/bindings/dart/lib/client.dart index be0204e6..ef7f1f89 100644 --- a/bindings/dart/lib/client.dart +++ b/bindings/dart/lib/client.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:collection'; +import 'dart:convert'; import 'dart:ffi'; import 'dart:isolate'; import 'dart:typed_data'; @@ -10,20 +11,29 @@ import 'package:msgpack_dart/msgpack_dart.dart'; import 'bindings.dart'; import 'ouisync.dart' show Error; -/// Client to interface with ouisync -class Client { +abstract class Client { + Future invoke(String method, [Object? args]); + Future close(); + bool get isClosed; + Subscription subscribe(String name, Object? arg); +} + +/// Client to interface with ouisync running in the same process as the FlutterEngine. +/// Ouisync backend (Rust) function invokations are done through FFI. +class DirectClient extends Client { int _handle; final Stream _stream; var _nextMessageId = 0; final _responses = HashMap>(); final _subscriptions = HashMap>(); - Client(this._handle, ReceivePort port) : _stream = port.cast() { + DirectClient(this._handle, ReceivePort port) : _stream = port.cast() { unawaited(_receive()); } int get handle => _handle; + @override Future invoke(String method, [Object? args]) async { final id = _nextMessageId++; final completer = Completer(); @@ -57,12 +67,36 @@ class Client { } } - int close() { + @override + Future close() async { + final handle = _handle; + _handle = 0; + + if (handle == 0) { + return; + } + + await _invoke_native_async( + (port) => bindings.session_close( + handle, + NativeApi.postCObject, + port, + ), + ); + } + + void closeSync() { final handle = _handle; _handle = 0; - return handle; + + if (handle == 0) { + return; + } + + bindings.session_close_blocking(handle); } + @override bool get isClosed => _handle == 0; void _send(Uint8List data) { @@ -176,17 +210,39 @@ class Client { sink.addError(error); } } + + @override + Subscription subscribe(String name, Object? arg) { + return _DirectClientSubscription(this, name, arg); + } + + Future copyToRawFd(int fileHandle, int fd) { + return _invoke_native_async( + (port) => bindings.file_copy_to_raw_fd( + handle, + fileHandle, + fd, + NativeApi.postCObject, + port, + ), + ); + } +} + +abstract class Subscription { + Stream get stream; + Future close(); } -class Subscription { - final Client _client; +class _DirectClientSubscription extends Subscription { + final DirectClient _client; final StreamController _controller; final String _name; final Object? _arg; int _id = 0; _SubscriptionState _state = _SubscriptionState.idle; - Subscription(this._client, this._name, this._arg) + _DirectClientSubscription(this._client, this._name, this._arg) : _controller = StreamController.broadcast() { _controller.onListen = () => _switch(_SubscriptionState.subscribing); _controller.onCancel = () => _switch(_SubscriptionState.unsubscribing); @@ -194,6 +250,7 @@ class Subscription { Stream get stream => _controller.stream; + @override Future close() async { if (_controller.hasListener) { await _controller.close(); @@ -274,3 +331,30 @@ enum _SubscriptionState { subscribing, unsubscribing, } + +// Helper to invoke a native async function. +Future _invoke_native_async(void Function(int) fun) async { + final recvPort = ReceivePort(); + + try { + fun(recvPort.sendPort.nativePort); + + final bytes = await recvPort.cast().first; + + if (bytes.isEmpty) { + return; + } + + final code = ErrorCode.decode(bytes.buffer.asByteData().getUint16(0)); + final message = utf8.decode(bytes.sublist(2)); + + if (code == ErrorCode.ok) { + return; + } else { + throw Error(code, message); + } + } finally { + recvPort.close(); + } +} + diff --git a/bindings/dart/lib/ouisync.dart b/bindings/dart/lib/ouisync.dart index 68ea1bf7..010077b2 100644 --- a/bindings/dart/lib/ouisync.dart +++ b/bindings/dart/lib/ouisync.dart @@ -36,7 +36,7 @@ class Session { String? _mountPoint; Session._(this._client) - : _networkSubscription = Subscription(_client, "network", null); + : _networkSubscription = _client.subscribe("network", null); /// Creates a new session in this process. /// [configPath] is a path to a directory where configuration files shall be stored. If it @@ -71,7 +71,7 @@ class Session { throw Error(errorCode, errorMessage); } - final client = Client(handle, recvPort); + final client = DirectClient(handle, recvPort); return Session._(client); } @@ -207,19 +207,7 @@ class Session { /// `closeSync` function. Future close() async { await _networkSubscription.close(); - - final handle = _client.close(); - if (handle == 0) { - return; - } - - await _invoke( - (port) => bindings.session_close( - handle, - NativeApi.postCObject, - port, - ), - ); + await _client.close(); } /// Try to gracefully close connections to peers then close the session. @@ -227,15 +215,18 @@ class Session { /// shutdown (on app exit). In those situations the network needs to be shut /// down using a blocking call. /// + /// This function closes the session only if the client used is the DirectClient. + /// Otherwise it throws. + /// /// Note that this function is idempotent with itself as well as with the /// `close` function. void closeSync() { - final handle = _client.close(); - if (handle == 0) { - return; + final client = _client; + if (client is DirectClient) { + client.closeSync(); + } else { + throw "closeSync is currently only implemented for DirectClient"; } - - bindings.session_close_blocking(handle); } } @@ -312,7 +303,7 @@ class Repository { final Subscription _subscription; Repository._(this._client, this._handle, this._store) - : _subscription = Subscription(_client, "repository", _handle); + : _subscription = _client.subscribe("repository", _handle); /// Creates a new repository and set access to it based on the following table: /// @@ -903,20 +894,20 @@ class File { Future get progress => _client.invoke('file_progress', _handle); /// Copy the contents of the file into the provided raw file descriptor. - Future copyToRawFd(int fd) { + /// TODO: Right now this function only works when using the DirectClient, + /// otherwise throws. + Future copyToRawFd(int fd) async { if (debugTrace) { print("File.copyToRawFd"); } - return _invoke( - (port) => bindings.file_copy_to_raw_fd( - _client.handle, - _handle, - fd, - NativeApi.postCObject, - port, - ), - ); + final client = _client; + + if (client is DirectClient) { + await client.copyToRawFd(_handle, fd); + } else { + throw "copyToRawFd is currently implemented only for DirectClient"; + } } } @@ -952,32 +943,6 @@ T _withPoolSync(T Function(_Pool) fun) { } } -// Helper to invoke a native async function. -Future _invoke(void Function(int) fun) async { - final recvPort = ReceivePort(); - - try { - fun(recvPort.sendPort.nativePort); - - final bytes = await recvPort.cast().first; - - if (bytes.isEmpty) { - return; - } - - final code = ErrorCode.decode(bytes.buffer.asByteData().getUint16(0)); - final message = utf8.decode(bytes.sublist(2)); - - if (code == ErrorCode.ok) { - return; - } else { - throw Error(code, message); - } - } finally { - recvPort.close(); - } -} - // Allocator that tracks all allocations and frees them all at the same time. class _Pool implements Allocator { List> ptrs = []; diff --git a/bindings/dart/lib/state_monitor.dart b/bindings/dart/lib/state_monitor.dart index 4a9dccc4..e19c0af6 100644 --- a/bindings/dart/lib/state_monitor.dart +++ b/bindings/dart/lib/state_monitor.dart @@ -109,7 +109,7 @@ class StateMonitor { StateMonitor._(_client, [..._path, childId]); Subscription subscribe() => - Subscription(_client, "state_monitor", _path.map((id) => id.toString())); + _client.subscribe("state_monitor", _path.map((id) => id.toString())); @override String toString() => "StateMonitor($_path)"; From a3a198da12ffe888e7cf9dbe3c5b9ca4d46b6fb0 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 18 Jul 2024 13:41:54 +0200 Subject: [PATCH 02/14] WIP started implementing ChannelClient --- bindings/dart/lib/client.dart | 271 ++---------------- bindings/dart/lib/internal/channel.dart | 4 + .../dart/lib/internal/channel_client.dart | 51 ++++ bindings/dart/lib/internal/direct_client.dart | 227 +++++++++++++++ bindings/dart/lib/ouisync.dart | 4 +- bindings/dart/lib/state_monitor.dart | 2 +- 6 files changed, 308 insertions(+), 251 deletions(-) create mode 100644 bindings/dart/lib/internal/channel.dart create mode 100644 bindings/dart/lib/internal/channel_client.dart create mode 100644 bindings/dart/lib/internal/direct_client.dart diff --git a/bindings/dart/lib/client.dart b/bindings/dart/lib/client.dart index ef7f1f89..85d258f6 100644 --- a/bindings/dart/lib/client.dart +++ b/bindings/dart/lib/client.dart @@ -1,248 +1,49 @@ import 'dart:async'; import 'dart:collection'; -import 'dart:convert'; -import 'dart:ffi'; -import 'dart:isolate'; -import 'dart:typed_data'; -import 'package:ffi/ffi.dart'; -import 'package:msgpack_dart/msgpack_dart.dart'; +import 'internal/direct_client.dart'; +export 'internal/direct_client.dart'; -import 'bindings.dart'; -import 'ouisync.dart' show Error; +import 'internal/channel_client.dart'; +export 'internal/channel_client.dart'; abstract class Client { Future invoke(String method, [Object? args]); Future close(); - bool get isClosed; - Subscription subscribe(String name, Object? arg); + Subscriptions subscriptions(); } -/// Client to interface with ouisync running in the same process as the FlutterEngine. -/// Ouisync backend (Rust) function invokations are done through FFI. -class DirectClient extends Client { - int _handle; - final Stream _stream; - var _nextMessageId = 0; - final _responses = HashMap>(); - final _subscriptions = HashMap>(); +class Subscriptions { + final _sinks = HashMap>(); - DirectClient(this._handle, ReceivePort port) : _stream = port.cast() { - unawaited(_receive()); - } - - int get handle => _handle; - - @override - Future invoke(String method, [Object? args]) async { - final id = _nextMessageId++; - final completer = Completer(); - - _responses[id] = completer; - - final request = {method: args}; - - // DEBUG - //print('send: id: $id, request: $request'); - - try { - // Message format: - // - // +-------------------------------------+-------------------------------------------+ - // | id (big endian 64 bit unsigned int) | request (messagepack encoded byte string) | - // +-------------------------------------+-------------------------------------------+ - // - // This allows the server to decode the id even if the request is malformed so it can send - // error response back. - final message = (BytesBuilder() - ..add((ByteData(8)..setUint64(0, id)).buffer.asUint8List()) - ..add(serialize(request))) - .takeBytes(); - - _send(message); - - return await completer.future as T; - } finally { - _responses.remove(id); - } - } - - @override - Future close() async { - final handle = _handle; - _handle = 0; - - if (handle == 0) { - return; - } - - await _invoke_native_async( - (port) => bindings.session_close( - handle, - NativeApi.postCObject, - port, - ), - ); - } + void handle(int subscriptionId, Object? payload) { + final sink = _sinks[subscriptionId]; - void closeSync() { - final handle = _handle; - _handle = 0; - - if (handle == 0) { - return; - } - - bindings.session_close_blocking(handle); - } - - @override - bool get isClosed => _handle == 0; - - void _send(Uint8List data) { - if (_handle == 0) { - throw StateError('session has been closed'); - } - - // TODO: is there a way to do this without having to allocate whole new buffer? - var buffer = malloc(data.length); - - try { - buffer.asTypedList(data.length).setAll(0, data); - bindings.session_channel_send(_handle, buffer, data.length); - } finally { - malloc.free(buffer); - } - } - - Future _receive() async { - await for (final bytes in _stream) { - if (bytes.length < 8) { - continue; - } - - final id = bytes.buffer.asByteData().getUint64(0); - final message = deserialize(bytes.sublist(8)); - - // DEBUG - //print('recv: id: $id, message: $message'); - - if (message is! Map) { - continue; + if (sink == null) { + print('unsolicited notification'); + return; } - final isSuccess = message.containsKey('success'); - final isFailure = message.containsKey('failure'); - final isNotification = message.containsKey('notification'); - - if (isSuccess || isFailure) { - final responseCompleter = _responses.remove(id); - if (responseCompleter == null) { - print('unsolicited response'); - continue; - } - - if (isSuccess) { - _handleResponseSuccess(responseCompleter, message['success']); - } else if (isFailure) { - _handleResponseFailure(responseCompleter, message['failure']); - } - } else if (isNotification) { - final subscription = _subscriptions[id]; - if (subscription == null) { - print('unsolicited notification'); - continue; - } - - _handleNotification(subscription, message['notification']); + if (payload is String) { + sink.add(null); + } else if (payload is Map && payload.length == 1) { + sink.add(payload.entries.single.value); } else { - final responseCompleter = _responses.remove(id); - if (responseCompleter != null) { - _handleInvalidResponse(responseCompleter); - } + final error = Exception('invalid notification'); + sink.addError(error); } } - } - - void _handleResponseSuccess(Completer completer, Object? payload) { - if (payload == "none") { - completer.complete(null); - return; - } - - if (payload is Map && payload.length == 1) { - completer.complete(payload.entries.single.value); - } else { - _handleInvalidResponse(completer); - } - } - - void _handleResponseFailure(Completer completer, Object? payload) { - if (payload is! List) { - _handleInvalidResponse(completer); - return; - } - - final code = payload[0]; - final message = payload[1]; - - if (code is! int || message is! String) { - _handleInvalidResponse(completer); - return; - } - - final error = Error(ErrorCode.decode(code), message); - completer.completeError(error); - } - - void _handleInvalidResponse(Completer completer) { - final error = Exception('invalid response'); - completer.completeError(error); - } - - void _handleNotification(StreamSink sink, Object? payload) { - if (payload is String) { - sink.add(null); - } else if (payload is Map && payload.length == 1) { - sink.add(payload.entries.single.value); - } else { - final error = Exception('invalid notification'); - sink.addError(error); - } - } - - @override - Subscription subscribe(String name, Object? arg) { - return _DirectClientSubscription(this, name, arg); - } - - Future copyToRawFd(int fileHandle, int fd) { - return _invoke_native_async( - (port) => bindings.file_copy_to_raw_fd( - handle, - fileHandle, - fd, - NativeApi.postCObject, - port, - ), - ); - } -} - -abstract class Subscription { - Stream get stream; - Future close(); } -class _DirectClientSubscription extends Subscription { - final DirectClient _client; +class Subscription { + final Client _client; final StreamController _controller; final String _name; final Object? _arg; int _id = 0; _SubscriptionState _state = _SubscriptionState.idle; - _DirectClientSubscription(this._client, this._name, this._arg) + Subscription(this._client, this._name, this._arg) : _controller = StreamController.broadcast() { _controller.onListen = () => _switch(_SubscriptionState.subscribing); _controller.onCancel = () => _switch(_SubscriptionState.unsubscribing); @@ -303,7 +104,7 @@ class _DirectClientSubscription extends Subscription { return; } - _client._subscriptions[_id] = _controller.sink; + _client.subscriptions()._sinks[_id] = _controller.sink; } catch (e) { print('failed to subscribe to $_name: $e'); } @@ -314,7 +115,7 @@ class _DirectClientSubscription extends Subscription { return; } - _client._subscriptions.remove(_id); + _client.subscriptions()._sinks.remove(_id); try { await _client.invoke('unsubscribe', _id); @@ -332,29 +133,3 @@ enum _SubscriptionState { unsubscribing, } -// Helper to invoke a native async function. -Future _invoke_native_async(void Function(int) fun) async { - final recvPort = ReceivePort(); - - try { - fun(recvPort.sendPort.nativePort); - - final bytes = await recvPort.cast().first; - - if (bytes.isEmpty) { - return; - } - - final code = ErrorCode.decode(bytes.buffer.asByteData().getUint16(0)); - final message = utf8.decode(bytes.sublist(2)); - - if (code == ErrorCode.ok) { - return; - } else { - throw Error(code, message); - } - } finally { - recvPort.close(); - } -} - diff --git a/bindings/dart/lib/internal/channel.dart b/bindings/dart/lib/internal/channel.dart new file mode 100644 index 00000000..0f3a15d9 --- /dev/null +++ b/bindings/dart/lib/internal/channel.dart @@ -0,0 +1,4 @@ + +//class Channel { +// send(ByteData message) -> Future +//} diff --git a/bindings/dart/lib/internal/channel_client.dart b/bindings/dart/lib/internal/channel_client.dart new file mode 100644 index 00000000..e349b538 --- /dev/null +++ b/bindings/dart/lib/internal/channel_client.dart @@ -0,0 +1,51 @@ +import 'package:flutter/services.dart'; +import '../client.dart'; + +class ChannelClient extends Client { + final MethodChannel _channel; + final _subscriptions = Subscriptions(); + + ChannelClient(String channelName) : _channel = MethodChannel(channelName) { + _channel.setMethodCallHandler(_handleMethodCall); + } + + @override + Future invoke(String method, [Object? args]) async { + return await _channel.invokeMethod(method, args); + } + + @override + Future close() async { + _channel.setMethodCallHandler(null); + } + + Future _handleMethodCall(MethodCall call) async { + if (call.method != "notify") { + throw PlatformException(code: "error", message: "Unsupported method: ${call.method}"); + } + + final args = call.arguments; + + if (args !is List) { + throw PlatformException(code: "error", message: "Invalid arguments"); + } + + if (args[0] !is int) { + throw PlatformException(code: "error", message: "First argument must be subsciption ID"); + } + + final subscriptionId = args[0]; + + if (args[1] !is Object?) { + throw PlatformException(code: "error", message: "Second argument must be of type Object?"); + } + + final payload = args[1]; + + _subscriptions.handle(subscriptionId, payload); + + return null; + } + + Subscriptions subscriptions() => _subscriptions; +} diff --git a/bindings/dart/lib/internal/direct_client.dart b/bindings/dart/lib/internal/direct_client.dart new file mode 100644 index 00000000..1fd76a35 --- /dev/null +++ b/bindings/dart/lib/internal/direct_client.dart @@ -0,0 +1,227 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; +import 'dart:ffi'; +import 'dart:isolate'; +import 'dart:typed_data'; + +import 'package:ffi/ffi.dart'; +import 'package:msgpack_dart/msgpack_dart.dart'; + +import '../client.dart'; +import '../bindings.dart'; +import '../ouisync.dart' show Error; + +/// Client to interface with ouisync running in the same process as the FlutterEngine. +/// Ouisync backend (Rust) function invokations are done through FFI. +class DirectClient extends Client { + int _handle; + final Stream _stream; + var _nextMessageId = 0; + final _responses = HashMap>(); + final _subscriptions = Subscriptions(); + + DirectClient(this._handle, ReceivePort port) : _stream = port.cast(), super() { + unawaited(_receive()); + } + + int get handle => _handle; + + @override + Future invoke(String method, [Object? args]) async { + final id = _nextMessageId++; + final completer = Completer(); + + _responses[id] = completer; + + final request = {method: args}; + + // DEBUG + //print('send: id: $id, request: $request'); + + try { + // Message format: + // + // +-------------------------------------+-------------------------------------------+ + // | id (big endian 64 bit unsigned int) | request (messagepack encoded byte string) | + // +-------------------------------------+-------------------------------------------+ + // + // This allows the server to decode the id even if the request is malformed so it can send + // error response back. + final message = (BytesBuilder() + ..add((ByteData(8)..setUint64(0, id)).buffer.asUint8List()) + ..add(serialize(request))) + .takeBytes(); + + _send(message); + + return await completer.future as T; + } finally { + _responses.remove(id); + } + } + + @override + Future close() async { + final handle = _handle; + _handle = 0; + + if (handle == 0) { + return; + } + + await _invoke_native_async( + (port) => bindings.session_close( + handle, + NativeApi.postCObject, + port, + ), + ); + } + + void closeSync() { + final handle = _handle; + _handle = 0; + + if (handle == 0) { + return; + } + + bindings.session_close_blocking(handle); + } + + void _send(Uint8List data) { + if (_handle == 0) { + throw StateError('session has been closed'); + } + + // TODO: is there a way to do this without having to allocate whole new buffer? + var buffer = malloc(data.length); + + try { + buffer.asTypedList(data.length).setAll(0, data); + bindings.session_channel_send(_handle, buffer, data.length); + } finally { + malloc.free(buffer); + } + } + + Future _receive() async { + await for (final bytes in _stream) { + if (bytes.length < 8) { + continue; + } + + final id = bytes.buffer.asByteData().getUint64(0); + final message = deserialize(bytes.sublist(8)); + + // DEBUG + //print('recv: id: $id, message: $message'); + + if (message is! Map) { + continue; + } + + final isSuccess = message.containsKey('success'); + final isFailure = message.containsKey('failure'); + final isNotification = message.containsKey('notification'); + + if (isSuccess || isFailure) { + final responseCompleter = _responses.remove(id); + if (responseCompleter == null) { + print('unsolicited response'); + continue; + } + + if (isSuccess) { + _handleResponseSuccess(responseCompleter, message['success']); + } else if (isFailure) { + _handleResponseFailure(responseCompleter, message['failure']); + } + } else if (isNotification) { + _subscriptions.handle(id, message['notification']); + } else { + final responseCompleter = _responses.remove(id); + if (responseCompleter != null) { + _handleInvalidResponse(responseCompleter); + } + } + } + } + + void _handleResponseSuccess(Completer completer, Object? payload) { + if (payload == "none") { + completer.complete(null); + return; + } + + if (payload is Map && payload.length == 1) { + completer.complete(payload.entries.single.value); + } else { + _handleInvalidResponse(completer); + } + } + + void _handleResponseFailure(Completer completer, Object? payload) { + if (payload is! List) { + _handleInvalidResponse(completer); + return; + } + + final code = payload[0]; + final message = payload[1]; + + if (code is! int || message is! String) { + _handleInvalidResponse(completer); + return; + } + + final error = Error(ErrorCode.decode(code), message); + completer.completeError(error); + } + + void _handleInvalidResponse(Completer completer) { + final error = Exception('invalid response'); + completer.completeError(error); + } + + Future copyToRawFd(int fileHandle, int fd) { + return _invoke_native_async( + (port) => bindings.file_copy_to_raw_fd( + handle, + fileHandle, + fd, + NativeApi.postCObject, + port, + ), + ); + } + + Subscriptions subscriptions() => _subscriptions; +} + +// Helper to invoke a native async function. +Future _invoke_native_async(void Function(int) fun) async { + final recvPort = ReceivePort(); + + try { + fun(recvPort.sendPort.nativePort); + + final bytes = await recvPort.cast().first; + + if (bytes.isEmpty) { + return; + } + + final code = ErrorCode.decode(bytes.buffer.asByteData().getUint16(0)); + final message = utf8.decode(bytes.sublist(2)); + + if (code == ErrorCode.ok) { + return; + } else { + throw Error(code, message); + } + } finally { + recvPort.close(); + } +} diff --git a/bindings/dart/lib/ouisync.dart b/bindings/dart/lib/ouisync.dart index 010077b2..f75ce0d4 100644 --- a/bindings/dart/lib/ouisync.dart +++ b/bindings/dart/lib/ouisync.dart @@ -36,7 +36,7 @@ class Session { String? _mountPoint; Session._(this._client) - : _networkSubscription = _client.subscribe("network", null); + : _networkSubscription = Subscription(_client, "network", null); /// Creates a new session in this process. /// [configPath] is a path to a directory where configuration files shall be stored. If it @@ -303,7 +303,7 @@ class Repository { final Subscription _subscription; Repository._(this._client, this._handle, this._store) - : _subscription = _client.subscribe("repository", _handle); + : _subscription = Subscription(_client, "repository", _handle); /// Creates a new repository and set access to it based on the following table: /// diff --git a/bindings/dart/lib/state_monitor.dart b/bindings/dart/lib/state_monitor.dart index e19c0af6..4a9dccc4 100644 --- a/bindings/dart/lib/state_monitor.dart +++ b/bindings/dart/lib/state_monitor.dart @@ -109,7 +109,7 @@ class StateMonitor { StateMonitor._(_client, [..._path, childId]); Subscription subscribe() => - _client.subscribe("state_monitor", _path.map((id) => id.toString())); + Subscription(_client, "state_monitor", _path.map((id) => id.toString())); @override String toString() => "StateMonitor($_path)"; From c6052537f9b3ff75901cbd19f3e15d70b4ad4c03 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 18 Jul 2024 17:48:19 +0200 Subject: [PATCH 03/14] wip --- bindings/dart/lib/internal/channel_client.dart | 6 +++++- bindings/dart/lib/ouisync.dart | 10 ++++++++++ bindings/dart/macos/Classes/OuisyncPlugin.swift | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/bindings/dart/lib/internal/channel_client.dart b/bindings/dart/lib/internal/channel_client.dart index e349b538..d706a540 100644 --- a/bindings/dart/lib/internal/channel_client.dart +++ b/bindings/dart/lib/internal/channel_client.dart @@ -9,9 +9,13 @@ class ChannelClient extends Client { _channel.setMethodCallHandler(_handleMethodCall); } + Future initialize() async { + await _channel.invokeMethod("initialize"); + } + @override Future invoke(String method, [Object? args]) async { - return await _channel.invokeMethod(method, args); + return await _channel.invokeMethod("invoke", [method, args]); } @override diff --git a/bindings/dart/lib/ouisync.dart b/bindings/dart/lib/ouisync.dart index f75ce0d4..28d711bb 100644 --- a/bindings/dart/lib/ouisync.dart +++ b/bindings/dart/lib/ouisync.dart @@ -76,6 +76,16 @@ class Session { return Session._(client); } + // Creates a new session which forwards calls to Ouisync backend running in the + // native code. + // [channelName] is the name of the MethodChannel to be used, equally named channel + // must be created and set up to listen to the commands in the native code. + static Future createChanneled(String channelName) async { + final client = ChannelClient(channelName); + await client.initialize(); + return Session._(client); + } + String? get mountPoint => _mountPoint; // Mount all repositories that are open now or in future in read or diff --git a/bindings/dart/macos/Classes/OuisyncPlugin.swift b/bindings/dart/macos/Classes/OuisyncPlugin.swift index a8a873a7..6da542c6 100644 --- a/bindings/dart/macos/Classes/OuisyncPlugin.swift +++ b/bindings/dart/macos/Classes/OuisyncPlugin.swift @@ -3,6 +3,7 @@ import FlutterMacOS public class OuisyncPlugin: NSObject, FlutterPlugin { public static func register(with registrar: FlutterPluginRegistrar) { + let channel = FlutterMethodChannel(name: "ouisync", binaryMessenger: registrar.messenger) let instance = OuisyncPlugin() registrar.addMethodCallDelegate(instance, channel: channel) From 4a48a81651a57e9d0c64588dd93e107ebdcd79fd Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Fri, 19 Jul 2024 12:56:27 +0200 Subject: [PATCH 04/14] WIP: Got basic communication running between flutter and Swift --- .../dart/lib/internal/channel_client.dart | 84 ++++------ bindings/dart/lib/internal/direct_client.dart | 156 ++++-------------- .../dart/lib/internal/message_matcher.dart | 140 ++++++++++++++++ .../Sources/OuisyncLib/OuisyncMessage.swift | 84 ++++++++-- .../Sources/OuisyncLib/OuisyncSession.swift | 10 +- 5 files changed, 278 insertions(+), 196 deletions(-) create mode 100644 bindings/dart/lib/internal/message_matcher.dart diff --git a/bindings/dart/lib/internal/channel_client.dart b/bindings/dart/lib/internal/channel_client.dart index d706a540..868de535 100644 --- a/bindings/dart/lib/internal/channel_client.dart +++ b/bindings/dart/lib/internal/channel_client.dart @@ -1,55 +1,41 @@ import 'package:flutter/services.dart'; import '../client.dart'; +import 'message_matcher.dart'; class ChannelClient extends Client { - final MethodChannel _channel; - final _subscriptions = Subscriptions(); + final MethodChannel _channel; + bool _isClosed = false; + final _messageMatcher = MessageMatcher(); + + ChannelClient(String channelName) : _channel = MethodChannel(channelName) { + _channel.setMethodCallHandler(_handleMethodCall); + } + + Future initialize() async { + await _messageMatcher.sendAndAwaitResponse("", {}, (Uint8List message) async { + await _channel.invokeMethod("initialize", message); + }); + } + + @override + Future invoke(String method, [Object? args]) async { + return await _messageMatcher.sendAndAwaitResponse(method, args, (Uint8List message) async { + await _channel.invokeMethod("invoke", message); + }); + } + + @override + Future close() async { + _channel.setMethodCallHandler(null); + _messageMatcher.close(); + } + + Future _handleMethodCall(MethodCall call) async { + final args = (call.arguments as List).cast(); + _messageMatcher.handleResponse(Uint8List.fromList(args)); + return null; + } + + Subscriptions subscriptions() => _messageMatcher.subscriptions(); - ChannelClient(String channelName) : _channel = MethodChannel(channelName) { - _channel.setMethodCallHandler(_handleMethodCall); - } - - Future initialize() async { - await _channel.invokeMethod("initialize"); - } - - @override - Future invoke(String method, [Object? args]) async { - return await _channel.invokeMethod("invoke", [method, args]); - } - - @override - Future close() async { - _channel.setMethodCallHandler(null); - } - - Future _handleMethodCall(MethodCall call) async { - if (call.method != "notify") { - throw PlatformException(code: "error", message: "Unsupported method: ${call.method}"); - } - - final args = call.arguments; - - if (args !is List) { - throw PlatformException(code: "error", message: "Invalid arguments"); - } - - if (args[0] !is int) { - throw PlatformException(code: "error", message: "First argument must be subsciption ID"); - } - - final subscriptionId = args[0]; - - if (args[1] !is Object?) { - throw PlatformException(code: "error", message: "Second argument must be of type Object?"); - } - - final payload = args[1]; - - _subscriptions.handle(subscriptionId, payload); - - return null; - } - - Subscriptions subscriptions() => _subscriptions; } diff --git a/bindings/dart/lib/internal/direct_client.dart b/bindings/dart/lib/internal/direct_client.dart index 1fd76a35..9e2a0f1c 100644 --- a/bindings/dart/lib/internal/direct_client.dart +++ b/bindings/dart/lib/internal/direct_client.dart @@ -6,7 +6,8 @@ import 'dart:isolate'; import 'dart:typed_data'; import 'package:ffi/ffi.dart'; -import 'package:msgpack_dart/msgpack_dart.dart'; + +import 'message_matcher.dart'; import '../client.dart'; import '../bindings.dart'; @@ -17,9 +18,7 @@ import '../ouisync.dart' show Error; class DirectClient extends Client { int _handle; final Stream _stream; - var _nextMessageId = 0; - final _responses = HashMap>(); - final _subscriptions = Subscriptions(); + final MessageMatcher _messageMatcher = MessageMatcher(); DirectClient(this._handle, ReceivePort port) : _stream = port.cast(), super() { unawaited(_receive()); @@ -29,36 +28,15 @@ class DirectClient extends Client { @override Future invoke(String method, [Object? args]) async { - final id = _nextMessageId++; - final completer = Completer(); - - _responses[id] = completer; - - final request = {method: args}; - - // DEBUG - //print('send: id: $id, request: $request'); - - try { - // Message format: - // - // +-------------------------------------+-------------------------------------------+ - // | id (big endian 64 bit unsigned int) | request (messagepack encoded byte string) | - // +-------------------------------------+-------------------------------------------+ - // - // This allows the server to decode the id even if the request is malformed so it can send - // error response back. - final message = (BytesBuilder() - ..add((ByteData(8)..setUint64(0, id)).buffer.asUint8List()) - ..add(serialize(request))) - .takeBytes(); - - _send(message); - - return await completer.future as T; - } finally { - _responses.remove(id); - } + // NOTE: Async is used for sender because that's what the MessageMatcher + // expects, and the MessageMatcher expects it because sending over + // ChannelClient is async. In fact, the rust implementation uses an + // umbounded buffer inside the `session_channel_send`, so it might be + // better to convert that into a bounded buffer and make the + // `session_channel_send` async as well. + return await _messageMatcher.sendAndAwaitResponse(method, args, (Uint8List message) async { + _send(message); + }); } @override @@ -70,6 +48,8 @@ class DirectClient extends Client { return; } + _messageMatcher.close(); + await _invoke_native_async( (port) => bindings.session_close( handle, @@ -87,102 +67,15 @@ class DirectClient extends Client { return; } - bindings.session_close_blocking(handle); - } - - void _send(Uint8List data) { - if (_handle == 0) { - throw StateError('session has been closed'); - } + _messageMatcher.close(); - // TODO: is there a way to do this without having to allocate whole new buffer? - var buffer = malloc(data.length); - - try { - buffer.asTypedList(data.length).setAll(0, data); - bindings.session_channel_send(_handle, buffer, data.length); - } finally { - malloc.free(buffer); - } + bindings.session_close_blocking(handle); } Future _receive() async { await for (final bytes in _stream) { - if (bytes.length < 8) { - continue; - } - - final id = bytes.buffer.asByteData().getUint64(0); - final message = deserialize(bytes.sublist(8)); - - // DEBUG - //print('recv: id: $id, message: $message'); - - if (message is! Map) { - continue; - } - - final isSuccess = message.containsKey('success'); - final isFailure = message.containsKey('failure'); - final isNotification = message.containsKey('notification'); - - if (isSuccess || isFailure) { - final responseCompleter = _responses.remove(id); - if (responseCompleter == null) { - print('unsolicited response'); - continue; - } - - if (isSuccess) { - _handleResponseSuccess(responseCompleter, message['success']); - } else if (isFailure) { - _handleResponseFailure(responseCompleter, message['failure']); - } - } else if (isNotification) { - _subscriptions.handle(id, message['notification']); - } else { - final responseCompleter = _responses.remove(id); - if (responseCompleter != null) { - _handleInvalidResponse(responseCompleter); - } - } - } - } - - void _handleResponseSuccess(Completer completer, Object? payload) { - if (payload == "none") { - completer.complete(null); - return; - } - - if (payload is Map && payload.length == 1) { - completer.complete(payload.entries.single.value); - } else { - _handleInvalidResponse(completer); - } - } - - void _handleResponseFailure(Completer completer, Object? payload) { - if (payload is! List) { - _handleInvalidResponse(completer); - return; + _messageMatcher.handleResponse(bytes); } - - final code = payload[0]; - final message = payload[1]; - - if (code is! int || message is! String) { - _handleInvalidResponse(completer); - return; - } - - final error = Error(ErrorCode.decode(code), message); - completer.completeError(error); - } - - void _handleInvalidResponse(Completer completer) { - final error = Exception('invalid response'); - completer.completeError(error); } Future copyToRawFd(int fileHandle, int fd) { @@ -197,7 +90,20 @@ class DirectClient extends Client { ); } - Subscriptions subscriptions() => _subscriptions; + Subscriptions subscriptions() => _messageMatcher.subscriptions(); + + void _send(Uint8List data) { + // TODO: is there a way to do this without having to allocate whole new buffer? + var buffer = malloc(data.length); + + try { + buffer.asTypedList(data.length).setAll(0, data); + bindings.session_channel_send(_handle, buffer, data.length); + } finally { + malloc.free(buffer); + } + } + } // Helper to invoke a native async function. diff --git a/bindings/dart/lib/internal/message_matcher.dart b/bindings/dart/lib/internal/message_matcher.dart new file mode 100644 index 00000000..54f273a5 --- /dev/null +++ b/bindings/dart/lib/internal/message_matcher.dart @@ -0,0 +1,140 @@ +import 'dart:async'; +import 'dart:typed_data'; +import 'dart:collection'; +import 'package:msgpack_dart/msgpack_dart.dart'; + +import '../client.dart'; +import '../ouisync.dart' show Error; +import '../bindings.dart' show ErrorCode; + +typedef Sender = Future Function(Uint8List); + +// Matches requests to responses. +class MessageMatcher { + var _nextMessageId = 0; + final _responses = HashMap>(); + final _subscriptions = Subscriptions(); + bool _isClosed = false; + + Subscriptions subscriptions() => _subscriptions; + + // Throws if the response is an error or if sending the message fails. + Future sendAndAwaitResponse(String method, Object? args, Sender send) async { + final id = _nextMessageId++; + final completer = Completer(); + + _responses[id] = completer; + + final request = {method: args}; + + // DEBUG + //print('send: id: $id, request: $request'); + + try { + // Message format: + // + // +-------------------------------------+-------------------------------------------+ + // | id (big endian 64 bit unsigned int) | request (messagepack encoded byte string) | + // +-------------------------------------+-------------------------------------------+ + // + // This allows the server to decode the id even if the request is malformed so it can send + // error response back. + final message = (BytesBuilder() + ..add((ByteData(8)..setUint64(0, id)).buffer.asUint8List()) + ..add(serialize(request))) + .takeBytes(); + + if (_isClosed) { + throw StateError('session has been closed'); + } + + await send(message); + + return await completer.future as T; + } finally { + _responses.remove(id); + } + } + + void handleResponse(Uint8List bytes) { + if (bytes.length < 8) { + return; + } + + final id = bytes.buffer.asByteData().getUint64(0); + final message = deserialize(bytes.sublist(8)); + + // DEBUG + //print('recv: id: $id, message: $message'); + + if (message is! Map) { + return; + } + + final isSuccess = message.containsKey('success'); + final isFailure = message.containsKey('failure'); + final isNotification = message.containsKey('notification'); + + if (isSuccess || isFailure) { + final responseCompleter = _responses.remove(id); + if (responseCompleter == null) { + print('unsolicited response'); + return; + } + + if (isSuccess) { + _handleResponseSuccess(responseCompleter, message['success']); + } else if (isFailure) { + _handleResponseFailure(responseCompleter, message['failure']); + } + } else if (isNotification) { + _subscriptions.handle(id, message['notification']); + } else { + final responseCompleter = _responses.remove(id); + if (responseCompleter != null) { + _handleInvalidResponse(responseCompleter); + } + } + } + + void close() { + _isClosed = true; + // TODO: Complete completers with an error + } + + void _handleResponseSuccess(Completer completer, Object? payload) { + if (payload == "none") { + completer.complete(null); + return; + } + + if (payload is Map && payload.length == 1) { + completer.complete(payload.entries.single.value); + } else { + _handleInvalidResponse(completer); + } + } + + void _handleResponseFailure(Completer completer, Object? payload) { + if (payload is! List) { + _handleInvalidResponse(completer); + return; + } + + final code = payload[0]; + final message = payload[1]; + + if (code is! int || message is! String) { + _handleInvalidResponse(completer); + return; + } + + final error = Error(ErrorCode.decode(code), message); + completer.completeError(error); + } + + void _handleInvalidResponse(Completer completer) { + final error = Exception('invalid response'); + completer.completeError(error); + } +} diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift index cfa73d7c..d6590dc8 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift @@ -141,29 +141,72 @@ public class MessageRequest { //-------------------------------------------------------------------- -public class IncomingMessage { +public class OutgoingMessage { // TODO: Rename to RequestMessage public let messageId: MessageId - public let payload: IncomingPayload + public let request: MessageRequest - init(_ messageId: MessageId, _ payload: IncomingPayload) { + init(_ messageId: MessageId, _ request: MessageRequest) { self.messageId = messageId - self.payload = payload + self.request = request } - public static func deserialize(_ data: [UInt8]) -> IncomingMessage? { - let idByteCount = (MessageId.bitWidth / UInt8.bitWidth) + public func serialize() -> [UInt8] { + var message: [UInt8] = [] + message.append(contentsOf: withUnsafeBytes(of: messageId.bigEndian, Array.init)) + let payload = [MessagePackValue.string(request.functionName): request.functionArguments] + message.append(contentsOf: pack(MessagePackValue.map(payload))) + return message + } - if data.count < idByteCount { + public static func deserialize(_ data: [UInt8]) -> OutgoingMessage? { + guard let (id, data) = readMessageId(data) else { return nil } - let bigEndianValue = data.withUnsafeBufferPointer { - ($0.baseAddress!.withMemoryRebound(to: MessageId.self, capacity: 1) { $0 }) - }.pointee + let unpacked = (try? unpack(data))?.0 + + guard case let .map(m) = unpacked else { return nil } + if m.count != 1 { return nil } + guard let e = m.first else { return nil } + guard let functionName = e.key.stringValue else { return nil } + let functionArguments = e.value - let id = MessageId(bigEndian: bigEndianValue) + return OutgoingMessage(id, MessageRequest(functionName, functionArguments)) + } +} - let unpacked = (try? unpack(Data(data[idByteCount...])))?.0 +public class IncomingMessage { // TODO: Rename to ResponseMessage + public let messageId: MessageId + public let payload: IncomingPayload + + public init(_ messageId: MessageId, _ payload: IncomingPayload) { + self.messageId = messageId + self.payload = payload + } + + public func serialize() -> [UInt8] { + var message: [UInt8] = [] + message.append(contentsOf: withUnsafeBytes(of: messageId.bigEndian, Array.init)) + let body: MessagePackValue; + switch payload { + case .response(let response): + body = MessagePackValue.map(["success": response.value]) + case .notification(let notification): + body = MessagePackValue.map(["notification": notification.value]) + case .error(let error): + let code = Int64(exactly: error.code.rawValue)! + body = MessagePackValue.map(["failure": .array([.int(code), .string(error.message)])]) + } + message.append(contentsOf: pack(body)) + return message + } + + public static func deserialize(_ bytes: [UInt8]) -> IncomingMessage? { + guard let (id, data) = readMessageId(bytes) else { + return nil + } + + let unpacked = (try? unpack(Data(data)))?.0 if case let .map(m) = unpacked { if let success = m[.string("success")] { @@ -191,6 +234,21 @@ extension IncomingMessage: CustomStringConvertible { } } +fileprivate func readMessageId(_ data: [UInt8]) -> (MessageId, Data)? { + let idByteCount = (MessageId.bitWidth / UInt8.bitWidth) + + if data.count < idByteCount { + return nil + } + + let bigEndianValue = data.withUnsafeBufferPointer { + ($0.baseAddress!.withMemoryRebound(to: MessageId.self, capacity: 1) { $0 }) + }.pointee + + let id = MessageId(bigEndian: bigEndianValue) + + return (id, Data(data[idByteCount...])) +} //-------------------------------------------------------------------- public enum IncomingPayload { @@ -240,7 +298,7 @@ public class Response { // the actual types differ, then it is likely that there is a // mismatch between the front end and the backend in the FFI API. - init(_ value: MessagePackValue) { + public init(_ value: MessagePackValue) { self.value = value } diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift index 169ebf26..798d2697 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift @@ -43,21 +43,13 @@ public class OuisyncSession { synchronized(session) { session.pendingResponses[messageId] = continuation - session.sendDataToOuisyncLib(Self.serialize(messageId, request)); + session.sendDataToOuisyncLib(OutgoingMessage(messageId, request).serialize()); } } return try await onResponse } - fileprivate static func serialize(_ messageId: MessageId, _ request: MessageRequest) -> [UInt8] { - var message: [UInt8] = [] - message.append(contentsOf: withUnsafeBytes(of: messageId.bigEndian, Array.init)) - let payload = [MessagePackValue.string(request.functionName): request.functionArguments] - message.append(contentsOf: pack(MessagePackValue.map(payload))) - return message - } - fileprivate func generateMessageId() -> MessageId { synchronized(self) { let messageId = nextMessageId From 5a17357582f1f1c1c4fd602e060f8bd4eb85605c Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 23 Jul 2024 08:49:35 +0200 Subject: [PATCH 05/14] WIP: Setup Ouisync backend in the extension --- .../dart/lib/internal/message_matcher.dart | 3 +- bindings/swift/OuisyncLib/Package.swift | 6 +- .../Sources/OuisyncLib/OuisyncFFI.swift | 140 ++++++++++++++++++ .../Sources/OuisyncLib/OuisyncMessage.swift | 12 +- .../Sources/OuisyncLib/OuisyncSession.swift | 55 ++++++- .../OuisyncLibFFI/include/OuisyncFFI.h | 23 +++ .../Sources/OuisyncLibFFI/ouisyncffi.c | 1 + 7 files changed, 231 insertions(+), 9 deletions(-) create mode 100644 bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift create mode 100644 bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h create mode 100644 bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/ouisyncffi.c diff --git a/bindings/dart/lib/internal/message_matcher.dart b/bindings/dart/lib/internal/message_matcher.dart index 54f273a5..ddcef61e 100644 --- a/bindings/dart/lib/internal/message_matcher.dart +++ b/bindings/dart/lib/internal/message_matcher.dart @@ -65,7 +65,7 @@ class MessageMatcher { final message = deserialize(bytes.sublist(8)); // DEBUG - //print('recv: id: $id, message: $message'); + print('recv: id: $id, message: $message'); if (message is! Map) { return; @@ -103,6 +103,7 @@ class MessageMatcher { } void _handleResponseSuccess(Completer completer, Object? payload) { + print(">>>>>>>>>>>>>>>>> received $payload"); if (payload == "none") { completer.complete(null); return; diff --git a/bindings/swift/OuisyncLib/Package.swift b/bindings/swift/OuisyncLib/Package.swift index 4e9d0d70..5f26dd94 100644 --- a/bindings/swift/OuisyncLib/Package.swift +++ b/bindings/swift/OuisyncLib/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version: 5.8 +// swift-tools-version: 5.9 // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription @@ -20,7 +20,9 @@ let package = Package( // Targets can depend on other targets in this package and products from dependencies. .target( name: "OuisyncLib", - dependencies: [.product(name:"MessagePack", package: "MessagePack.swift")]), + dependencies: [.product(name:"MessagePack", package: "MessagePack.swift"), "OuisyncLibFFI"] + ), + .target(name: "OuisyncLibFFI", dependencies: []), .testTarget( name: "OuisyncLibTests", dependencies: ["OuisyncLib"]), diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift new file mode 100644 index 00000000..69ad065f --- /dev/null +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift @@ -0,0 +1,140 @@ +// +// File.swift +// +// +// Created by Peter Jankuliak on 19/07/2024. +// + +import Foundation +import OuisyncLibFFI + +typealias FFISessionKind = UInt8 +typealias FFIContext = UnsafeRawPointer +typealias FFICallback = @convention(c) (FFIContext?, UnsafePointer, CUnsignedLongLong) -> Void; +typealias FFISessionCreate = @convention(c) (FFISessionKind, UnsafePointer, UnsafePointer, UnsafeRawPointer?, FFICallback) -> SessionCreateResult; +typealias FFISessionGrab = @convention(c) (UnsafeRawPointer?, FFICallback) -> SessionCreateResult; +typealias FFISessionClose = @convention(c) (SessionHandle, FFIContext?, FFICallback) -> Void; +typealias FFISessionChannelSend = @convention(c) (SessionHandle, UnsafeRawPointer, UInt64) -> Void; + +class SessionCreateError : Error, CustomStringConvertible { + let message: String + init(_ message: String) { self.message = message } + var description: String { message } +} + +public class OuisyncFFI { + let handle: UnsafeMutableRawPointer + let ffiSessionGrab: FFISessionGrab + let ffiSessionCreate: FFISessionCreate + let ffiSessionChannelSend: FFISessionChannelSend + let ffiSessionClose: FFISessionClose + let sessionKindShared: FFISessionKind = 0; + + public init() { + handle = dlopen("libouisync_ffi.dylib", RTLD_NOW)! + ffiSessionGrab = unsafeBitCast(dlsym(handle, "session_grab"), to: FFISessionGrab.self) + ffiSessionChannelSend = unsafeBitCast(dlsym(handle, "session_channel_send"), to: FFISessionChannelSend.self) + ffiSessionClose = unsafeBitCast(dlsym(handle, "session_close"), to: FFISessionClose.self) + ffiSessionCreate = unsafeBitCast(dlsym(handle, "session_create"), to: FFISessionCreate.self) + } + + // Blocks until Dart creates a session, then returns it. + func waitForSession(_ context: UnsafeRawPointer, _ callback: FFICallback) async throws -> SessionHandle { + // TODO: Might be worth change the ffi function to call a callback when the session becomes created instead of bussy sleeping. + var elapsed: UInt64 = 0; + while true { + let result = ffiSessionGrab(context, callback) + if result.errorCode == 0 { + NSLog("๐Ÿ˜€ Got Ouisync session"); + return result.session + } + NSLog("๐Ÿคจ Ouisync session not yet ready. Code: \(result.errorCode) Message:\(String(cString: result.errorMessage!))"); + + let millisecond: UInt64 = 1_000_000 + let second: UInt64 = 1000 * millisecond + + var timeout = 200 * millisecond + + if elapsed > 10 * second { + timeout = second + } + + try await Task.sleep(nanoseconds: timeout) + elapsed += timeout; + } + } + + func channelSend(_ session: SessionHandle, _ data: [UInt8]) { + let count = data.count; + data.withUnsafeBufferPointer({ maybePointer in + if let pointer = maybePointer.baseAddress { + ffiSessionChannelSend(session, pointer, UInt64(count)) + } + }) + } + + func closeSession(_ session: SessionHandle) async { + typealias C = CheckedContinuation + + class Context { + let session: SessionHandle + let continuation: C + init(_ session: SessionHandle, _ continuation: C) { + self.session = session + self.continuation = continuation + } + } + + await withCheckedContinuation(function: "FFI.closeSession", { continuation in + let context = Self.toRetainedPtr(obj: Context(session, continuation)) + let callback: FFICallback = { context, dataPointer, size in + let context: Context = OuisyncFFI.fromRetainedPtr(ptr: context!) + context.continuation.resume() + } + ffiSessionClose(session, context, callback) + }) + } + + // Retained pointers have their reference counter incremented by 1. + // https://stackoverflow.com/a/33310021/273348 + static func toUnretainedPtr(obj : T) -> UnsafeRawPointer { + return UnsafeRawPointer(Unmanaged.passUnretained(obj).toOpaque()) + } + + static func fromUnretainedPtr(ptr : UnsafeRawPointer) -> T { + return Unmanaged.fromOpaque(ptr).takeUnretainedValue() + } + + static func toRetainedPtr(obj : T) -> UnsafeRawPointer { + return UnsafeRawPointer(Unmanaged.passRetained(obj).toOpaque()) + } + + static func fromRetainedPtr(ptr : UnsafeRawPointer) -> T { + return Unmanaged.fromOpaque(ptr).takeRetainedValue() + } +} + +// --------------------------------------------------------------------------------------- + +typealias Rx = AsyncStream<[UInt8]> +typealias Tx = AsyncStream<[UInt8]>.Continuation + +class Wrap { + let inner: T + init(_ inner: T) { self.inner = inner } +} + +class Channel { + let rx: Rx + let tx: Tx + + init(_ rx: Rx, _ tx: Tx) { self.rx = rx; self.tx = tx } +} + +func makeStream() -> (Rx, Tx) { + var continuation: Rx.Continuation! + let stream = Rx() { continuation = $0 } + return (stream, continuation!) +} + +// --------------------------------------------------------------------------------------- diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift index d6590dc8..da156750 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift @@ -190,7 +190,7 @@ public class IncomingMessage { // TODO: Rename to ResponseMessage let body: MessagePackValue; switch payload { case .response(let response): - body = MessagePackValue.map(["success": response.value]) + body = MessagePackValue.map(["success": Self.responseValue(response.value)]) case .notification(let notification): body = MessagePackValue.map(["notification": notification.value]) case .error(let error): @@ -201,6 +201,16 @@ public class IncomingMessage { // TODO: Rename to ResponseMessage return message } + static func responseValue(_ value: MessagePackValue) -> MessagePackValue { + switch value { + case .nil: return .string("none") + default: + // The flutter code doesn't read the key which is supposed to be a type, + // would still be nice to have a proper mapping. + return .map(["todo-type": value]) + } + } + public static func deserialize(_ bytes: [UInt8]) -> IncomingMessage? { guard let (id, data) = readMessageId(bytes) else { return nil diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift index 798d2697..741f9a65 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift @@ -6,17 +6,54 @@ import Foundation import MessagePack +import OuisyncLibFFI public class OuisyncSession { - // Used to send and receive messages from the Ouisync library - let librarySender: OuisyncLibrarySenderProtocol + var sessionHandle: SessionHandle + let ffi: OuisyncFFI var nextMessageId: MessageId = 0 var pendingResponses: [MessageId: CheckedContinuation] = [:] var notificationSubscriptions: NotificationStream.State = NotificationStream.State() - public init(_ libraryClient: OuisyncLibrarySenderProtocol) { - self.librarySender = libraryClient + fileprivate init(_ sessionHandle: SessionHandle, _ ffi: OuisyncFFI) { + self.sessionHandle = sessionHandle + self.ffi = ffi + } + + public static func create(_ configPath: String, _ logPath: String, _ ffi: OuisyncFFI) throws -> OuisyncSession { + let session = OuisyncSession(0, ffi) + + let callback: FFICallback = { context, dataPointer, size in + let session: OuisyncSession = OuisyncFFI.fromUnretainedPtr(ptr: context!) + let data = Array(UnsafeBufferPointer(start: dataPointer, count: Int(exactly: size)!)) + session.onReceiveDataFromOuisyncLib(data) + } + + let result = ffi.ffiSessionCreate(ffi.sessionKindShared, configPath, logPath, OuisyncFFI.toUnretainedPtr(obj: session), callback); + + if result.errorCode != 0 { + throw SessionCreateError("Failed to create session, code:\(result.errorCode), message:\(result.errorMessage!)") + } + + session.sessionHandle = result.session + return session + } + + // Can be called from a separate thread. + public func invoke(_ requestMsg: OutgoingMessage) async -> IncomingMessage { + let responsePayload: IncomingPayload + + NSLog("AAAAAA \(requestMsg.request.functionName)") + do { + responsePayload = .response(try await sendRequest(requestMsg.request)) + } catch let e as OuisyncError { + responsePayload = .error(e) + } catch let e { + fatalError("Unhandled exception in OuisyncSession.invoke: \(e)") + } + + return IncomingMessage(requestMsg.messageId, responsePayload) } public func listRepositories() async throws -> [OuisyncRepository] { @@ -35,6 +72,7 @@ public class OuisyncSession { return NotificationStream(subscriptionId, notificationSubscriptions) } + // Can be called from a separate thread. internal func sendRequest(_ request: MessageRequest) async throws -> Response { let messageId = generateMessageId() @@ -50,6 +88,7 @@ public class OuisyncSession { return try await onResponse } + // Can be called from a separate thread. fileprivate func generateMessageId() -> MessageId { synchronized(self) { let messageId = nextMessageId @@ -59,7 +98,13 @@ public class OuisyncSession { } fileprivate func sendDataToOuisyncLib(_ data: [UInt8]) { - librarySender.sendDataToOuisyncLib(data); + //librarySender.sendDataToOuisyncLib(data); + let count = data.count; + data.withUnsafeBufferPointer({ maybePointer in + if let pointer = maybePointer.baseAddress { + ffi.ffiSessionChannelSend(sessionHandle, pointer, UInt64(count)) + } + }) } // Use this function to pass data from the backend. diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h b/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h new file mode 100644 index 00000000..3bd735a0 --- /dev/null +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h @@ -0,0 +1,23 @@ +// +// FileProviderProxy.h +// Runner +// +// Created by Peter Jankuliak on 03/04/2024. +// + +#ifndef OuisyncFFI_h +#define OuisyncFFI_h + +#include + +typedef uint64_t SessionHandle; + +struct SessionCreateResult { + SessionHandle session; + uint16_t errorCode; + const char* errorMessage; +}; + +typedef struct SessionCreateResult SessionCreateResult; + +#endif /* OuisyncFFI_h */ diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/ouisyncffi.c b/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/ouisyncffi.c new file mode 100644 index 00000000..f27f0025 --- /dev/null +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/ouisyncffi.c @@ -0,0 +1 @@ +// Blank file because xcode complains if there is nothing to compile. From ce8f80802fe7ec5d70179424cfbc2edc163b92ca Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 23 Jul 2024 11:43:57 +0200 Subject: [PATCH 06/14] Rename message classes --- .../Sources/OuisyncLib/OuisyncEntry.swift | 4 +- .../Sources/OuisyncLib/OuisyncMessage.swift | 108 +++++++++--------- .../Sources/OuisyncLib/OuisyncSession.swift | 19 ++- 3 files changed, 65 insertions(+), 66 deletions(-) diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncEntry.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncEntry.swift index 020aacf1..1bbd2982 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncEntry.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncEntry.swift @@ -111,7 +111,7 @@ public class OuisyncDirectoryEntry: CustomDebugStringConvertible { } public func listEntries() async throws -> [OuisyncEntry] { - let response = try await repository.session.sendRequest(MessageRequest.listEntries(repository.handle, path)) + let response = try await repository.session.sendRequest(OuisyncRequest.listEntries(repository.handle, path)) let entries = response.value.arrayValue! return entries.map({entry in let name: String = entry[0]!.stringValue! @@ -138,7 +138,7 @@ public class OuisyncDirectoryEntry: CustomDebugStringConvertible { } public func exists() async throws -> Bool { - let response = try await repository.session.sendRequest(MessageRequest.directoryExists(repository.handle, path)) + let response = try await repository.session.sendRequest(OuisyncRequest.directoryExists(repository.handle, path)) return response.value.boolValue! } diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift index da156750..6aa8f45e 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncMessage.swift @@ -10,7 +10,7 @@ import System //-------------------------------------------------------------------- -public class MessageRequest { +public class OuisyncRequest { let functionName: String let functionArguments: MessagePackValue @@ -19,119 +19,119 @@ public class MessageRequest { self.functionArguments = functionArguments } - public static func listRepositories() -> MessageRequest { - return MessageRequest("list_repositories", MessagePackValue.nil) + public static func listRepositories() -> OuisyncRequest { + return OuisyncRequest("list_repositories", MessagePackValue.nil) } - public static func subscribeToRepositoryListChange() -> MessageRequest { - return MessageRequest("list_repositories_subscribe", MessagePackValue.nil) + public static func subscribeToRepositoryListChange() -> OuisyncRequest { + return OuisyncRequest("list_repositories_subscribe", MessagePackValue.nil) } - public static func subscribeToRepositoryChange(_ handle: RepositoryHandle) -> MessageRequest { - return MessageRequest("repository_subscribe", MessagePackValue(handle)) + public static func subscribeToRepositoryChange(_ handle: RepositoryHandle) -> OuisyncRequest { + return OuisyncRequest("repository_subscribe", MessagePackValue(handle)) } - public static func getRepositoryName(_ handle: RepositoryHandle) -> MessageRequest { - return MessageRequest("repository_name", MessagePackValue(handle)) + public static func getRepositoryName(_ handle: RepositoryHandle) -> OuisyncRequest { + return OuisyncRequest("repository_name", MessagePackValue(handle)) } - public static func repositoryMoveEntry(_ repoHandle: RepositoryHandle, _ srcPath: FilePath, _ dstPath: FilePath) -> MessageRequest { - return MessageRequest("repository_move_entry", MessagePackValue([ + public static func repositoryMoveEntry(_ repoHandle: RepositoryHandle, _ srcPath: FilePath, _ dstPath: FilePath) -> OuisyncRequest { + return OuisyncRequest("repository_move_entry", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(repoHandle), MessagePackValue("src"): MessagePackValue(srcPath.description), MessagePackValue("dst"): MessagePackValue(dstPath.description), ])) } - public static func listEntries(_ handle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("directory_open", MessagePackValue([ + public static func listEntries(_ handle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("directory_open", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(handle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func getEntryVersionHash(_ handle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("repository_entry_version_hash", MessagePackValue([ + public static func getEntryVersionHash(_ handle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("repository_entry_version_hash", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(handle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func directoryExists(_ handle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("directory_exists", MessagePackValue([ + public static func directoryExists(_ handle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("directory_exists", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(handle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func directoryRemove(_ handle: RepositoryHandle, _ path: FilePath, _ recursive: Bool) -> MessageRequest { - return MessageRequest("directory_remove", MessagePackValue([ + public static func directoryRemove(_ handle: RepositoryHandle, _ path: FilePath, _ recursive: Bool) -> OuisyncRequest { + return OuisyncRequest("directory_remove", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(handle), MessagePackValue("path"): MessagePackValue(path.description), MessagePackValue("recursive"): MessagePackValue(recursive), ])) } - public static func directoryCreate(_ repoHandle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("directory_create", MessagePackValue([ + public static func directoryCreate(_ repoHandle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("directory_create", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(repoHandle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func fileOpen(_ repoHandle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("file_open", MessagePackValue([ + public static func fileOpen(_ repoHandle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("file_open", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(repoHandle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func fileExists(_ handle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("file_exists", MessagePackValue([ + public static func fileExists(_ handle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("file_exists", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(handle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func fileRemove(_ handle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("file_remove", MessagePackValue([ + public static func fileRemove(_ handle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("file_remove", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(handle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func fileClose(_ fileHandle: FileHandle) -> MessageRequest { - return MessageRequest("file_close", MessagePackValue(fileHandle)) + public static func fileClose(_ fileHandle: FileHandle) -> OuisyncRequest { + return OuisyncRequest("file_close", MessagePackValue(fileHandle)) } - public static func fileRead(_ fileHandle: FileHandle, _ offset: UInt64, _ len: UInt64) -> MessageRequest { - return MessageRequest("file_read", MessagePackValue([ + public static func fileRead(_ fileHandle: FileHandle, _ offset: UInt64, _ len: UInt64) -> OuisyncRequest { + return OuisyncRequest("file_read", MessagePackValue([ MessagePackValue("file"): MessagePackValue(fileHandle), MessagePackValue("offset"): MessagePackValue(offset), MessagePackValue("len"): MessagePackValue(len), ])) } - public static func fileTruncate(_ fileHandle: FileHandle, _ len: UInt64) -> MessageRequest { - return MessageRequest("file_truncate", MessagePackValue([ + public static func fileTruncate(_ fileHandle: FileHandle, _ len: UInt64) -> OuisyncRequest { + return OuisyncRequest("file_truncate", MessagePackValue([ MessagePackValue("file"): MessagePackValue(fileHandle), MessagePackValue("len"): MessagePackValue(len), ])) } - public static func fileLen(_ fileHandle: FileHandle) -> MessageRequest { - return MessageRequest("file_len", MessagePackValue(fileHandle)) + public static func fileLen(_ fileHandle: FileHandle) -> OuisyncRequest { + return OuisyncRequest("file_len", MessagePackValue(fileHandle)) } - public static func fileCreate(_ repoHandle: RepositoryHandle, _ path: FilePath) -> MessageRequest { - return MessageRequest("file_create", MessagePackValue([ + public static func fileCreate(_ repoHandle: RepositoryHandle, _ path: FilePath) -> OuisyncRequest { + return OuisyncRequest("file_create", MessagePackValue([ MessagePackValue("repository"): MessagePackValue(repoHandle), MessagePackValue("path"): MessagePackValue(path.description), ])) } - public static func fileWrite(_ fileHandle: FileHandle, _ offset: UInt64, _ data: Data) -> MessageRequest { - return MessageRequest("file_write", MessagePackValue([ + public static func fileWrite(_ fileHandle: FileHandle, _ offset: UInt64, _ data: Data) -> OuisyncRequest { + return OuisyncRequest("file_write", MessagePackValue([ MessagePackValue("file"): MessagePackValue(fileHandle), MessagePackValue("offset"): MessagePackValue(offset), MessagePackValue("data"): MessagePackValue(data), @@ -141,11 +141,11 @@ public class MessageRequest { //-------------------------------------------------------------------- -public class OutgoingMessage { // TODO: Rename to RequestMessage +public class OuisyncRequestMessage { public let messageId: MessageId - public let request: MessageRequest + public let request: OuisyncRequest - init(_ messageId: MessageId, _ request: MessageRequest) { + init(_ messageId: MessageId, _ request: OuisyncRequest) { self.messageId = messageId self.request = request } @@ -158,7 +158,7 @@ public class OutgoingMessage { // TODO: Rename to RequestMessage return message } - public static func deserialize(_ data: [UInt8]) -> OutgoingMessage? { + public static func deserialize(_ data: [UInt8]) -> OuisyncRequestMessage? { guard let (id, data) = readMessageId(data) else { return nil } @@ -171,15 +171,15 @@ public class OutgoingMessage { // TODO: Rename to RequestMessage guard let functionName = e.key.stringValue else { return nil } let functionArguments = e.value - return OutgoingMessage(id, MessageRequest(functionName, functionArguments)) + return OuisyncRequestMessage(id, OuisyncRequest(functionName, functionArguments)) } } -public class IncomingMessage { // TODO: Rename to ResponseMessage +public class OuisyncResponseMessage { public let messageId: MessageId - public let payload: IncomingPayload + public let payload: OuisyncResponsePayload - public init(_ messageId: MessageId, _ payload: IncomingPayload) { + public init(_ messageId: MessageId, _ payload: OuisyncResponsePayload) { self.messageId = messageId self.payload = payload } @@ -211,7 +211,7 @@ public class IncomingMessage { // TODO: Rename to ResponseMessage } } - public static func deserialize(_ bytes: [UInt8]) -> IncomingMessage? { + public static func deserialize(_ bytes: [UInt8]) -> OuisyncResponseMessage? { guard let (id, data) = readMessageId(bytes) else { return nil } @@ -221,15 +221,15 @@ public class IncomingMessage { // TODO: Rename to ResponseMessage if case let .map(m) = unpacked { if let success = m[.string("success")] { if let value = parseResponse(success) { - return IncomingMessage(id, IncomingPayload.response(value)) + return OuisyncResponseMessage(id, OuisyncResponsePayload.response(value)) } } else if let error = m[.string("failure")] { if let response = parseFailure(error) { - return IncomingMessage(id, IncomingPayload.error(response)) + return OuisyncResponseMessage(id, OuisyncResponsePayload.error(response)) } } else if let notification = m[.string("notification")] { if let value = parseNotification(notification) { - return IncomingMessage(id, IncomingPayload.notification(value)) + return OuisyncResponseMessage(id, OuisyncResponsePayload.notification(value)) } } } @@ -238,7 +238,7 @@ public class IncomingMessage { // TODO: Rename to ResponseMessage } } -extension IncomingMessage: CustomStringConvertible { +extension OuisyncResponseMessage: CustomStringConvertible { public var description: String { return "IncomingMessage(\(messageId), \(payload))" } @@ -261,13 +261,13 @@ fileprivate func readMessageId(_ data: [UInt8]) -> (MessageId, Data)? { } //-------------------------------------------------------------------- -public enum IncomingPayload { +public enum OuisyncResponsePayload { case response(Response) case notification(OuisyncNotification) case error(OuisyncError) } -extension IncomingPayload: CustomStringConvertible { +extension OuisyncResponsePayload: CustomStringConvertible { public var description: String { switch self { case .response(let response): diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift index 741f9a65..904bb606 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift @@ -41,10 +41,9 @@ public class OuisyncSession { } // Can be called from a separate thread. - public func invoke(_ requestMsg: OutgoingMessage) async -> IncomingMessage { - let responsePayload: IncomingPayload + public func invoke(_ requestMsg: OuisyncRequestMessage) async -> OuisyncResponseMessage { + let responsePayload: OuisyncResponsePayload - NSLog("AAAAAA \(requestMsg.request.functionName)") do { responsePayload = .response(try await sendRequest(requestMsg.request)) } catch let e as OuisyncError { @@ -53,27 +52,27 @@ public class OuisyncSession { fatalError("Unhandled exception in OuisyncSession.invoke: \(e)") } - return IncomingMessage(requestMsg.messageId, responsePayload) + return OuisyncResponseMessage(requestMsg.messageId, responsePayload) } public func listRepositories() async throws -> [OuisyncRepository] { - let response = try await sendRequest(MessageRequest.listRepositories()); + let response = try await sendRequest(OuisyncRequest.listRepositories()); let handles = response.toUInt64Array() return handles.map({ OuisyncRepository($0, self) }) } public func subscribeToRepositoryListChange() async throws -> NotificationStream { - let subscriptionId = try await sendRequest(MessageRequest.subscribeToRepositoryListChange()).toUInt64(); + let subscriptionId = try await sendRequest(OuisyncRequest.subscribeToRepositoryListChange()).toUInt64(); return NotificationStream(subscriptionId, notificationSubscriptions) } public func subscribeToRepositoryChange(_ repo: RepositoryHandle) async throws -> NotificationStream { - let subscriptionId = try await sendRequest(MessageRequest.subscribeToRepositoryChange(repo)).toUInt64(); + let subscriptionId = try await sendRequest(OuisyncRequest.subscribeToRepositoryChange(repo)).toUInt64(); return NotificationStream(subscriptionId, notificationSubscriptions) } // Can be called from a separate thread. - internal func sendRequest(_ request: MessageRequest) async throws -> Response { + internal func sendRequest(_ request: OuisyncRequest) async throws -> Response { let messageId = generateMessageId() async let onResponse = withCheckedThrowingContinuation { [weak self] continuation in @@ -81,7 +80,7 @@ public class OuisyncSession { synchronized(session) { session.pendingResponses[messageId] = continuation - session.sendDataToOuisyncLib(OutgoingMessage(messageId, request).serialize()); + session.sendDataToOuisyncLib(OuisyncRequestMessage(messageId, request).serialize()); } } @@ -110,7 +109,7 @@ public class OuisyncSession { // Use this function to pass data from the backend. // It may be called from a separate thread. public func onReceiveDataFromOuisyncLib(_ data: [UInt8]) { - let maybe_message = IncomingMessage.deserialize(data) + let maybe_message = OuisyncResponseMessage.deserialize(data) guard let message = maybe_message else { let hex = data.map({String(format:"%02x", $0)}).joined(separator: ",") From 0b9b14d7eef99d63baed5de70889e9897db571e7 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 23 Jul 2024 13:12:31 +0200 Subject: [PATCH 07/14] Add comment --- .../swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift | 2 ++ cli/src/protocol.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift index 904bb606..4ea83b53 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift @@ -22,6 +22,8 @@ public class OuisyncSession { } public static func create(_ configPath: String, _ logPath: String, _ ffi: OuisyncFFI) throws -> OuisyncSession { + // Init with an invalid sessionHandle because we need the OuisyncSession instance to + // create the callback, which is in turn needed to create the proper sessionHandle. let session = OuisyncSession(0, ffi) let callback: FFICallback = { context, dataPointer, size in diff --git a/cli/src/protocol.rs b/cli/src/protocol.rs index 699d6c03..bfa25393 100644 --- a/cli/src/protocol.rs +++ b/cli/src/protocol.rs @@ -276,7 +276,7 @@ pub(crate) enum Request { /// Repository token #[arg(short, long)] token: String, - } + }, } #[derive(Clone, Copy, Debug, Serialize, Deserialize, ValueEnum)] From aed55e4b845edf3aae1a4fff2bdd2a0dea724082 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 23 Jul 2024 15:56:43 +0200 Subject: [PATCH 08/14] Decrease swift-tools-version to the latest stable --- bindings/swift/OuisyncLib/Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/swift/OuisyncLib/Package.swift b/bindings/swift/OuisyncLib/Package.swift index 5f26dd94..78e5691e 100644 --- a/bindings/swift/OuisyncLib/Package.swift +++ b/bindings/swift/OuisyncLib/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version: 5.9 +// swift-tools-version: 5.8 // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription From 887488064bfaf9063df02ce3a830bfd0c33ed827 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Wed, 24 Jul 2024 13:43:07 +0200 Subject: [PATCH 09/14] Enable creating new FFI client connections --- .../Sources/OuisyncLib/OuisyncClient.swift | 76 +++++++++++++++++++ .../Sources/OuisyncLib/OuisyncFFI.swift | 43 ++--------- .../Sources/OuisyncLib/OuisyncSession.swift | 53 ++++--------- .../OuisyncLibFFI/include/OuisyncFFI.h | 8 +- 4 files changed, 101 insertions(+), 79 deletions(-) create mode 100644 bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift new file mode 100644 index 00000000..ef9fae5e --- /dev/null +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift @@ -0,0 +1,76 @@ +// +// File.swift +// +// +// Created by Peter Jankuliak on 23/07/2024. +// + +import Foundation +import OuisyncLibFFI + +public class OuisyncClient { + var clientHandle: OuisyncClientHandle + let ffi: OuisyncFFI + public var onReceiveFromBackend: OuisyncOnReceiveFromBackend? = nil + + public static func create(_ configPath: String, _ logPath: String, _ ffi: OuisyncFFI) throws -> OuisyncClient { + // Init with an invalid sessionHandle because we need the OuisyncSession instance to + // create the callback, which is in turn needed to create the proper sessionHandle. + let client = OuisyncClient(0, ffi) + + let callback: FFICallback = { context, dataPointer, size in + let client: OuisyncClient = OuisyncFFI.fromUnretainedPtr(ptr: context!) + guard let onReceive = client.onReceiveFromBackend else { + fatalError("OuisyncClient has no onReceive handler set") + } + onReceive(Array(UnsafeBufferPointer(start: dataPointer, count: Int(exactly: size)!))) + } + + let result = ffi.ffiSessionCreate(ffi.sessionKindShared, configPath, logPath, OuisyncFFI.toUnretainedPtr(obj: client), callback); + + if result.errorCode != 0 { + throw SessionCreateError("Failed to create session, code:\(result.errorCode), message:\(result.errorMessage!)") + } + + client.clientHandle = result.clientHandle + return client + } + + fileprivate init(_ clientHandle: OuisyncClientHandle, _ ffi: OuisyncFFI) { + self.clientHandle = clientHandle + self.ffi = ffi + } + + public func sendToBackend(_ data: [UInt8]) { + let count = data.count; + data.withUnsafeBufferPointer({ maybePointer in + if let pointer = maybePointer.baseAddress { + ffi.ffiSessionChannelSend(clientHandle, pointer, UInt64(count)) + } + }) + } + + func close() async { + typealias Continuation = CheckedContinuation + + class Context { + let clientHandle: OuisyncClientHandle + let continuation: Continuation + init(_ clientHandle: OuisyncClientHandle, _ continuation: Continuation) { + self.clientHandle = clientHandle + self.continuation = continuation + } + } + + await withCheckedContinuation(function: "FFI.closeSession", { continuation in + let context = OuisyncFFI.toRetainedPtr(obj: Context(clientHandle, continuation)) + let callback: FFICallback = { context, dataPointer, size in + let context: Context = OuisyncFFI.fromRetainedPtr(ptr: context!) + context.continuation.resume() + } + ffi.ffiSessionClose(clientHandle, context, callback) + }) + } +} + +public typealias OuisyncOnReceiveFromBackend = ([UInt8]) -> Void diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift index 69ad065f..960aa220 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift @@ -11,10 +11,10 @@ import OuisyncLibFFI typealias FFISessionKind = UInt8 typealias FFIContext = UnsafeRawPointer typealias FFICallback = @convention(c) (FFIContext?, UnsafePointer, CUnsignedLongLong) -> Void; -typealias FFISessionCreate = @convention(c) (FFISessionKind, UnsafePointer, UnsafePointer, UnsafeRawPointer?, FFICallback) -> SessionCreateResult; -typealias FFISessionGrab = @convention(c) (UnsafeRawPointer?, FFICallback) -> SessionCreateResult; -typealias FFISessionClose = @convention(c) (SessionHandle, FFIContext?, FFICallback) -> Void; -typealias FFISessionChannelSend = @convention(c) (SessionHandle, UnsafeRawPointer, UInt64) -> Void; +typealias FFISessionCreate = @convention(c) (FFISessionKind, UnsafePointer, UnsafePointer, UnsafeRawPointer?, FFICallback) -> OuisyncSessionCreateResult; +typealias FFISessionGrab = @convention(c) (UnsafeRawPointer?, FFICallback) -> OuisyncSessionCreateResult; +typealias FFISessionClose = @convention(c) (OuisyncClientHandle, FFIContext?, FFICallback) -> Void; +typealias FFISessionChannelSend = @convention(c) (OuisyncClientHandle, UnsafeRawPointer, UInt64) -> Void; class SessionCreateError : Error, CustomStringConvertible { let message: String @@ -39,14 +39,14 @@ public class OuisyncFFI { } // Blocks until Dart creates a session, then returns it. - func waitForSession(_ context: UnsafeRawPointer, _ callback: FFICallback) async throws -> SessionHandle { + func waitForSession(_ context: UnsafeRawPointer, _ callback: FFICallback) async throws -> OuisyncClientHandle { // TODO: Might be worth change the ffi function to call a callback when the session becomes created instead of bussy sleeping. var elapsed: UInt64 = 0; while true { let result = ffiSessionGrab(context, callback) if result.errorCode == 0 { NSLog("๐Ÿ˜€ Got Ouisync session"); - return result.session + return result.clientHandle } NSLog("๐Ÿคจ Ouisync session not yet ready. Code: \(result.errorCode) Message:\(String(cString: result.errorMessage!))"); @@ -64,37 +64,6 @@ public class OuisyncFFI { } } - func channelSend(_ session: SessionHandle, _ data: [UInt8]) { - let count = data.count; - data.withUnsafeBufferPointer({ maybePointer in - if let pointer = maybePointer.baseAddress { - ffiSessionChannelSend(session, pointer, UInt64(count)) - } - }) - } - - func closeSession(_ session: SessionHandle) async { - typealias C = CheckedContinuation - - class Context { - let session: SessionHandle - let continuation: C - init(_ session: SessionHandle, _ continuation: C) { - self.session = session - self.continuation = continuation - } - } - - await withCheckedContinuation(function: "FFI.closeSession", { continuation in - let context = Self.toRetainedPtr(obj: Context(session, continuation)) - let callback: FFICallback = { context, dataPointer, size in - let context: Context = OuisyncFFI.fromRetainedPtr(ptr: context!) - context.continuation.resume() - } - ffiSessionClose(session, context, callback) - }) - } - // Retained pointers have their reference counter incremented by 1. // https://stackoverflow.com/a/33310021/273348 static func toUnretainedPtr(obj : T) -> UnsafeRawPointer { diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift index 4ea83b53..a7d1df6a 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncSession.swift @@ -9,37 +9,27 @@ import MessagePack import OuisyncLibFFI public class OuisyncSession { - var sessionHandle: SessionHandle - let ffi: OuisyncFFI + let configsPath: String + let logsPath: String + + let client: OuisyncClient var nextMessageId: MessageId = 0 var pendingResponses: [MessageId: CheckedContinuation] = [:] var notificationSubscriptions: NotificationStream.State = NotificationStream.State() - fileprivate init(_ sessionHandle: SessionHandle, _ ffi: OuisyncFFI) { - self.sessionHandle = sessionHandle - self.ffi = ffi - } - - public static func create(_ configPath: String, _ logPath: String, _ ffi: OuisyncFFI) throws -> OuisyncSession { - // Init with an invalid sessionHandle because we need the OuisyncSession instance to - // create the callback, which is in turn needed to create the proper sessionHandle. - let session = OuisyncSession(0, ffi) - - let callback: FFICallback = { context, dataPointer, size in - let session: OuisyncSession = OuisyncFFI.fromUnretainedPtr(ptr: context!) - let data = Array(UnsafeBufferPointer(start: dataPointer, count: Int(exactly: size)!)) - session.onReceiveDataFromOuisyncLib(data) - } - - let result = ffi.ffiSessionCreate(ffi.sessionKindShared, configPath, logPath, OuisyncFFI.toUnretainedPtr(obj: session), callback); + public init(_ configsPath: String, _ logsPath: String, _ ffi: OuisyncFFI) throws { + self.configsPath = configsPath + self.logsPath = logsPath - if result.errorCode != 0 { - throw SessionCreateError("Failed to create session, code:\(result.errorCode), message:\(result.errorMessage!)") + client = try OuisyncClient.create(configsPath, logsPath, ffi) + client.onReceiveFromBackend = { [weak self] data in + self?.onReceiveDataFromOuisyncLib(data) } + } - session.sessionHandle = result.session - return session + public func connectNewClient() throws -> OuisyncClient { + return try OuisyncClient.create(configsPath, logsPath, client.ffi) } // Can be called from a separate thread. @@ -82,7 +72,8 @@ public class OuisyncSession { synchronized(session) { session.pendingResponses[messageId] = continuation - session.sendDataToOuisyncLib(OuisyncRequestMessage(messageId, request).serialize()); + let data = OuisyncRequestMessage(messageId, request).serialize() + session.client.sendToBackend(data) } } @@ -98,16 +89,6 @@ public class OuisyncSession { } } - fileprivate func sendDataToOuisyncLib(_ data: [UInt8]) { - //librarySender.sendDataToOuisyncLib(data); - let count = data.count; - data.withUnsafeBufferPointer({ maybePointer in - if let pointer = maybePointer.baseAddress { - ffi.ffiSessionChannelSend(sessionHandle, pointer, UInt64(count)) - } - }) - } - // Use this function to pass data from the backend. // It may be called from a separate thread. public func onReceiveDataFromOuisyncLib(_ data: [UInt8]) { @@ -167,10 +148,6 @@ fileprivate func synchronized(_ lock: AnyObject, _ closure: () throws -> T) r return try closure() } -public protocol OuisyncLibrarySenderProtocol { - func sendDataToOuisyncLib(_ data: [UInt8]) -} - public class NotificationStream { typealias Id = UInt64 typealias Rx = AsyncStream<()> diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h b/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h index 3bd735a0..3521606b 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLibFFI/include/OuisyncFFI.h @@ -10,14 +10,14 @@ #include -typedef uint64_t SessionHandle; +typedef uint64_t OuisyncClientHandle; -struct SessionCreateResult { - SessionHandle session; +struct OuisyncSessionCreateResult { + OuisyncClientHandle clientHandle; uint16_t errorCode; const char* errorMessage; }; -typedef struct SessionCreateResult SessionCreateResult; +typedef struct OuisyncSessionCreateResult OuisyncSessionCreateResult; #endif /* OuisyncFFI_h */ From d7d09df0b94e8d5b147256ab454466bb01eac32a Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Wed, 24 Jul 2024 14:16:26 +0200 Subject: [PATCH 10/14] Remove debug output --- bindings/dart/lib/internal/message_matcher.dart | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bindings/dart/lib/internal/message_matcher.dart b/bindings/dart/lib/internal/message_matcher.dart index ddcef61e..54f273a5 100644 --- a/bindings/dart/lib/internal/message_matcher.dart +++ b/bindings/dart/lib/internal/message_matcher.dart @@ -65,7 +65,7 @@ class MessageMatcher { final message = deserialize(bytes.sublist(8)); // DEBUG - print('recv: id: $id, message: $message'); + //print('recv: id: $id, message: $message'); if (message is! Map) { return; @@ -103,7 +103,6 @@ class MessageMatcher { } void _handleResponseSuccess(Completer completer, Object? payload) { - print(">>>>>>>>>>>>>>>>> received $payload"); if (payload == "none") { completer.complete(null); return; From cc2d58ca61a87a4bff0addfce66ee0a78745412d Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 25 Jul 2024 15:52:19 +0200 Subject: [PATCH 11/14] Add code to OuisyncError debug print --- bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncError.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncError.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncError.swift index dd137f83..1061c818 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncError.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncError.swift @@ -79,6 +79,6 @@ public class OuisyncError : Error, CustomDebugStringConvertible { case .Other: codeStr = "Unspecified error" } - return "OuisyncError(code:\"\(codeStr)\", message:\"\(message)\")" + return "OuisyncError(code:\(code), codeStr:\"\(codeStr)\", message:\"\(message)\")" } } From c0fde0b768a0a07b6ec767e69a76f8baa67a0440 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 25 Jul 2024 16:27:27 +0200 Subject: [PATCH 12/14] Fix dart analyze {lib,example} --- .../example/linux/flutter/generated_plugin_registrant.cc | 6 +++--- .../dart/example/linux/flutter/generated_plugins.cmake | 2 +- .../example/macos/Flutter/GeneratedPluginRegistrant.swift | 2 ++ .../windows/flutter/generated_plugin_registrant.cc | 2 +- .../dart/example/windows/flutter/generated_plugins.cmake | 2 +- bindings/dart/lib/client.dart | 5 +---- bindings/dart/lib/internal/channel_client.dart | 3 +-- bindings/dart/lib/internal/direct_client.dart | 8 ++++---- 8 files changed, 14 insertions(+), 16 deletions(-) diff --git a/bindings/dart/example/linux/flutter/generated_plugin_registrant.cc b/bindings/dart/example/linux/flutter/generated_plugin_registrant.cc index aee705b7..e4b55871 100644 --- a/bindings/dart/example/linux/flutter/generated_plugin_registrant.cc +++ b/bindings/dart/example/linux/flutter/generated_plugin_registrant.cc @@ -6,10 +6,10 @@ #include "generated_plugin_registrant.h" -#include +#include void fl_register_plugins(FlPluginRegistry* registry) { - g_autoptr(FlPluginRegistrar) ouisync_plugin_registrar = + g_autoptr(FlPluginRegistrar) ouisync_registrar = fl_plugin_registry_get_registrar_for_plugin(registry, "OuisyncPlugin"); - ouisync_plugin_register_with_registrar(ouisync_plugin_registrar); + ouisync_plugin_register_with_registrar(ouisync_registrar); } diff --git a/bindings/dart/example/linux/flutter/generated_plugins.cmake b/bindings/dart/example/linux/flutter/generated_plugins.cmake index cdc6b59a..4d87356c 100644 --- a/bindings/dart/example/linux/flutter/generated_plugins.cmake +++ b/bindings/dart/example/linux/flutter/generated_plugins.cmake @@ -3,7 +3,7 @@ # list(APPEND FLUTTER_PLUGIN_LIST - ouisync_plugin + ouisync ) list(APPEND FLUTTER_FFI_PLUGIN_LIST diff --git a/bindings/dart/example/macos/Flutter/GeneratedPluginRegistrant.swift b/bindings/dart/example/macos/Flutter/GeneratedPluginRegistrant.swift index e777c67d..7e32157b 100644 --- a/bindings/dart/example/macos/Flutter/GeneratedPluginRegistrant.swift +++ b/bindings/dart/example/macos/Flutter/GeneratedPluginRegistrant.swift @@ -5,8 +5,10 @@ import FlutterMacOS import Foundation +import ouisync import path_provider_foundation func RegisterGeneratedPlugins(registry: FlutterPluginRegistry) { + OuisyncPlugin.register(with: registry.registrar(forPlugin: "OuisyncPlugin")) PathProviderPlugin.register(with: registry.registrar(forPlugin: "PathProviderPlugin")) } diff --git a/bindings/dart/example/windows/flutter/generated_plugin_registrant.cc b/bindings/dart/example/windows/flutter/generated_plugin_registrant.cc index 3c56d201..fad947ce 100644 --- a/bindings/dart/example/windows/flutter/generated_plugin_registrant.cc +++ b/bindings/dart/example/windows/flutter/generated_plugin_registrant.cc @@ -6,7 +6,7 @@ #include "generated_plugin_registrant.h" -#include +#include void RegisterPlugins(flutter::PluginRegistry* registry) { OuisyncPluginRegisterWithRegistrar( diff --git a/bindings/dart/example/windows/flutter/generated_plugins.cmake b/bindings/dart/example/windows/flutter/generated_plugins.cmake index 692829f8..4a728288 100644 --- a/bindings/dart/example/windows/flutter/generated_plugins.cmake +++ b/bindings/dart/example/windows/flutter/generated_plugins.cmake @@ -3,7 +3,7 @@ # list(APPEND FLUTTER_PLUGIN_LIST - ouisync_plugin + ouisync ) list(APPEND FLUTTER_FFI_PLUGIN_LIST diff --git a/bindings/dart/lib/client.dart b/bindings/dart/lib/client.dart index 85d258f6..1f8accc2 100644 --- a/bindings/dart/lib/client.dart +++ b/bindings/dart/lib/client.dart @@ -1,10 +1,8 @@ import 'dart:async'; import 'dart:collection'; -import 'internal/direct_client.dart'; export 'internal/direct_client.dart'; -import 'internal/channel_client.dart'; export 'internal/channel_client.dart'; abstract class Client { @@ -51,10 +49,9 @@ class Subscription { Stream get stream => _controller.stream; - @override Future close() async { if (_controller.hasListener) { - await _controller.close(); + return await _controller.close(); } } diff --git a/bindings/dart/lib/internal/channel_client.dart b/bindings/dart/lib/internal/channel_client.dart index 868de535..e62cd7d3 100644 --- a/bindings/dart/lib/internal/channel_client.dart +++ b/bindings/dart/lib/internal/channel_client.dart @@ -4,7 +4,6 @@ import 'message_matcher.dart'; class ChannelClient extends Client { final MethodChannel _channel; - bool _isClosed = false; final _messageMatcher = MessageMatcher(); ChannelClient(String channelName) : _channel = MethodChannel(channelName) { @@ -36,6 +35,6 @@ class ChannelClient extends Client { return null; } + @override Subscriptions subscriptions() => _messageMatcher.subscriptions(); - } diff --git a/bindings/dart/lib/internal/direct_client.dart b/bindings/dart/lib/internal/direct_client.dart index 9e2a0f1c..55e76380 100644 --- a/bindings/dart/lib/internal/direct_client.dart +++ b/bindings/dart/lib/internal/direct_client.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:collection'; import 'dart:convert'; import 'dart:ffi'; import 'dart:isolate'; @@ -50,7 +49,7 @@ class DirectClient extends Client { _messageMatcher.close(); - await _invoke_native_async( + await _invokeNativeAsync( (port) => bindings.session_close( handle, NativeApi.postCObject, @@ -79,7 +78,7 @@ class DirectClient extends Client { } Future copyToRawFd(int fileHandle, int fd) { - return _invoke_native_async( + return _invokeNativeAsync( (port) => bindings.file_copy_to_raw_fd( handle, fileHandle, @@ -90,6 +89,7 @@ class DirectClient extends Client { ); } + @override Subscriptions subscriptions() => _messageMatcher.subscriptions(); void _send(Uint8List data) { @@ -107,7 +107,7 @@ class DirectClient extends Client { } // Helper to invoke a native async function. -Future _invoke_native_async(void Function(int) fun) async { +Future _invokeNativeAsync(void Function(int) fun) async { final recvPort = ReceivePort(); try { From b9ddede0bc775a16fbad183ead689f36454f9261 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Fri, 26 Jul 2024 14:59:42 +0200 Subject: [PATCH 13/14] Build the ouisync ffi as part of the package on MacOS Also remove automatic loading of the dylib library from Flutter. It'll only be loaded when session is created using the `Session.create` function but not when created with the `Session.createChanneled` function). --- bindings/dart/lib/bindings.dart | 6 +- bindings/dart/lib/internal/direct_client.dart | 11 +-- bindings/dart/lib/ouisync.dart | 13 ++- bindings/swift/OuisyncLib/Package.swift | 4 +- .../OuisyncDyLibBuilder.swift | 89 +++++++++++++++++++ .../Sources/OuisyncLib/OuisyncFFI.swift | 31 ++----- 6 files changed, 117 insertions(+), 37 deletions(-) create mode 100644 bindings/swift/OuisyncLib/Plugins/OuisyncDyLibBuilder/OuisyncDyLibBuilder.swift diff --git a/bindings/dart/lib/bindings.dart b/bindings/dart/lib/bindings.dart index b14c8c88..41315ebf 100644 --- a/bindings/dart/lib/bindings.dart +++ b/bindings/dart/lib/bindings.dart @@ -8,7 +8,7 @@ import 'package:flutter/foundation.dart' show kReleaseMode; export 'bindings.g.dart'; -final bindings = Bindings(_defaultLib()); +Bindings? bindings; typedef PostCObject = Int8 Function(Int64, Pointer); @@ -86,6 +86,10 @@ class Bindings { .lookup>('free_string') .asFunction(); + static Bindings loadDefault() { + return Bindings(_defaultLib()); + } + final session_create_dart session_create; final session_channel_send_dart session_channel_send; final session_close_dart session_close; diff --git a/bindings/dart/lib/internal/direct_client.dart b/bindings/dart/lib/internal/direct_client.dart index 55e76380..598bb6db 100644 --- a/bindings/dart/lib/internal/direct_client.dart +++ b/bindings/dart/lib/internal/direct_client.dart @@ -18,8 +18,9 @@ class DirectClient extends Client { int _handle; final Stream _stream; final MessageMatcher _messageMatcher = MessageMatcher(); + final Bindings _bindings; - DirectClient(this._handle, ReceivePort port) : _stream = port.cast(), super() { + DirectClient(this._handle, ReceivePort port, this._bindings) : _stream = port.cast(), super() { unawaited(_receive()); } @@ -50,7 +51,7 @@ class DirectClient extends Client { _messageMatcher.close(); await _invokeNativeAsync( - (port) => bindings.session_close( + (port) => _bindings.session_close( handle, NativeApi.postCObject, port, @@ -68,7 +69,7 @@ class DirectClient extends Client { _messageMatcher.close(); - bindings.session_close_blocking(handle); + _bindings.session_close_blocking(handle); } Future _receive() async { @@ -79,7 +80,7 @@ class DirectClient extends Client { Future copyToRawFd(int fileHandle, int fd) { return _invokeNativeAsync( - (port) => bindings.file_copy_to_raw_fd( + (port) => _bindings.file_copy_to_raw_fd( handle, fileHandle, fd, @@ -98,7 +99,7 @@ class DirectClient extends Client { try { buffer.asTypedList(data.length).setAll(0, data); - bindings.session_channel_send(_handle, buffer, data.length); + _bindings.session_channel_send(_handle, buffer, data.length); } finally { malloc.free(buffer); } diff --git a/bindings/dart/lib/ouisync.dart b/bindings/dart/lib/ouisync.dart index 28d711bb..493f365d 100644 --- a/bindings/dart/lib/ouisync.dart +++ b/bindings/dart/lib/ouisync.dart @@ -52,7 +52,12 @@ class Session { } final recvPort = ReceivePort(); - final result = _withPoolSync((pool) => bindings.session_create( + + if (bindings == null) { + bindings = Bindings.loadDefault(); + } + + final result = _withPoolSync((pool) => bindings!.session_create( kind.encode(), pool.toNativeUtf8(configPath), logPath != null ? pool.toNativeUtf8(logPath) : nullptr, @@ -71,7 +76,7 @@ class Session { throw Error(errorCode, errorMessage); } - final client = DirectClient(handle, recvPort); + final client = DirectClient(handle, recvPort, bindings!); return Session._(client); } @@ -923,7 +928,7 @@ class File { /// Print log message void logPrint(LogLevel level, String scope, String message) => - _withPoolSync((pool) => bindings.log_print( + _withPoolSync((pool) => bindings!.log_print( level.encode(), pool.toNativeUtf8(scope), pool.toNativeUtf8(message), @@ -992,5 +997,5 @@ extension Utf8Pointer on Pointer { // Free a pointer that was allocated by the native side. void freeString(Pointer ptr) { - bindings.free_string(ptr.cast()); + bindings!.free_string(ptr.cast()); } diff --git a/bindings/swift/OuisyncLib/Package.swift b/bindings/swift/OuisyncLib/Package.swift index 78e5691e..f47e53d2 100644 --- a/bindings/swift/OuisyncLib/Package.swift +++ b/bindings/swift/OuisyncLib/Package.swift @@ -22,7 +22,9 @@ let package = Package( name: "OuisyncLib", dependencies: [.product(name:"MessagePack", package: "MessagePack.swift"), "OuisyncLibFFI"] ), - .target(name: "OuisyncLibFFI", dependencies: []), + .target(name: "OuisyncLibFFI", dependencies: ["OuisyncDyLibBuilder"]), + .plugin(name: "OuisyncDyLibBuilder", capability: .buildTool()), + .testTarget( name: "OuisyncLibTests", dependencies: ["OuisyncLib"]), diff --git a/bindings/swift/OuisyncLib/Plugins/OuisyncDyLibBuilder/OuisyncDyLibBuilder.swift b/bindings/swift/OuisyncLib/Plugins/OuisyncDyLibBuilder/OuisyncDyLibBuilder.swift new file mode 100644 index 00000000..f8136ef7 --- /dev/null +++ b/bindings/swift/OuisyncLib/Plugins/OuisyncDyLibBuilder/OuisyncDyLibBuilder.swift @@ -0,0 +1,89 @@ +// +// File.swift +// +// +// Created by Peter Jankuliak on 26/07/2024. +// + +// A package plugin which builds the libouisync_ffi.dylib library and includes it in the package. +// https://github.com/swiftlang/swift-package-manager/blob/main/Documentation/Plugins.md +// +// TODO: Right now it creates only Debug version of the library and only for the current architecture. +// Can we detect the build type and architecture here? This github ticket seems to indicate it's not +// currently possible: +// https://github.com/swiftlang/swift-package-manager/issues/7110 + +import Foundation +import PackagePlugin + +@main +struct OuisyncDyLibBuilder: BuildToolPlugin { + func createBuildCommands(context: PackagePlugin.PluginContext, target: PackagePlugin.Target) async throws -> [PackagePlugin.Command] { + let workDir = context.pluginWorkDirectory + let dylibName = "libouisync_ffi.dylib" + let dylibPath = workDir.appending("debug").appending(dylibName) + let cargoPath = shell("which cargo").trimmingCharacters(in: .whitespacesAndNewlines) + let inputFiles = findInputFiles() + + return [ + .buildCommand( + displayName: "Build Ouisync FFI .dylib", + executable: Path(cargoPath), + arguments: ["build", "-p", "ouisync-ffi", "--target-dir", workDir], + environment: [:], + inputFiles: inputFiles.map { Path($0) }, + outputFiles: [ dylibPath ]) + ] + } +} + +// This finds files which when changed, the Swift builder will re-execute the build above build command. +// The implementation is not very good because if a new directory/module is added on which this package +// depends, the package builder won't rebuild the ffi library. +// TODO: use `cargo build --build-plan` once it's in stable. +func findInputFiles() -> [String] { + var files = [String]() + let startAts = [ + URL(string: "../../../bridge")!, + URL(string: "../../../deadlock")!, + URL(string: "../../../ffi")!, + URL(string: "../../../lib")!, + URL(string: "../../../net")!, + URL(string: "../../../rand")!, + URL(string: "../../../scoped_task")!, + URL(string: "../../../state_monitor")!, + URL(string: "../../../tracing_fmt")!, + URL(string: "../../../utils")!, + URL(string: "../../../vfs")!, + ] + for startAt in startAts { + if let enumerator = FileManager.default.enumerator(at: startAt, includingPropertiesForKeys: [.isRegularFileKey], options: [.skipsHiddenFiles, .skipsPackageDescendants]) { + for case let fileURL as URL in enumerator { + do { + let fileAttributes = try fileURL.resourceValues(forKeys:[.isRegularFileKey]) + if fileAttributes.isRegularFile! { + files.append(fileURL.path) + } + } catch { fatalError("Error finding input files: \(error), \(fileURL)") } + } + } + } + return files +} + +func shell(_ command: String) -> String { + let task = Process() + let pipe = Pipe() + + task.standardOutput = pipe + task.standardError = pipe + task.arguments = ["-c", command] + task.launchPath = "/bin/zsh" + task.standardInput = nil + task.launch() + + let data = pipe.fileHandleForReading.readDataToEndOfFile() + let output = String(data: data, encoding: .utf8)! + + return output +} diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift index 960aa220..0c763a11 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift @@ -31,7 +31,11 @@ public class OuisyncFFI { let sessionKindShared: FFISessionKind = 0; public init() { - handle = dlopen("libouisync_ffi.dylib", RTLD_NOW)! + // The .dylib is created using the OuisyncDyLibBuilder package plugin in this Swift package. + let libraryName = "libouisync_ffi.dylib" + let resourcePath = Bundle.main.resourcePath! + "/OuisyncLib_OuisyncLibFFI.bundle/Contents/Resources" + handle = dlopen("\(resourcePath)/\(libraryName)", RTLD_NOW)! + ffiSessionGrab = unsafeBitCast(dlsym(handle, "session_grab"), to: FFISessionGrab.self) ffiSessionChannelSend = unsafeBitCast(dlsym(handle, "session_channel_send"), to: FFISessionChannelSend.self) ffiSessionClose = unsafeBitCast(dlsym(handle, "session_close"), to: FFISessionClose.self) @@ -82,28 +86,3 @@ public class OuisyncFFI { return Unmanaged.fromOpaque(ptr).takeRetainedValue() } } - -// --------------------------------------------------------------------------------------- - -typealias Rx = AsyncStream<[UInt8]> -typealias Tx = AsyncStream<[UInt8]>.Continuation - -class Wrap { - let inner: T - init(_ inner: T) { self.inner = inner } -} - -class Channel { - let rx: Rx - let tx: Tx - - init(_ rx: Rx, _ tx: Tx) { self.rx = rx; self.tx = tx } -} - -func makeStream() -> (Rx, Tx) { - var continuation: Rx.Continuation! - let stream = Rx() { continuation = $0 } - return (stream, continuation!) -} - -// --------------------------------------------------------------------------------------- From 5aa8e8c6182920e004c5563a864abf2b313f3606 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Mon, 9 Sep 2024 17:41:21 +0200 Subject: [PATCH 14/14] Fix FFI API change bug after the merge with master --- .../swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift | 3 ++- bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift index ef9fae5e..a482f621 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncClient.swift @@ -26,7 +26,8 @@ public class OuisyncClient { onReceive(Array(UnsafeBufferPointer(start: dataPointer, count: Int(exactly: size)!))) } - let result = ffi.ffiSessionCreate(ffi.sessionKindShared, configPath, logPath, OuisyncFFI.toUnretainedPtr(obj: client), callback); + let logTag = "ouisync-backend" + let result = ffi.ffiSessionCreate(ffi.sessionKindShared, configPath, logPath, logTag, OuisyncFFI.toUnretainedPtr(obj: client), callback); if result.errorCode != 0 { throw SessionCreateError("Failed to create session, code:\(result.errorCode), message:\(result.errorMessage!)") diff --git a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift index 0c763a11..f4095cb7 100644 --- a/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift +++ b/bindings/swift/OuisyncLib/Sources/OuisyncLib/OuisyncFFI.swift @@ -11,7 +11,7 @@ import OuisyncLibFFI typealias FFISessionKind = UInt8 typealias FFIContext = UnsafeRawPointer typealias FFICallback = @convention(c) (FFIContext?, UnsafePointer, CUnsignedLongLong) -> Void; -typealias FFISessionCreate = @convention(c) (FFISessionKind, UnsafePointer, UnsafePointer, UnsafeRawPointer?, FFICallback) -> OuisyncSessionCreateResult; +typealias FFISessionCreate = @convention(c) (FFISessionKind, UnsafePointer, UnsafePointer, UnsafePointer, UnsafeRawPointer?, FFICallback) -> OuisyncSessionCreateResult; typealias FFISessionGrab = @convention(c) (UnsafeRawPointer?, FFICallback) -> OuisyncSessionCreateResult; typealias FFISessionClose = @convention(c) (OuisyncClientHandle, FFIContext?, FFICallback) -> Void; typealias FFISessionChannelSend = @convention(c) (OuisyncClientHandle, UnsafeRawPointer, UInt64) -> Void;