diff --git a/capability.go b/capability.go index 5cdc80ae..6bdd43b2 100644 --- a/capability.go +++ b/capability.go @@ -203,7 +203,7 @@ func NewPromisedClient(hook ClientHook) (Client, Resolver[Client]) { } // newPromisedClient is the same as NewPromisedClient, but the return -// value exposes the concrete type of the fulfiller. +// value exposes the concrete type of the resolver. func newPromisedClient(hook ClientHook) (Client, *clientPromise) { if hook == nil { panic("NewPromisedClient(nil)") diff --git a/rpc/answer.go b/rpc/answer.go index 2fa285cc..067b5dd2 100644 --- a/rpc/answer.go +++ b/rpc/answer.go @@ -311,12 +311,9 @@ func (ans *answer) prepareSendException(c *lockedConn, rl *releaseList, ex error if e, err := ans.ret.NewException(); err != nil { ans.c.er.ReportError(exc.WrapError("send exception", err)) ans.sendMsg = nil - } else { - e.SetType(rpccp.Exception_Type(exc.TypeOf(ex))) - if err := e.SetReason(ex.Error()); err != nil { - ans.c.er.ReportError(exc.WrapError("send exception", err)) - ans.sendMsg = nil - } + } else if err := e.MarshalError(ex); err != nil { + ans.c.er.ReportError(exc.WrapError("send exception", err)) + ans.sendMsg = nil } } } diff --git a/rpc/export.go b/rpc/export.go index d7f51dcd..d5f5f7e8 100644 --- a/rpc/export.go +++ b/rpc/export.go @@ -5,6 +5,7 @@ import ( "errors" "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/str" "capnproto.org/go/capnp/v3/internal/syncutil" rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" @@ -15,8 +16,12 @@ type exportID uint32 // expent is an entry in a Conn's export table. type expent struct { - client capnp.Client - wireRefs uint32 + client capnp.Client + wireRefs uint32 + isPromise bool + + // Should be called when removing this entry from the exports table: + cancel context.CancelFunc } // A key for use in a client's Metadata, whose value is the export @@ -64,6 +69,7 @@ func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.Client, err } switch { case count == ent.wireRefs: + defer ent.cancel() client := ent.client c.lk.exports[id] = nil c.lk.exportID.remove(uint32(id)) @@ -148,35 +154,111 @@ func (c *lockedConn) sendCap(d rpccp.CapDescriptor, client capnp.Client) (_ expo } } - // TODO(someday): Check for unresolved client for senderPromise. - - // Default to sender-hosted (export). + // Default to export. state.Metadata.Lock() defer state.Metadata.Unlock() id, ok := c.findExportID(state.Metadata) + var ee *expent if ok { - ent := c.lk.exports[id] - ent.wireRefs++ - d.SetSenderHosted(uint32(id)) - return id, true, nil - } - - // Not already present; allocate an export id for it: - ee := &expent{ - client: client.AddRef(), - wireRefs: 1, + ee = c.lk.exports[id] + ee.wireRefs++ + } else { + // Not already present; allocate an export id for it: + ee = &expent{ + client: client.AddRef(), + wireRefs: 1, + isPromise: state.IsPromise, + cancel: func() {}, + } + id = exportID(c.lk.exportID.next()) + if int64(id) == int64(len(c.lk.exports)) { + c.lk.exports = append(c.lk.exports, ee) + } else { + c.lk.exports[id] = ee + } + c.setExportID(state.Metadata, id) } - id = exportID(c.lk.exportID.next()) - if int64(id) == int64(len(c.lk.exports)) { - c.lk.exports = append(c.lk.exports, ee) + if ee.isPromise { + c.sendSenderPromise(id, client, d) } else { - c.lk.exports[id] = ee + d.SetSenderHosted(uint32(id)) } - c.setExportID(state.Metadata, id) - d.SetSenderHosted(uint32(id)) return id, true, nil } +// sendSenderPromise is a helper for sendCap that handles the senderPromise case. +func (c *lockedConn) sendSenderPromise(id exportID, client capnp.Client, d rpccp.CapDescriptor) { + // Send a promise, wait for the resolution asynchronously, then send + // a resolve message: + ee := c.lk.exports[id] + d.SetSenderPromise(uint32(id)) + ctx, cancel := context.WithCancel(c.bgctx) + ee.cancel = cancel + waitRef := client.AddRef() + go func() { + defer cancel() + defer waitRef.Release() + // Logically we don't hold the lock anymore; it's held by the + // goroutine that spawned this one. So cast back to an unlocked + // Conn before trying to use it again: + unlockedConn := (*Conn)(c) + + waitErr := waitRef.Resolve(ctx) + unlockedConn.withLocked(func(c *lockedConn) { + // Export was removed from the table at some point; + // remote peer is uninterested in the resolution, so + // drop the reference and we're done: + if c.lk.exports[id] != ee { + return + } + + sendRef := waitRef.AddRef() + var ( + resolvedID exportID + isExport bool + ) + c.sendMessage(c.bgctx, func(m rpccp.Message) error { + res, err := m.NewResolve() + if err != nil { + return err + } + res.SetPromiseId(uint32(id)) + if waitErr != nil { + ex, err := res.NewException() + if err != nil { + return err + } + return ex.MarshalError(waitErr) + } + desc, err := res.NewCap() + if err != nil { + return err + } + resolvedID, isExport, err = c.sendCap(desc, sendRef) + return err + }, func(err error) { + sendRef.Release() + if err != nil && isExport { + // release 1 ref of the thing it resolved to. + client, err := withLockedConn2( + unlockedConn, + func(c *lockedConn) (capnp.Client, error) { + return c.releaseExport(resolvedID, 1) + }, + ) + if err != nil { + c.er.ReportError( + exc.WrapError("releasing export due to failure to send resolve", err), + ) + } else { + client.Release() + } + } + }) + }) + }() +} + // fillPayloadCapTable adds descriptors of payload's message's // capabilities into payload's capability table and returns the // reference counts that have been added to the exports table. diff --git a/rpc/internal/testcapnp/test.capnp b/rpc/internal/testcapnp/test.capnp index 39c69ec6..8e5b1a21 100644 --- a/rpc/internal/testcapnp/test.capnp +++ b/rpc/internal/testcapnp/test.capnp @@ -6,6 +6,15 @@ using Go = import "/go.capnp"; $Go.package("testcapnp"); $Go.import("capnproto.org/go/capnp/v3/rpc/internal/testcapnp"); +interface Empty { + # Empty interface, handy for testing shutdown hooks and stuff that just + # needs an arbitrary capability. +} + +interface EmptyProvider { + getEmpty @0 () -> (empty :Empty); +} + interface PingPong { echoNum @0 (n :Int64) -> (n :Int64); } diff --git a/rpc/internal/testcapnp/test.capnp.go b/rpc/internal/testcapnp/test.capnp.go index 9ad3e312..9c40a128 100644 --- a/rpc/internal/testcapnp/test.capnp.go +++ b/rpc/internal/testcapnp/test.capnp.go @@ -12,6 +12,428 @@ import ( context "context" ) +type Empty capnp.Client + +// Empty_TypeID is the unique identifier for the type Empty. +const Empty_TypeID = 0xc8b14e937b2cb741 + +func (c Empty) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + +// String returns a string that identifies this capability for debugging +// purposes. Its format should not be depended on: in particular, it +// should not be used to compare clients. Use IsSame to compare clients +// for equality. +func (c Empty) String() string { + return "Empty(" + capnp.Client(c).String() + ")" +} + +// AddRef creates a new Client that refers to the same capability as c. +// If c is nil or has resolved to null, then AddRef returns nil. +func (c Empty) AddRef() Empty { + return Empty(capnp.Client(c).AddRef()) +} + +// Release releases a capability reference. If this is the last +// reference to the capability, then the underlying resources associated +// with the capability will be released. +// +// Release will panic if c has already been released, but not if c is +// nil or resolved to null. +func (c Empty) Release() { + capnp.Client(c).Release() +} + +// Resolve blocks until the capability is fully resolved or the Context +// expires. +func (c Empty) Resolve(ctx context.Context) error { + return capnp.Client(c).Resolve(ctx) +} + +func (c Empty) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Client(c).EncodeAsPtr(seg) +} + +func (Empty) DecodeFromPtr(p capnp.Ptr) Empty { + return Empty(capnp.Client{}.DecodeFromPtr(p)) +} + +// IsValid reports whether c is a valid reference to a capability. +// A reference is invalid if it is nil, has resolved to null, or has +// been released. +func (c Empty) IsValid() bool { + return capnp.Client(c).IsValid() +} + +// IsSame reports whether c and other refer to a capability created by the +// same call to NewClient. This can return false negatives if c or other +// are not fully resolved: use Resolve if this is an issue. If either +// c or other are released, then IsSame panics. +func (c Empty) IsSame(other Empty) bool { + return capnp.Client(c).IsSame(capnp.Client(other)) +} + +// Update the flowcontrol.FlowLimiter used to manage flow control for +// this client. This affects all future calls, but not calls already +// waiting to send. Passing nil sets the value to flowcontrol.NopLimiter, +// which is also the default. +func (c Empty) SetFlowLimiter(lim fc.FlowLimiter) { + capnp.Client(c).SetFlowLimiter(lim) +} + +// Get the current flowcontrol.FlowLimiter used to manage flow control +// for this client. +func (c Empty) GetFlowLimiter() fc.FlowLimiter { + return capnp.Client(c).GetFlowLimiter() +} + +// A Empty_Server is a Empty with a local implementation. +type Empty_Server interface { +} + +// Empty_NewServer creates a new Server from an implementation of Empty_Server. +func Empty_NewServer(s Empty_Server) *server.Server { + c, _ := s.(server.Shutdowner) + return server.New(Empty_Methods(nil, s), s, c) +} + +// Empty_ServerToClient creates a new Client from an implementation of Empty_Server. +// The caller is responsible for calling Release on the returned Client. +func Empty_ServerToClient(s Empty_Server) Empty { + return Empty(capnp.NewClient(Empty_NewServer(s))) +} + +// Empty_Methods appends Methods to a slice that invoke the methods on s. +// This can be used to create a more complicated Server. +func Empty_Methods(methods []server.Method, s Empty_Server) []server.Method { + if cap(methods) == 0 { + methods = make([]server.Method, 0, 0) + } + + return methods +} + +// Empty_List is a list of Empty. +type Empty_List = capnp.CapList[Empty] + +// NewEmpty creates a new list of Empty. +func NewEmpty_List(s *capnp.Segment, sz int32) (Empty_List, error) { + l, err := capnp.NewPointerList(s, sz) + return capnp.CapList[Empty](l), err +} + +type EmptyProvider capnp.Client + +// EmptyProvider_TypeID is the unique identifier for the type EmptyProvider. +const EmptyProvider_TypeID = 0xea38d4d6dca1e80e + +func (c EmptyProvider) GetEmpty(ctx context.Context, params func(EmptyProvider_getEmpty_Params) error) (EmptyProvider_getEmpty_Results_Future, capnp.ReleaseFunc) { + + s := capnp.Send{ + Method: capnp.Method{ + InterfaceID: 0xea38d4d6dca1e80e, + MethodID: 0, + InterfaceName: "test.capnp:EmptyProvider", + MethodName: "getEmpty", + }, + } + if params != nil { + s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} + s.PlaceArgs = func(s capnp.Struct) error { return params(EmptyProvider_getEmpty_Params(s)) } + } + + ans, release := capnp.Client(c).SendCall(ctx, s) + return EmptyProvider_getEmpty_Results_Future{Future: ans.Future()}, release + +} + +func (c EmptyProvider) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + +// String returns a string that identifies this capability for debugging +// purposes. Its format should not be depended on: in particular, it +// should not be used to compare clients. Use IsSame to compare clients +// for equality. +func (c EmptyProvider) String() string { + return "EmptyProvider(" + capnp.Client(c).String() + ")" +} + +// AddRef creates a new Client that refers to the same capability as c. +// If c is nil or has resolved to null, then AddRef returns nil. +func (c EmptyProvider) AddRef() EmptyProvider { + return EmptyProvider(capnp.Client(c).AddRef()) +} + +// Release releases a capability reference. If this is the last +// reference to the capability, then the underlying resources associated +// with the capability will be released. +// +// Release will panic if c has already been released, but not if c is +// nil or resolved to null. +func (c EmptyProvider) Release() { + capnp.Client(c).Release() +} + +// Resolve blocks until the capability is fully resolved or the Context +// expires. +func (c EmptyProvider) Resolve(ctx context.Context) error { + return capnp.Client(c).Resolve(ctx) +} + +func (c EmptyProvider) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Client(c).EncodeAsPtr(seg) +} + +func (EmptyProvider) DecodeFromPtr(p capnp.Ptr) EmptyProvider { + return EmptyProvider(capnp.Client{}.DecodeFromPtr(p)) +} + +// IsValid reports whether c is a valid reference to a capability. +// A reference is invalid if it is nil, has resolved to null, or has +// been released. +func (c EmptyProvider) IsValid() bool { + return capnp.Client(c).IsValid() +} + +// IsSame reports whether c and other refer to a capability created by the +// same call to NewClient. This can return false negatives if c or other +// are not fully resolved: use Resolve if this is an issue. If either +// c or other are released, then IsSame panics. +func (c EmptyProvider) IsSame(other EmptyProvider) bool { + return capnp.Client(c).IsSame(capnp.Client(other)) +} + +// Update the flowcontrol.FlowLimiter used to manage flow control for +// this client. This affects all future calls, but not calls already +// waiting to send. Passing nil sets the value to flowcontrol.NopLimiter, +// which is also the default. +func (c EmptyProvider) SetFlowLimiter(lim fc.FlowLimiter) { + capnp.Client(c).SetFlowLimiter(lim) +} + +// Get the current flowcontrol.FlowLimiter used to manage flow control +// for this client. +func (c EmptyProvider) GetFlowLimiter() fc.FlowLimiter { + return capnp.Client(c).GetFlowLimiter() +} + +// A EmptyProvider_Server is a EmptyProvider with a local implementation. +type EmptyProvider_Server interface { + GetEmpty(context.Context, EmptyProvider_getEmpty) error +} + +// EmptyProvider_NewServer creates a new Server from an implementation of EmptyProvider_Server. +func EmptyProvider_NewServer(s EmptyProvider_Server) *server.Server { + c, _ := s.(server.Shutdowner) + return server.New(EmptyProvider_Methods(nil, s), s, c) +} + +// EmptyProvider_ServerToClient creates a new Client from an implementation of EmptyProvider_Server. +// The caller is responsible for calling Release on the returned Client. +func EmptyProvider_ServerToClient(s EmptyProvider_Server) EmptyProvider { + return EmptyProvider(capnp.NewClient(EmptyProvider_NewServer(s))) +} + +// EmptyProvider_Methods appends Methods to a slice that invoke the methods on s. +// This can be used to create a more complicated Server. +func EmptyProvider_Methods(methods []server.Method, s EmptyProvider_Server) []server.Method { + if cap(methods) == 0 { + methods = make([]server.Method, 0, 1) + } + + methods = append(methods, server.Method{ + Method: capnp.Method{ + InterfaceID: 0xea38d4d6dca1e80e, + MethodID: 0, + InterfaceName: "test.capnp:EmptyProvider", + MethodName: "getEmpty", + }, + Impl: func(ctx context.Context, call *server.Call) error { + return s.GetEmpty(ctx, EmptyProvider_getEmpty{call}) + }, + }) + + return methods +} + +// EmptyProvider_getEmpty holds the state for a server call to EmptyProvider.getEmpty. +// See server.Call for documentation. +type EmptyProvider_getEmpty struct { + *server.Call +} + +// Args returns the call's arguments. +func (c EmptyProvider_getEmpty) Args() EmptyProvider_getEmpty_Params { + return EmptyProvider_getEmpty_Params(c.Call.Args()) +} + +// AllocResults allocates the results struct. +func (c EmptyProvider_getEmpty) AllocResults() (EmptyProvider_getEmpty_Results, error) { + r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 1}) + return EmptyProvider_getEmpty_Results(r), err +} + +// EmptyProvider_List is a list of EmptyProvider. +type EmptyProvider_List = capnp.CapList[EmptyProvider] + +// NewEmptyProvider creates a new list of EmptyProvider. +func NewEmptyProvider_List(s *capnp.Segment, sz int32) (EmptyProvider_List, error) { + l, err := capnp.NewPointerList(s, sz) + return capnp.CapList[EmptyProvider](l), err +} + +type EmptyProvider_getEmpty_Params capnp.Struct + +// EmptyProvider_getEmpty_Params_TypeID is the unique identifier for the type EmptyProvider_getEmpty_Params. +const EmptyProvider_getEmpty_Params_TypeID = 0x9a27082d77b8c289 + +func NewEmptyProvider_getEmpty_Params(s *capnp.Segment) (EmptyProvider_getEmpty_Params, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return EmptyProvider_getEmpty_Params(st), err +} + +func NewRootEmptyProvider_getEmpty_Params(s *capnp.Segment) (EmptyProvider_getEmpty_Params, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return EmptyProvider_getEmpty_Params(st), err +} + +func ReadRootEmptyProvider_getEmpty_Params(msg *capnp.Message) (EmptyProvider_getEmpty_Params, error) { + root, err := msg.Root() + return EmptyProvider_getEmpty_Params(root.Struct()), err +} + +func (s EmptyProvider_getEmpty_Params) String() string { + str, _ := text.Marshal(0x9a27082d77b8c289, capnp.Struct(s)) + return str +} + +func (s EmptyProvider_getEmpty_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Struct(s).EncodeAsPtr(seg) +} + +func (EmptyProvider_getEmpty_Params) DecodeFromPtr(p capnp.Ptr) EmptyProvider_getEmpty_Params { + return EmptyProvider_getEmpty_Params(capnp.Struct{}.DecodeFromPtr(p)) +} + +func (s EmptyProvider_getEmpty_Params) ToPtr() capnp.Ptr { + return capnp.Struct(s).ToPtr() +} +func (s EmptyProvider_getEmpty_Params) IsValid() bool { + return capnp.Struct(s).IsValid() +} + +func (s EmptyProvider_getEmpty_Params) Message() *capnp.Message { + return capnp.Struct(s).Message() +} + +func (s EmptyProvider_getEmpty_Params) Segment() *capnp.Segment { + return capnp.Struct(s).Segment() +} + +// EmptyProvider_getEmpty_Params_List is a list of EmptyProvider_getEmpty_Params. +type EmptyProvider_getEmpty_Params_List = capnp.StructList[EmptyProvider_getEmpty_Params] + +// NewEmptyProvider_getEmpty_Params creates a new list of EmptyProvider_getEmpty_Params. +func NewEmptyProvider_getEmpty_Params_List(s *capnp.Segment, sz int32) (EmptyProvider_getEmpty_Params_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) + return capnp.StructList[EmptyProvider_getEmpty_Params](l), err +} + +// EmptyProvider_getEmpty_Params_Future is a wrapper for a EmptyProvider_getEmpty_Params promised by a client call. +type EmptyProvider_getEmpty_Params_Future struct{ *capnp.Future } + +func (f EmptyProvider_getEmpty_Params_Future) Struct() (EmptyProvider_getEmpty_Params, error) { + p, err := f.Future.Ptr() + return EmptyProvider_getEmpty_Params(p.Struct()), err +} + +type EmptyProvider_getEmpty_Results capnp.Struct + +// EmptyProvider_getEmpty_Results_TypeID is the unique identifier for the type EmptyProvider_getEmpty_Results. +const EmptyProvider_getEmpty_Results_TypeID = 0x93281cc60d6060cd + +func NewEmptyProvider_getEmpty_Results(s *capnp.Segment) (EmptyProvider_getEmpty_Results, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) + return EmptyProvider_getEmpty_Results(st), err +} + +func NewRootEmptyProvider_getEmpty_Results(s *capnp.Segment) (EmptyProvider_getEmpty_Results, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) + return EmptyProvider_getEmpty_Results(st), err +} + +func ReadRootEmptyProvider_getEmpty_Results(msg *capnp.Message) (EmptyProvider_getEmpty_Results, error) { + root, err := msg.Root() + return EmptyProvider_getEmpty_Results(root.Struct()), err +} + +func (s EmptyProvider_getEmpty_Results) String() string { + str, _ := text.Marshal(0x93281cc60d6060cd, capnp.Struct(s)) + return str +} + +func (s EmptyProvider_getEmpty_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Struct(s).EncodeAsPtr(seg) +} + +func (EmptyProvider_getEmpty_Results) DecodeFromPtr(p capnp.Ptr) EmptyProvider_getEmpty_Results { + return EmptyProvider_getEmpty_Results(capnp.Struct{}.DecodeFromPtr(p)) +} + +func (s EmptyProvider_getEmpty_Results) ToPtr() capnp.Ptr { + return capnp.Struct(s).ToPtr() +} +func (s EmptyProvider_getEmpty_Results) IsValid() bool { + return capnp.Struct(s).IsValid() +} + +func (s EmptyProvider_getEmpty_Results) Message() *capnp.Message { + return capnp.Struct(s).Message() +} + +func (s EmptyProvider_getEmpty_Results) Segment() *capnp.Segment { + return capnp.Struct(s).Segment() +} +func (s EmptyProvider_getEmpty_Results) Empty() Empty { + p, _ := capnp.Struct(s).Ptr(0) + return Empty(p.Interface().Client()) +} + +func (s EmptyProvider_getEmpty_Results) HasEmpty() bool { + return capnp.Struct(s).HasPtr(0) +} + +func (s EmptyProvider_getEmpty_Results) SetEmpty(v Empty) error { + if !v.IsValid() { + return capnp.Struct(s).SetPtr(0, capnp.Ptr{}) + } + seg := s.Segment() + in := capnp.NewInterface(seg, seg.Message().AddCap(capnp.Client(v))) + return capnp.Struct(s).SetPtr(0, in.ToPtr()) +} + +// EmptyProvider_getEmpty_Results_List is a list of EmptyProvider_getEmpty_Results. +type EmptyProvider_getEmpty_Results_List = capnp.StructList[EmptyProvider_getEmpty_Results] + +// NewEmptyProvider_getEmpty_Results creates a new list of EmptyProvider_getEmpty_Results. +func NewEmptyProvider_getEmpty_Results_List(s *capnp.Segment, sz int32) (EmptyProvider_getEmpty_Results_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}, sz) + return capnp.StructList[EmptyProvider_getEmpty_Results](l), err +} + +// EmptyProvider_getEmpty_Results_Future is a wrapper for a EmptyProvider_getEmpty_Results promised by a client call. +type EmptyProvider_getEmpty_Results_Future struct{ *capnp.Future } + +func (f EmptyProvider_getEmpty_Results_Future) Struct() (EmptyProvider_getEmpty_Results, error) { + p, err := f.Future.Ptr() + return EmptyProvider_getEmpty_Results(p.Struct()), err +} +func (p EmptyProvider_getEmpty_Results_Future) Empty() Empty { + return Empty(p.Future.Field(0, nil).Client()) +} + type PingPong capnp.Client // PingPong_TypeID is the unique identifier for the type PingPong. @@ -1376,50 +1798,61 @@ func (p PingPongProvider_pingPong_Results_Future) PingPong() PingPong { return PingPong(p.Future.Field(0, nil).Client()) } -const schema_ef12a34b9807e19c = "x\xda\x94TMH\x14o\x18\x7f\x9ey\xdf\xf9\x8f\xf2" + - "o\xd9\xde\x1d\xe9\x9b>D;H,\xa9\x04e\xd9j" + - "a\x0bF\xb2cz\xa8\x0e1\xe9\xa8k;\xeb2\xb3" + - "\x9b\x14\x94]\xc4S\x07\x0fiFt\xb0\x88:t(" + - "($/%\x05E\x12\x1d\x8a\x08\x93\xd2S\x11\x98\xd5" + - "E\x88\x9ax\xdf\xd9qg\xfd\x82\xae\xfb<\xf3\xfbz" + - "~\xef\xee|\x815\xb4<\x10Q@\xd2\x9a\xe4\xff\x9c" + - "\xe1\x8e\xbex\xc3\x85\x82\x8b\xc0\xd6\"\x80\x8c\x0a@e" + - "/\xd9\x80\x80\xea%\x12\x01\xfc\xb3\xbdtb\xe0\xf7d" + - "\xafV\x84\x08@\xf9\xf8.)\xe6\xe3\x07|\xec\xec\xa9" + - "\xf8\xd2]Z\xf4\xf02\xb0\xff\x89smJ\xb9r\xf8" + - "F\xe8\x1b\x00\xaao\xc9\xb4:E\x14\x00u\x92D\xd5" + - "B\xfe\xa5\x13\xd8:;\x16x\xf6k\xc0%\x13`\xdf" + - "9\x18u*\x98\xf3R\x1e?4\xe8\x97\xf1\xde\xe5\x99" + - "\x12<\xe3\xab;\xfb{^\x9d\x1eY\xc4\x83\xf4\x91\x8b" + - "\xae\xca\xb4Om\x16<\xc6\xcd\x19\x98\x98\xd5u&\xae\xb4\x1aV\x0cQ\xa3" + - "D\xf6e\x8f^\x04\x8c\xd5\x83\xc4\x0a\x15'\x95\xfd\x08" + - "\x00j0\x86\xb8\xb2EO\xd8\xb2[\xb6\x91h[R" + - "~YN~\x90/!\xcbU\x0d\x10\x99\xcf\x0bz\x98" + - "\x11\x17\x94\xdb(\x106\xbcG\x84^\xc1Yy\x19H" + - "\xacT\xc1\\G\xd0\xab8[\xcfg\x01%\xc8\x95\xd7" + - "\xb8\xac\xf9\x169\xd1\xd1\xb4e\xe8[L\x8f\xc7\x8d\xcb" + - "\xeb\x01&\xef=\xee\xae\xbczr\x881\x8e%+\xc1" + - "T\xc6\xee\xc8\x07\xa1\x0b\x93o5\xac\xb0\x97*\xef\x84" + - "\xa2\x9b\xf6\xca\xe7^\xa28+_{\xc9\xd4\xb3(y" + - "\xee\x04\x95\xc2\x95\xcd{\xf3\x1e\x1bB\xf6\x1f\x87\xb1\x03" + - "\xc2[OV\xce\xbf\xd8k4\xec`f\xc1\xa5\xeb\x01" + - "\xb4U\x04\xb5u\x12\xfa\xdb\x85,\xf7X\x17\x1c\x9cx" + - "w\x10g\x08\xf3\x8c\x85\x1b\x13\x97\xadP\xab\x9e\xd61" + - "\x00\x12\x06\x00\xff\x06\x00\x00\xff\xff\xfe*\x86\xac" +const schema_ef12a34b9807e19c = "x\xda|T]h#U\x14>g\xe6^'E\xc7" + + "x3E\xcd\xbau\xed\xb2\xf1gY\x83\xe9RX\xeb" + + "O\xbb\x8a\x06+\x94\x99\xaa\x0f*\xe2\x8e\xcdl\x9a5" + + "I\x87\xcc\xc4\xa2\xa2\xf5eY\xf4mY\xdc\xd5E\x14" + + "W\x11}\xac\xa0VKA\x8bJ\xd5V\x14\x8a>\xb4" + + "E-\x08U\xa1V}\xa9\x88\x8e\xdc;\xb9\xc9$i" + + "\xf36\xcc\xfd\xeew\xbe\xef\x9c\xef\x9e\x9bnP\x86H" + + "F\x7f-\x06\x8a\x95\xa3\x17\x05\x17\xc6O\x15F\x9e\x8d" + + "=\x07\xec\x0a\x04\xa0\xa8\x01\x1c\xee'{\x10\xd0\xb8\x8d" + + "\x0c\x02\xfewmj\xe5\xec\xbfk'\xadnD\x00\xc2" + + "\x8f\x1f!\xfb\xf9\xb1\xc3\x8f\x83\xa5c\xc7\xf4\xcf\xf7^" + + "\x7f\x06X\xb2~\xff$\x19\xe5\x80\xd3\x02ps\xdf/" + + "\x93\xa9\xee\xf7_\x04v\xb1\x1a\xbc\xf2\x93\xf6\xd2\xbdo" + + "$~\x07@c\x9a\xac\x1bs\x9c\xd0\xf8\x90d\x8d\x9f" + + "\xf9W\xa0_\xb35\xaf\x7f\xf6\xcf\xd9P\x8d\xa8\xb6\xc4" + + "\xab\x91\xa0\x8f\x05_\xd1\xc5\xbb\xcfEu\xbe\x17\x0a\x99" + + "\x13u\x9e\x9f\x9f\x99\xbc1v\xdd\xf9P\x88\xb8\xbaF" + + "\x86\xf9\xd5\xc5\xcbN\x9c\x9e\xfa\xfa\xf1\x996\x05_\x90" + + "\x8f\x8co\x85\x82%r\xca\xe8\xa1\\\x81\xf3\xe6&>" + + "\xfc\xf6\xad\xb3m`J\xdf5t\x0e1\xbah\xd6\xc8" + + "\x08\xf0\xd1\x0f\x0e=}fdz\xa1\x0d\x9c\xa4/\x18" + + "\xbd\x02\xdcC\xb5\x1a\xf3\xa1\xbf\xae\x9e\xfa\xa4\x7fc\x19" + + "\xd8^)\x90\xd2\xc7\xb8\xc0\xde\xae\x1f^}\xe7\xc7s" + + "\xdfC\xa4\xc9\x7f\x843\xf8[x{fv\xe0\xf8C" + + "\xb7?\xb0\x1eiK\x92\xee\xe1W/\xddx}\xf5\xbb" + + "\xe5#\xbf\xb6)@\xfaeDn\xbfP\xf0\xd4\xe6\xf6" + + "\xbc\xff)\xd9j\x03\xf7\xd0\x0bFJ\x80{i\xd6\xb8" + + "G\x80\x7f\xfbf\xa6tK\xb6\xf0g(7lx\x86" + + "\x9e\x10\xc1\xa0\\\xd4\xcaU\x95\x85\xb7V\x8fl\x03\xbb" + + "\xbc\x0e\xb0i\x82\x03\x0at\x10\x1e\x0c|\xc7\xf3\xd3c" + + "\xb6\xab\x96\xdd\x81;m\xf7h%\xef\xdd\x1f\xfe*\x16" + + "\x0f\x98v\xc5VK\x9eET\x02@\x10\x80\xe9\xfb\x01" + + "\xac\x98\x8aV\xb7\x82\xda\x98\xedb\x82\xa8\x80\x98\x00l" + + "b2\x0b\xe5\xbc9Q\xce\xa7\x9d\xb1\xf1\x89\x91j\xe9" + + "\xc0\xa8\xe3U\xb5\xa2\xdfD\x95hPa\x19)(H" + + "#4\xa4\xec\x0e\xdcUr\xfd'\xcd\xca\xc4\x13\x85\x9c" + + "SI\xe7\x1d_\xfc\x10dE\x1f\x9b\xc8\xfa\x1ad\xfb" + + "\x1c\x8eB\xd6\x98< \xb2\x08\xb5\x12Q(\xd8\xb5\x9c" + + "S1\x11-\xa2\xd2H\x06Pv\x97\xb1aPX\x97" + + "\x16\xb8\xb5K\x000\x84&b\xe7\xeeI\xcf\xbb\xa2<" + + "\xa7x|\xc7\xce\x1cl\x98\x89s\x10\xb2\xc6\xfbh\xf1" + + "\xd2\xa1M|z%\x0f\xa0\x8eEY\x7f0\x14\xc0-" + + "\xc7\x84e\xb9`P\xbem\x969\x08\x0aKi\xd8\xc8" + + "5\xca\xd7\xcd\x92\xfcL\xd7\xe2\xdc\xe5P\xa8\xb0\xb9\x1d" + + "\xbc\xd0}~\xc5\xb1\xf7\x95d\x9d\xb0\xb52\x8eX\x9e" + + "\xfex\xf2\xf0\xf9G_f\x8csQ-\xeeV\xbd\xf1" + + "v\x12a\x04\xc0D4U\x1a=$\xad#\xe4\xce\xe5" + + "x\xb8s\xcd.y\x9d#\xb9C\xb8;'r\xc7\xf1" + + "\xd5X\x9a\xb2\xd54\x0f\xa1\xbef_\xae?\x94\x0b\xb9" + + "\x9e,9\xb4\xb6d\xa1\x94\xaeq\xa7u&\xb9\x8d\x10" + + "j\xab\x9f\xb1;D#\xa7j\xf6\x9aY:\xb7k\xd4" + + "\xf1\xe2\xd5\x96\x08\x0e\x03X\x97\xa8h]\xa9`4\xf6" + + "\xc8\x1a\x0b\xaa%\x89\xaa\x1c\xba\x98y\x9a\x0f4\x8c " + + "\xee\x9a\xed\x9c\xed\xdb\xa8\x83\x82:\xe0\xff\x01\x00\x00\xff" + + "\xff\xe4@\xf6j" func RegisterSchema(reg *schemas.Registry) { reg.Register(&schemas.Schema{ @@ -1427,14 +1860,18 @@ func RegisterSchema(reg *schemas.Registry) { Nodes: []uint64{ 0x80087e4e698768a2, 0x85ddfd96db252600, + 0x93281cc60d6060cd, 0x95b6142577e93239, 0x96fbc50dc2f0200d, 0x9746cc05cbff1132, + 0x9a27082d77b8c289, 0xb86bce7f916a10cc, 0xbb3ca85b01eea465, + 0xc8b14e937b2cb741, 0xd4e835c17f1ef32c, 0xd797e0a99edf0921, 0xe2553e5a663abb7d, + 0xea38d4d6dca1e80e, 0xf004c474c2f8ee7a, 0xf269473b6db8d0eb, 0xf838dca6c8721bdb, diff --git a/rpc/rpc.go b/rpc/rpc.go index 56698b0f..a30992b6 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -593,7 +593,9 @@ func (c *Conn) receive(ctx context.Context) func() error { switch in.Message.Which() { case rpccp.Message_Which_unimplemented: - // no-op for now to avoid feedback loop + if err := c.handleUnimplemented(in); err != nil { + return err + } case rpccp.Message_Which_abort: c.handleAbort(in) @@ -661,6 +663,48 @@ func (c *Conn) handleAbort(in transport.IncomingMessage) { c.er.ReportError(exc.New(exc.Type(e.Type()), "rpc", "remote abort: "+reason)) } +func (c *Conn) handleUnimplemented(in transport.IncomingMessage) error { + defer in.Release() + + msg, err := in.Message.Unimplemented() + if err != nil { + return exc.WrapError("read unimplemented", err) + } + if msg.Which() == rpccp.Message_Which_resolve { + // If we get unimplemented for a resolve message, we should + // release the reference we sent, since it won't be used. + resolve, err := msg.Resolve() + if err != nil { + return exc.WrapError("read unimplemented.resolve", err) + } + if resolve.Which() == rpccp.Resolve_Which_cap { + desc, err := resolve.Cap() + if err != nil { + return exc.WrapError("read unimplemented.resolve.cap", err) + } + var id exportID + switch desc.Which() { + case rpccp.CapDescriptor_Which_senderHosted: + id = exportID(desc.SenderHosted()) + case rpccp.CapDescriptor_Which_senderPromise: + id = exportID(desc.SenderPromise()) + default: + return nil + } + client, err := withLockedConn2(c, func(c *lockedConn) (capnp.Client, error) { + return c.releaseExport(id, 1) + }) + if err != nil { + return err + } + client.Release() + return nil + } + } + // For other cases we should just ignore the message. + return nil +} + func (c *Conn) handleBootstrap(in transport.IncomingMessage) error { defer in.Release() diff --git a/rpc/senderpromise_test.go b/rpc/senderpromise_test.go new file mode 100644 index 00000000..3117ec28 --- /dev/null +++ b/rpc/senderpromise_test.go @@ -0,0 +1,216 @@ +package rpc_test + +import ( + "context" + "testing" + + "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/rpc" + "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" + "capnproto.org/go/capnp/v3/rpc/transport" + rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" + "github.com/stretchr/testify/assert" +) + +func TestSenderPromiseFulfill(t *testing.T) { + t.Parallel() + + ctx := context.Background() + p, r := capnp.NewLocalPromise[testcapnp.PingPong]() + + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + + conn := rpc.NewConn(p1, &rpc.Options{ + ErrorReporter: testErrorReporter{tb: t}, + BootstrapClient: capnp.Client(p), + }) + defer finishTest(t, conn, p2) + + // 1. Send bootstrap. + { + msg := &rpcMessage{ + Which: rpccp.Message_Which_bootstrap, + Bootstrap: &rpcBootstrap{QuestionID: 0}, + } + assert.NoError(t, sendMessage(ctx, p2, msg)) + } + // 2. Receive return. + var bootExportID uint32 + { + rmsg, release, err := recvMessage(ctx, p2) + assert.NoError(t, err) + defer release() + assert.Equal(t, rpccp.Message_Which_return, rmsg.Which) + assert.Equal(t, uint32(0), rmsg.Return.AnswerID) + assert.Equal(t, rpccp.Return_Which_results, rmsg.Return.Which) + assert.Equal(t, 1, len(rmsg.Return.Results.CapTable)) + desc := rmsg.Return.Results.CapTable[0] + assert.Equal(t, rpccp.CapDescriptor_Which_senderPromise, desc.Which) + bootExportID = desc.SenderPromise + } + // 3. Fulfill promise + { + pp := testcapnp.PingPong_ServerToClient(&pingPonger{}) + defer pp.Release() + r.Fulfill(pp) + } + // 4. Receive resolve. + { + rmsg, release, err := recvMessage(ctx, p2) + assert.NoError(t, err) + defer release() + assert.Equal(t, rpccp.Message_Which_resolve, rmsg.Which) + assert.Equal(t, bootExportID, rmsg.Resolve.PromiseID) + assert.Equal(t, rpccp.Resolve_Which_cap, rmsg.Resolve.Which) + desc := rmsg.Resolve.Cap + assert.Equal(t, rpccp.CapDescriptor_Which_senderHosted, desc.Which) + assert.NotEqual(t, bootExportID, desc.SenderHosted) + } +} + +// Tests that if we get an unimplemented message in response to a resolve message, we correctly +// drop the capability. +func TestResolveUnimplementedDrop(t *testing.T) { + t.Parallel() + + ctx := context.Background() + p, r := capnp.NewLocalPromise[testcapnp.Empty]() + + provider := testcapnp.EmptyProvider_ServerToClient(emptyShutdownerProvider{ + result: p, + }) + + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + + conn := rpc.NewConn(p1, &rpc.Options{ + ErrorReporter: testErrorReporter{tb: t}, + BootstrapClient: capnp.Client(provider), + }) + defer finishTest(t, conn, p2) + + // 1. Send bootstrap. + { + msg := &rpcMessage{ + Which: rpccp.Message_Which_bootstrap, + Bootstrap: &rpcBootstrap{QuestionID: 0}, + } + assert.NoError(t, sendMessage(ctx, p2, msg)) + } + // 2. Receive return. + var bootExportID uint32 + { + rmsg, release, err := recvMessage(ctx, p2) + assert.NoError(t, err) + defer release() + assert.Equal(t, rpccp.Message_Which_return, rmsg.Which) + assert.Equal(t, uint32(0), rmsg.Return.AnswerID) + assert.Equal(t, rpccp.Return_Which_results, rmsg.Return.Which) + assert.Equal(t, 1, len(rmsg.Return.Results.CapTable)) + desc := rmsg.Return.Results.CapTable[0] + assert.Equal(t, rpccp.CapDescriptor_Which_senderHosted, desc.Which) + bootExportID = desc.SenderHosted + } + onShutdown := make(chan struct{}) + // 3. Send finish + { + assert.NoError(t, sendMessage(ctx, p2, &rpcMessage{ + Which: rpccp.Message_Which_finish, + Finish: &rpcFinish{ + QuestionID: 0, + ReleaseResultCaps: false, + }, + })) + } + // 4. Call getEmpty + { + assert.NoError(t, sendMessage(ctx, p2, &rpcMessage{ + Which: rpccp.Message_Which_call, + Call: &rpcCall{ + QuestionID: 1, + Target: rpcMessageTarget{ + Which: rpccp.MessageTarget_Which_importedCap, + ImportedCap: bootExportID, + }, + InterfaceID: testcapnp.EmptyProvider_TypeID, + MethodID: 0, + Params: rpcPayload{}, + }, + })) + } + // 5. Receive return. + var emptyExportID uint32 + { + rmsg, release, err := recvMessage(ctx, p2) + assert.NoError(t, err) + defer release() + assert.Equal(t, uint32(1), rmsg.Return.AnswerID) + assert.Equal(t, rpccp.Return_Which_results, rmsg.Return.Which) + assert.Nil(t, rmsg.Return.Exception) + assert.Equal(t, 1, len(rmsg.Return.Results.CapTable)) + desc := rmsg.Return.Results.CapTable[0] + assert.Equal(t, rpccp.CapDescriptor_Which_senderPromise, desc.Which) + emptyExportID = desc.SenderPromise + } + // 7. Fulfill promise + { + pp := testcapnp.Empty_ServerToClient(emptyShutdowner{ + onShutdown: onShutdown, + }) + r.Fulfill(pp) + pp.Release() + } + // 8. Receive resolve, send unimplemented + { + rmsg, release, err := recvMessage(ctx, p2) + assert.NoError(t, err) + defer release() + assert.Equal(t, rpccp.Message_Which_resolve, rmsg.Which) + assert.Equal(t, emptyExportID, rmsg.Resolve.PromiseID) + assert.Equal(t, rpccp.Resolve_Which_cap, rmsg.Resolve.Which) + desc := rmsg.Resolve.Cap + assert.Equal(t, rpccp.CapDescriptor_Which_senderHosted, desc.Which) + assert.NoError(t, sendMessage(ctx, p2, &rpcMessage{ + Which: rpccp.Message_Which_unimplemented, + Unimplemented: rmsg, + })) + } + // 9. Drop the promise on our side. Otherwise it will stay alive because of + // the bootstrap interface: + { + p.Release() + } + // 6. Send finish + { + assert.NoError(t, sendMessage(ctx, p2, &rpcMessage{ + Which: rpccp.Message_Which_finish, + Finish: &rpcFinish{ + QuestionID: 1, + ReleaseResultCaps: true, + }, + })) + } + <-onShutdown // Will hang unless the capability is dropped +} + +type emptyShutdownerProvider struct { + result testcapnp.Empty +} + +func (e emptyShutdownerProvider) GetEmpty(ctx context.Context, p testcapnp.EmptyProvider_getEmpty) error { + results, err := p.AllocResults() + if err != nil { + return err + } + results.SetEmpty(e.result) + return nil +} + +type emptyShutdowner struct { + onShutdown chan<- struct{} // closed on shutdown +} + +func (s emptyShutdowner) Shutdown() { + close(s.onShutdown) +} diff --git a/std/capnp/rpc/exception.go b/std/capnp/rpc/exception.go new file mode 100644 index 00000000..0d80a52d --- /dev/null +++ b/std/capnp/rpc/exception.go @@ -0,0 +1,10 @@ +package rpc + +import "capnproto.org/go/capnp/v3/exc" + +// MarshalError fills in the fields of e according to err. Returns a non-nil +// error if marshalling fails. +func (e Exception) MarshalError(err error) error { + e.SetType(Exception_Type(exc.TypeOf(err))) + return e.SetReason(err.Error()) +}