Add support for passing IObserver<T> objects over RPC #346

Closed
6 tasks done
AArnott opened this issue Oct 8, 2019 · 13 comments · Fixed by #536

@AArnott
Member

AArnott commented Oct 8, 2019

We might define a protocol by which an IObservable<T> property's Subscribe method could be turned into an RPC call to start the remote subscription. We'd then need to define how callbacks from the server regarding that property would be sent.
Then the local JsonRpc instance can track the IObserver<T> object until the observer's OnError or OnCompleted method is called.
Also consider supporting IObserver<T> being passed as an argument to an RPC method so that the server can take other args that configure exactly what data should be passed to that observer.
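
As a rough illustration of the last point, the sketch below shows what a service method taking an IObserver<T> alongside configuring arguments could look like. The names ITickerService, TickerService, SubscribeAsync, RecordingObserver and the min parameter are all hypothetical and are not taken from StreamJsonRpc. The call is made in-process, so no RPC is involved; the sketch only shows the contract that any marshaling would have to preserve: zero or more OnNext calls in order, followed by at most one OnCompleted or OnError.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical service contract: the observer is an ordinary argument,
// next to other arguments that configure what gets pushed to it.
public interface ITickerService
{
    Task SubscribeAsync(IObserver<int> observer, int min, CancellationToken cancellationToken);
}

// In-process stand-in for the server side (no JSON-RPC here).
public sealed class TickerService : ITickerService
{
    private readonly IReadOnlyList<int> values;

    public TickerService(IReadOnlyList<int> values)
    {
        this.values = values ?? throw new ArgumentNullException(nameof(values));
    }

    public Task SubscribeAsync(IObserver<int> observer, int min, CancellationToken cancellationToken)
    {
        if (observer is null) throw new ArgumentNullException(nameof(observer));

        foreach (int value in this.values)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                // Terminal call: nothing may be sent after OnError.
                observer.OnError(new OperationCanceledException(cancellationToken));
                return Task.CompletedTask;
            }

            if (value >= min)
            {
                observer.OnNext(value); // order of OnNext calls must be preserved
            }
        }

        observer.OnCompleted(); // terminal call
        return Task.CompletedTask;
    }
}

// Minimal observer that records the signals it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(int value) => this.Log.Add("next:" + value);

    public void OnError(Exception error) => this.Log.Add("error:" + error.GetType().Name);

    public void OnCompleted() => this.Log.Add("completed");
}
```

Calling new TickerService(new[] { 1, 5, 3, 8 }).SubscribeAsync(observer, 3, CancellationToken.None) with a RecordingObserver leaves its Log as next:5, next:3, next:8, completed.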

Prerequisites:

  1. Add support for serializing exceptions #505 (to support IObserver<T>.OnError(Exception)).
  2. Add JsonRpc.AddLocalRpcTarget<T> method #519 (to expose an object, limited to one interface it implements).
  3. JsonRpc should preserve order of invocations on RPC server by default #521 (to support IObserver<T>.OnNext(T)).
  4. JsonRpc.NotifyAsync needs an overload with declared parameter types #523.
  5. Dynamic proxies should implement void returning methods by sending notifications #522.
  6. Add support for passing IDisposable objects over RPC #532.

Tracked internally with https://dev.azure.com/devdiv/DevDiv/_workitems/edit/1146152

@bddckr

bddckr commented Oct 15, 2019

Then the local JsonRpc instance can track the IObserver<T> object until the observer's OnError or OnCompleted method is called.

Is the intention here to forward these as onError/onComplete on the receiver side, basically completely hiding the fact that JSON-RPC was the wire format (and that there's a pipe feeding it actually)?


Given the power of this project already this enhancement sounds like it may kill the need for some internal gRPC I have in a project that ships to client machines...

@AArnott
Member Author

AArnott commented Oct 16, 2019

Yes, @bddckr, that's the idea. We already do similar things with IProgress<T> and even Stream and similar types to enable them to be passed as RPC args. When used in combination with our dynamic proxy generation, it makes JSON-RPC become almost an implementation detail as you can simply call services using very natural, local syntax.

We're using this library instead of gRPC for many of our features.

@NickRedwood
Copy link

NickRedwood commented Nov 3, 2019

I'm looking at starting a small project using either this library or gRPC, and subscribing to IObservables at the remote end is a vital part of it. There are also some IAsyncEnumerable return types from the remote functions, though for my purposes those can be safely converted to hot IObservables and sent back just once.

If I want to return an IObservable from a remote function call today, are there any existing examples showing what I should do?

@AArnott
Member Author

AArnott commented Nov 4, 2019

Not yet. The closest thing we have is IProgress<T> support. It will be some considerable development to support this. But there is certainly interest in it.

@NickRedwood

NickRedwood commented Nov 4, 2019

OK, so the best general approach currently for returning multiple values (spread over time) from a remote function is to try to piggyback on IProgress. That might be doable for my particular use case (single immediate subscriber, just want to get the data back to the local machine as it's produced).
Thanks.

@AArnott AArnott self-assigned this Jun 24, 2020
@AArnott
Member Author

AArnott commented Jun 24, 2020

This is in progress in my iobserver branch.

@NickRedwood

NickRedwood commented Jun 24, 2020

Awesome. FYI, I did get a reliable proxy of IObservable working over StreamJsonRpc, using IProgress for messages and CancellationToken for subscription.

@AArnott
Member Author

AArnott commented Jun 24, 2020

One question that comes up from this work is what to do about the IDisposable that is typically returned from an IObservable<T>. Strictly speaking, it's out of scope of just supporting IObserver<T>, which is how I'm scoping it right now, but it seems like something folks are likely to want to do. Marshaling an IDisposable object will be more work.

@AArnott AArnott changed the title Add support for interfaces with IObservable<T> properties Add support for passing IObserver<T> objects over RPC Jun 24, 2020
@NickRedwood

That does sound difficult if you are going for a very general solution where IObservable and/or IObserver can be passed around both as parameters and also returned from methods. My usage was narrower - the server is an IObservable and the clients are subscribers. The server object provides a subscribe method that takes an IProgress instance and CancellationToken. The CancellationToken is key because it's used to dispose the subscription IDisposable when the client end cancels it.

public Task Subscribe(IProgress<INotification<T>> progressSubscriber, CancellationToken token)

System.Reactive provides a Subscribe extension method that takes a CancellationToken and manages the disposal internally, and returns void.
There are adapters on both the client and server end to turn it back into IObservable.

Assuming you want to support the actual IObservable<T>.Subscribe method
public IDisposable Subscribe(IObserver<T> observer)
... then yes that's more difficult as there is no CancellationToken provided as an argument. Instead of trying to support IDisposable, would you consider adapting to/from CancellationToken/CancellationTokenSource and IDisposable at both ends? I'm not sure off the top of my head whether this could be supported seamlessly without the user knowing, or not.
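
The adaptation between IDisposable and CancellationToken suggested here could look roughly like the following C# sketch, which uses only the base class library. CancelOnDispose, TokenDisposal.DisposeWhenCanceled and FlagDisposable are hypothetical names chosen for illustration; whether a library could apply this transparently is exactly the open question raised above.

```csharp
using System;
using System.Threading;

// Client end: the caller gets an IDisposable whose Dispose cancels a token
// that can travel with the RPC request.
public sealed class CancelOnDispose : IDisposable
{
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private int disposed;

    public CancelOnDispose()
    {
        // Capture the token up front; cts.Token throws once the source is disposed.
        this.Token = this.cts.Token;
    }

    public CancellationToken Token { get; }

    public void Dispose()
    {
        // Dispose must be idempotent; only the first call cancels.
        if (Interlocked.Exchange(ref this.disposed, 1) == 0)
        {
            this.cts.Cancel();
            this.cts.Dispose();
        }
    }
}

public static class TokenDisposal
{
    // Server end: dispose the real subscription when the token is canceled.
    public static CancellationTokenRegistration DisposeWhenCanceled(
        this IDisposable subscription, CancellationToken token)
    {
        if (subscription is null) throw new ArgumentNullException(nameof(subscription));
        return token.Register(subscription.Dispose);
    }
}

// Stand-in for the subscription returned by IObservable<T>.Subscribe.
public sealed class FlagDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() => this.IsDisposed = true;
}
```

In this arrangement the client keeps the CancelOnDispose instance as its subscription handle and sends only its Token across the wire, while the server ties the token to the real subscription with DisposeWhenCanceled.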

Might as well post what I am using, in case it is any use to you. Apologies for VB.NET but that part of the codebase is in that language.

Client side:

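Imports System.Reactive.Disposables
Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Runtime.CompilerServices
Imports System.Threading
Imports Newtonsoft.Json
Imports Newtonsoft.Json.Linq
Imports StreamJsonRpc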

Public Class JsonRpcObservable
    Public Shared Function CreateRaw(rpc As JsonRpc) As IObservable(Of JToken)
        Return New JsonRpcObservable(Of JToken)(rpc)
    End Function

    Public Shared Function CreateWithExceptionsIgnored(Of T)(rpc As JsonRpc) As IObservable(Of T)
        ' Default behaviour ignores type errors.
        Return New JsonRpcObservable(Of T)(rpc)
    End Function

    Public Shared Function CreateWithMaterializedExceptions(Of T)(rpc As JsonRpc, serializer As JsonSerializer) As IObservable(Of IResult(Of Exception, T))
        If serializer Is Nothing Then Throw New ArgumentNullException(NameOf(serializer))
        Return CreateRaw(rpc).Select(Function(j As JToken) Result.WrapException(Function() j.ToObject(Of T)(serializer)))
    End Function

    Public Shared Function CreateWithExceptionsFatal(Of T)(rpc As JsonRpc, serializer As JsonSerializer) As IObservable(Of T)
        If serializer Is Nothing Then Throw New ArgumentNullException(NameOf(serializer))
        Return CreateRaw(rpc).Select(Function(j As JToken) j.ToObject(Of T)(serializer))
    End Function
End Class


Friend Class JsonRpcObservable(Of T)
    Implements IObservable(Of T)

    Private ReadOnly rpc As JsonRpc

    Public Sub New(rpc As JsonRpc)
        If rpc Is Nothing Then Throw New ArgumentNullException(NameOf(rpc))
        Me.rpc = rpc
    End Sub

    Public Function Subscribe(observer As IObserver(Of T)) As IDisposable Implements IObservable(Of T).Subscribe
        If observer Is Nothing Then Throw New ArgumentNullException(NameOf(observer))

        Dim cts As New CancellationTokenSource
        Dim p = observer.ToMaterializedProgress()
        Dim subscriptionTask = rpc.InvokeWithCancellationAsync(NameOf(IObservable(Of Object).Subscribe), {p}, cts.Token)

        Dim continuation = subscriptionTask.ContinueWith(Sub(t)
                                                             If t.IsFaulted Then p.Report(CreateOnError(Of T)(t.Exception))
                                                             cts.Dispose()
                                                         End Sub)
        Return Disposable.Create(Sub()
                                     Try
                                         cts.Cancel()
                                     Catch
                                         ' ignore disposed error
                                     End Try
                                     cts.Dispose()
                                 End Sub)

    End Function

End Class

Public Module ObservableExtensions
    <Extension> Public Function ToMaterializedProgress(Of T)(obs As IObserver(Of T)) As IProgress(Of INotification(Of T))
        If obs Is Nothing Then Throw New ArgumentNullException(NameOf(obs))
        ' Do not use Progress(of T) class here as it will post using SynchronizationContext, which will affect ordering.
        Dim hasCompleted = 0
        Return New AnonymousProgress(Of INotification(Of T))(Sub(n)
                                                                 Select Case n.Kind
                                                                     Case NotificationKind.OnNext
                                                                         If hasCompleted <> 0 Then Throw New InvalidOperationException("Once completion notification has been sent, no further notifications may be sent.")
                                                                         obs.OnNext(n.Value)
                                                                     Case NotificationKind.OnError
                                                                         If Interlocked.Exchange(hasCompleted, 1) <> 0 Then Throw New InvalidOperationException("Once completion notification has been sent, no further notifications may be sent.")
                                                                         obs.OnError(n.Exception)
                                                                     Case NotificationKind.OnCompleted
                                                                         If Interlocked.Exchange(hasCompleted, 1) <> 0 Then Throw New InvalidOperationException("Once completion notification has been sent, no further notifications may be sent.")
                                                                         obs.OnCompleted()
                                                                 End Select
                                                             End Sub)
    End Function

Public Interface INotification(Of Out T)
    ReadOnly Property Kind As NotificationKind
    ReadOnly Property Value As T
    ReadOnly Property Exception As Exception
End Interface

Friend Class AnonymousProgress(Of T)
    Implements IProgress(Of T)

    Private ReadOnly reportAction As Action(Of T)

    Friend Sub New(reportAction As Action(Of T))
        If reportAction Is Nothing Then Throw New ArgumentNullException(NameOf(reportAction))
        Me.reportAction = reportAction
    End Sub

    Public Sub Report(value As T) Implements IProgress(Of T).Report
        reportAction(value)
    End Sub
End Class

End Module
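
For readers more comfortable with C#, the materialization idea in the client code above can be sketched without System.Reactive as follows. Note<T>, NoteKind, SyncProgress<T>, ObserverAdapters.ToProgress and ListObserver<T> are illustrative stand-ins for INotification, NotificationKind, AnonymousProgress and ToMaterializedProgress; they are not part of any library. As in the VB version, Progress<T> is avoided because it posts callbacks to a captured SynchronizationContext (or the thread pool), which does not guarantee ordering.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public enum NoteKind { OnNext, OnError, OnCompleted }

// One observer signal turned into a plain value that could be serialized.
public sealed class Note<T>
{
    private Note(NoteKind kind, T value, Exception error)
    {
        this.Kind = kind;
        this.Value = value;
        this.Error = error;
    }

    public NoteKind Kind { get; }
    public T Value { get; }
    public Exception Error { get; }

    public static Note<T> Next(T value) => new Note<T>(NoteKind.OnNext, value, null);
    public static Note<T> Failed(Exception error) => new Note<T>(NoteKind.OnError, default(T), error);
    public static Note<T> Completed() => new Note<T>(NoteKind.OnCompleted, default(T), null);
}

// IProgress<T> that runs its callback synchronously on the reporting thread.
public sealed class SyncProgress<T> : IProgress<T>
{
    private readonly Action<T> report;

    public SyncProgress(Action<T> report)
    {
        this.report = report ?? throw new ArgumentNullException(nameof(report));
    }

    public void Report(T value) => this.report(value);
}

public static class ObserverAdapters
{
    // Wraps an observer so that materialized notes are replayed onto it.
    public static IProgress<Note<T>> ToProgress<T>(this IObserver<T> observer)
    {
        if (observer is null) throw new ArgumentNullException(nameof(observer));
        const string message = "Once a completion notification has been sent, no further notifications may be sent.";
        int terminated = 0;
        return new SyncProgress<Note<T>>(note =>
        {
            switch (note.Kind)
            {
                case NoteKind.OnNext:
                    if (Volatile.Read(ref terminated) != 0) throw new InvalidOperationException(message);
                    observer.OnNext(note.Value);
                    break;
                case NoteKind.OnError:
                    if (Interlocked.Exchange(ref terminated, 1) != 0) throw new InvalidOperationException(message);
                    observer.OnError(note.Error);
                    break;
                case NoteKind.OnCompleted:
                    if (Interlocked.Exchange(ref terminated, 1) != 0) throw new InvalidOperationException(message);
                    observer.OnCompleted();
                    break;
            }
        });
    }
}

// Minimal observer that records the signals it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) => this.Log.Add("next:" + value);

    public void OnError(Exception error) => this.Log.Add("error");

    public void OnCompleted() => this.Log.Add("completed");
}
```

Reporting Next(1), Next(2) and Completed() through the adapter forwards those three signals to the observer in that order; reporting anything after that throws InvalidOperationException.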

Server side:

Imports System.Reactive.Disposables
Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Runtime.CompilerServices
Imports System.Threading
Imports System.Threading.Tasks

''' <summary>
''' A StreamJsonRpc-compatible interface that is isomorphic to <see cref="IObservable(Of T)"/>.
''' </summary>
''' <typeparam name="T"></typeparam>
Public Interface IObservableJsonRpcWrapper(Of Out T)

    Function Subscribe(progressSubscriber As IProgress(Of INotification(Of T)), token As CancellationToken) As Task

End Interface


Public Class ObservableJsonRpcWrapper(Of T)
    Implements IObservableJsonRpcWrapper(Of T)

    Friend Sub New(source As IObservable(Of T))
        If source Is Nothing Then Throw New ArgumentNullException(NameOf(source))
        Me.source = source
    End Sub


    Private ReadOnly source As IObservable(Of T)

    Public Function Subscribe(progressSubscriber As IProgress(Of INotification(Of T)), token As CancellationToken) As Task Implements IObservableJsonRpcWrapper(Of T).Subscribe
        If progressSubscriber Is Nothing Then Throw New ArgumentNullException(NameOf(progressSubscriber))
        If (Not token.IsCancellationRequested) Then
            Dim obs = progressSubscriber.ToDematerializedObserver()
            source.Subscribe(obs, token)
        End If
        ' The token represents the disposer of the subscription. We need to return a task that does not complete until the end
        ' of the subscription, as otherwise the connection may be closed. (NB: This note was written from memory, not at time of writing code).
        ' TODO: should also complete the returned task when the source IObservable completes/errors.
        Return token.ToTask
    End Function
End Class

Public Module ObservableExtensions
    ''' <summary>
    ''' Creates an <see cref="IObserver(Of T)"/> that, when signalled, passes the message to 
    ''' the <see cref="IProgress(Of T).Report(T)"/> method as a materialized <see cref="Notification"/>. 
    ''' </summary>
    ''' <typeparam name="T"></typeparam>
    ''' <param name="progress"></param>
    ''' <returns></returns>
    <Extension> Public Function ToDematerializedObserver(Of T)(progress As IProgress(Of INotification(Of T))) As IObserver(Of T)
        If progress Is Nothing Then Throw New ArgumentNullException(NameOf(progress))
        Return Observer.Create(Of T)(Sub(v) progress.Report(NotificationCovariant.CreateOnNext(v)),
                                     Sub(e) progress.Report(NotificationCovariant.CreateOnError(Of T)(e)),
                                     Sub() progress.Report(NotificationCovariant.CreateOnCompleted(Of T)()))
    End Function
End Module
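
The TODO in the server-side Subscribe method above (complete the returned task when the source completes or errors, not only on cancellation) might be addressed along the following lines. This is a C# sketch using only the base class library; CompletionTrackingObserver, its Completion property and CountingObserver are hypothetical names. Reporting OnError as a normally completed task rather than a faulted one is an assumption made here, on the grounds that the error has already been forwarded to the inner observer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Forwards every signal to an inner observer and exposes a task that
// completes when the sequence terminates or the token is canceled.
public sealed class CompletionTrackingObserver<T> : IObserver<T>, IDisposable
{
    private readonly IObserver<T> inner;
    private readonly TaskCompletionSource<bool> done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly CancellationTokenRegistration registration;

    public CompletionTrackingObserver(IObserver<T> inner, CancellationToken token)
    {
        this.inner = inner ?? throw new ArgumentNullException(nameof(inner));

        // Cancellation ends the subscription from the client's side.
        this.registration = token.Register(() => this.done.TrySetResult(false));
    }

    // Result is true if the source terminated on its own, false if canceled.
    public Task<bool> Completion => this.done.Task;

    public void OnNext(T value) => this.inner.OnNext(value);

    public void OnError(Exception error)
    {
        this.inner.OnError(error);
        this.done.TrySetResult(true);
    }

    public void OnCompleted()
    {
        this.inner.OnCompleted();
        this.done.TrySetResult(true);
    }

    // Releases the token registration once the subscription is over.
    public void Dispose() => this.registration.Dispose();
}

// Minimal inner observer that counts the signals it receives.
public sealed class CountingObserver<T> : IObserver<T>
{
    public int NextCount { get; private set; }
    public int TerminalCount { get; private set; }

    public void OnNext(T value) => this.NextCount++;

    public void OnError(Exception error) => this.TerminalCount++;

    public void OnCompleted() => this.TerminalCount++;
}
```

In the VB Subscribe method, an observer like this would wrap the result of ToDematerializedObserver before being passed to source.Subscribe, and its Completion task would be returned in place of token.ToTask.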

Other:

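    using System.Threading;
    using System.Threading.Tasks;

    public static class CancellationTokenExtensions
    {
        // Returns a task that completes when the token is canceled.
        public static Task ToTask(this CancellationToken token)
        {
            // A token that can never be canceled yields a task that never completes.
            if (!token.CanBeCanceled) return Task.Delay(Timeout.Infinite);
            var tcs = new TaskCompletionSource<bool>();
            // SingleAssignmentDisposable tolerates Dispose() running before Disposable is assigned,
            // which happens when the token is already canceled and the callback runs inside Register.
#pragma warning disable CA2000 // Dispose objects before losing scope
            var dd = new System.Reactive.Disposables.SingleAssignmentDisposable();
#pragma warning restore CA2000 // Dispose objects before losing scope
            dd.Disposable = token.Register(() =>
                                {
                                    tcs.TrySetResult(true);
                                    dd.Dispose();
                                });
            return tcs.Task;
        }
    }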

@pcharbon70

pcharbon70 commented Aug 24, 2020

This is definitely a feature we would be interested in seeing. Our desktop application is built with Rx from the ground up, so that would allow a seamless integration with the server. Very excited to see it made the 2.6 milestone!

AArnott added a commit to AArnott/vs-streamjsonrpc that referenced this issue Aug 24, 2020
AArnott added a commit to AArnott/vs-streamjsonrpc that referenced this issue Aug 24, 2020
AArnott added a commit to AArnott/vs-streamjsonrpc that referenced this issue Aug 25, 2020
AArnott added a commit to AArnott/vs-streamjsonrpc that referenced this issue Aug 25, 2020
@bddckr

bddckr commented Sep 17, 2020

@AArnott A huge thanks for this, it's so good that my team's Rx codebase is basically usable via JSON-RPC in minutes!

The project I'm working on hosts a frontend and we'd like to utilize JSON-RPC for the interop between our .NET side and their TypeScript/JS world. We're just doing some good old browser interop as we're realizing this via platform-specific WebView components.

I've been looking at vscode-jsonrpc. Looks like it already has support for cancellation and disposables, but observables - or more generally marshalled objects - seem to not be compatible yet. Is that correct?

I did notice you seem to have started looking into that already perhaps, according to microsoft/vscode-languageserver-node#661 and microsoft/vscode-languageserver-node#657. Was your intention to get vscode-jsonrpc compliant with the marshalled objects functionality that this .NET library offers now? Any idea on any sort of timeline on that front?
I'm primarily trying to figure out if it's worth getting my frontend colleagues to see whether we can hack in simple support for it that works for us for now, or whether something is cooking already 😄

Sorry if this is the wrong place to discuss, feel free to forward me elsewhere! And thanks again for your work on all of this.

@AArnott
Member Author

AArnott commented Sep 18, 2020

I'm so glad to hear it's fitting your requirements, @bddckr.

Yes, maintaining interop with vscode-jsonrpc is a high priority, and we're a bit behind on adding this functionality to that library. We may land somewhere between three ends: enhance that library, document how to do it, and/or wrap that library in another that adds functionality. I'm not sure where/when this will be though. It could easily be a few months away.
If you need it sooner and develop your own solution, I'd love to hear about it.
And if you happen to develop it in a way that can eventually become a PR, so much the better. But I can't make any promises on behalf of vscode-jsonrpc. I don't own that repo, and the owners have specific ideas in mind of what they want to support directly in the library vs. leave it to the app (or a wrapping library) to do. So this functionality may never actually be in their repo directly.

@bddckr

bddckr commented Sep 18, 2020

That is more than enough info for me and the team, thanks! I do understand the relationship between the two projects, you and the teams involved. This is however giving me a good overview of where things are currently.

I'll see what our priorities allow and will definitely be coming back to this repo and/or the vscode-jsonrpc library's repo to raise suggestions (and PRs when we can).
