Skip to content

Commit

Permalink
Merge branch 'proxy-session' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Sep 16, 2024
2 parents 8b9a27a + 8c90fde commit 3ba76a2
Show file tree
Hide file tree
Showing 18 changed files with 832 additions and 308 deletions.
6 changes: 5 additions & 1 deletion bindings/dart/lib/bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import 'package:flutter/foundation.dart' show kReleaseMode;

export 'bindings.g.dart';

final bindings = Bindings(_defaultLib());
Bindings? bindings;

typedef PostCObject = Int8 Function(Int64, Pointer<Dart_CObject>);

Expand Down Expand Up @@ -88,6 +88,10 @@ class Bindings {
.lookup<NativeFunction<_free_string_c>>('free_string')
.asFunction();

static Bindings loadDefault() {
return Bindings(_defaultLib());
}

final session_create_dart session_create;
final session_channel_send_dart session_channel_send;
final session_close_dart session_close;
Expand Down
178 changes: 17 additions & 161 deletions bindings/dart/lib/client.dart
Original file line number Diff line number Diff line change
@@ -1,172 +1,27 @@
import 'dart:async';
import 'dart:collection';
import 'dart:ffi';
import 'dart:isolate';
import 'dart:typed_data';

import 'package:ffi/ffi.dart';
import 'package:msgpack_dart/msgpack_dart.dart';
export 'internal/direct_client.dart';

import 'bindings.dart';
import 'ouisync.dart' show Error;
export 'internal/channel_client.dart';

/// Client to interface with ouisync
class Client {
int _handle;
final Stream<Uint8List> _stream;
var _nextMessageId = 0;
final _responses = HashMap<int, Completer<Object?>>();
final _subscriptions = HashMap<int, StreamSink<Object?>>();

Client(this._handle, ReceivePort port) : _stream = port.cast<Uint8List>() {
unawaited(_receive());
}

int get handle => _handle;

Future<T> invoke<T>(String method, [Object? args]) async {
final id = _nextMessageId++;
final completer = Completer();

_responses[id] = completer;

final request = {method: args};

// DEBUG
//print('send: id: $id, request: $request');

try {
// Message format:
//
// +-------------------------------------+-------------------------------------------+
// | id (big endian 64 bit unsigned int) | request (messagepack encoded byte string) |
// +-------------------------------------+-------------------------------------------+
//
// This allows the server to decode the id even if the request is malformed so it can send
// error response back.
final message = (BytesBuilder()
..add((ByteData(8)..setUint64(0, id)).buffer.asUint8List())
..add(serialize(request)))
.takeBytes();

_send(message);

return await completer.future as T;
} finally {
_responses.remove(id);
}
}

int close() {
final handle = _handle;
_handle = 0;
return handle;
}

bool get isClosed => _handle == 0;

void _send(Uint8List data) {
if (_handle == 0) {
throw StateError('session has been closed');
}

// TODO: is there a way to do this without having to allocate whole new buffer?
var buffer = malloc<Uint8>(data.length);

try {
buffer.asTypedList(data.length).setAll(0, data);
bindings.session_channel_send(_handle, buffer, data.length);
} finally {
malloc.free(buffer);
}
}

Future<void> _receive() async {
await for (final bytes in _stream) {
if (bytes.length < 8) {
continue;
}

final id = bytes.buffer.asByteData().getUint64(0);
final message = deserialize(bytes.sublist(8));

// DEBUG
//print('recv: id: $id, message: $message');

if (message is! Map) {
continue;
}

final isSuccess = message.containsKey('success');
final isFailure = message.containsKey('failure');
final isNotification = message.containsKey('notification');

if (isSuccess || isFailure) {
final responseCompleter = _responses.remove(id);
if (responseCompleter == null) {
print('unsolicited response');
continue;
}

if (isSuccess) {
_handleResponseSuccess(responseCompleter, message['success']);
} else if (isFailure) {
_handleResponseFailure(responseCompleter, message['failure']);
}
} else if (isNotification) {
final subscription = _subscriptions[id];
if (subscription == null) {
print('unsolicited notification');
continue;
}

_handleNotification(subscription, message['notification']);
} else {
final responseCompleter = _responses.remove(id);
if (responseCompleter != null) {
_handleInvalidResponse(responseCompleter);
}
}
}
}

void _handleResponseSuccess(Completer<Object?> completer, Object? payload) {
if (payload == "none") {
completer.complete(null);
return;
}

if (payload is Map && payload.length == 1) {
completer.complete(payload.entries.single.value);
} else {
_handleInvalidResponse(completer);
}
}
abstract class Client {
Future<T> invoke<T>(String method, [Object? args]);
Future<void> close();
Subscriptions subscriptions();
}

void _handleResponseFailure(Completer<Object?> completer, Object? payload) {
if (payload is! List) {
_handleInvalidResponse(completer);
return;
}
class Subscriptions {
final _sinks = HashMap<int, StreamSink<Object?>>();

final code = payload[0];
final message = payload[1];
void handle(int subscriptionId, Object? payload) {
final sink = _sinks[subscriptionId];

if (code is! int || message is! String) {
_handleInvalidResponse(completer);
if (sink == null) {
print('unsolicited notification');
return;
}

final error = Error(ErrorCode.decode(code), message);
completer.completeError(error);
}

void _handleInvalidResponse(Completer<Object?> completer) {
final error = Exception('invalid response');
completer.completeError(error);
}

void _handleNotification(StreamSink<Object?> sink, Object? payload) {
try {
if (payload is String) {
sink.add(null);
Expand Down Expand Up @@ -203,7 +58,7 @@ class Subscription {

Future<void> close() async {
if (_controller.hasListener) {
await _controller.close();
return await _controller.close();
}
}

Expand Down Expand Up @@ -253,7 +108,7 @@ class Subscription {
return;
}

_client._subscriptions[_id] = _controller.sink;
_client.subscriptions()._sinks[_id] = _controller.sink;
} catch (e) {
print('failed to subscribe to $_name: $e');
}
Expand All @@ -264,7 +119,7 @@ class Subscription {
return;
}

_client._subscriptions.remove(_id);
_client.subscriptions()._sinks.remove(_id);

try {
await _client.invoke('unsubscribe', _id);
Expand All @@ -281,3 +136,4 @@ enum _SubscriptionState {
subscribing,
unsubscribing,
}

4 changes: 4 additions & 0 deletions bindings/dart/lib/internal/channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

//class Channel {
// send(ByteData message) -> Future
//}
40 changes: 40 additions & 0 deletions bindings/dart/lib/internal/channel_client.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import 'package:flutter/services.dart';
import '../client.dart';
import 'message_matcher.dart';

class ChannelClient extends Client {
final MethodChannel _channel;
final _messageMatcher = MessageMatcher();

ChannelClient(String channelName) : _channel = MethodChannel(channelName) {
_channel.setMethodCallHandler(_handleMethodCall);
}

Future<void> initialize() async {
await _messageMatcher.sendAndAwaitResponse("", {}, (Uint8List message) async {
await _channel.invokeMethod("initialize", message);
});
}

@override
Future<T> invoke<T>(String method, [Object? args]) async {
return await _messageMatcher.sendAndAwaitResponse(method, args, (Uint8List message) async {
await _channel.invokeMethod("invoke", message);
});
}

@override
Future<void> close() async {
_channel.setMethodCallHandler(null);
_messageMatcher.close();
}

Future<dynamic> _handleMethodCall(MethodCall call) async {
final args = (call.arguments as List<Object?>).cast<int>();
_messageMatcher.handleResponse(Uint8List.fromList(args));
return null;
}

@override
Subscriptions subscriptions() => _messageMatcher.subscriptions();
}
Loading

0 comments on commit 3ba76a2

Please sign in to comment.