Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send promise resolution messages #471

Merged
merged 16 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func NewPromisedClient(hook ClientHook) (Client, Resolver[Client]) {
}

// newPromisedClient is the same as NewPromisedClient, but the return
// value exposes the concrete type of the fulfiller.
// value exposes the concrete type of the resolver.
func newPromisedClient(hook ClientHook) (Client, *clientPromise) {
if hook == nil {
panic("NewPromisedClient(nil)")
Expand Down
9 changes: 3 additions & 6 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,9 @@ func (ans *answer) prepareSendException(c *lockedConn, rl *releaseList, ex error
if e, err := ans.ret.NewException(); err != nil {
ans.c.er.ReportError(exc.WrapError("send exception", err))
ans.sendMsg = nil
} else {
e.SetType(rpccp.Exception_Type(exc.TypeOf(ex)))
if err := e.SetReason(ex.Error()); err != nil {
ans.c.er.ReportError(exc.WrapError("send exception", err))
ans.sendMsg = nil
}
} else if err := e.MarshalError(ex); err != nil {
ans.c.er.ReportError(exc.WrapError("send exception", err))
ans.sendMsg = nil
}
}
}
Expand Down
124 changes: 103 additions & 21 deletions rpc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/exc"
"capnproto.org/go/capnp/v3/internal/str"
"capnproto.org/go/capnp/v3/internal/syncutil"
rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc"
Expand All @@ -15,8 +16,12 @@ type exportID uint32

// expent is an entry in a Conn's export table.
type expent struct {
client capnp.Client
wireRefs uint32
client capnp.Client
wireRefs uint32
lthibault marked this conversation as resolved.
Show resolved Hide resolved
isPromise bool

// Should be called when removing this entry from the exports table:
cancel context.CancelFunc
}

// A key for use in a client's Metadata, whose value is the export
Expand Down Expand Up @@ -64,6 +69,7 @@ func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.Client, err
}
switch {
case count == ent.wireRefs:
defer ent.cancel()
client := ent.client
c.lk.exports[id] = nil
c.lk.exportID.remove(uint32(id))
Expand Down Expand Up @@ -148,35 +154,111 @@ func (c *lockedConn) sendCap(d rpccp.CapDescriptor, client capnp.Client) (_ expo
}
}

// TODO(someday): Check for unresolved client for senderPromise.

// Default to sender-hosted (export).
// Default to export.
state.Metadata.Lock()
defer state.Metadata.Unlock()
id, ok := c.findExportID(state.Metadata)
var ee *expent
if ok {
ent := c.lk.exports[id]
ent.wireRefs++
d.SetSenderHosted(uint32(id))
return id, true, nil
}

// Not already present; allocate an export id for it:
ee := &expent{
client: client.AddRef(),
wireRefs: 1,
ee = c.lk.exports[id]
ee.wireRefs++
} else {
// Not already present; allocate an export id for it:
ee = &expent{
client: client.AddRef(),
wireRefs: 1,
isPromise: state.IsPromise,
cancel: func() {},
}
id = exportID(c.lk.exportID.next())
if int64(id) == int64(len(c.lk.exports)) {
c.lk.exports = append(c.lk.exports, ee)
} else {
c.lk.exports[id] = ee
}
c.setExportID(state.Metadata, id)
}
id = exportID(c.lk.exportID.next())
if int64(id) == int64(len(c.lk.exports)) {
c.lk.exports = append(c.lk.exports, ee)
if ee.isPromise {
c.sendSenderPromise(id, client, d)
} else {
c.lk.exports[id] = ee
d.SetSenderHosted(uint32(id))
}
c.setExportID(state.Metadata, id)
d.SetSenderHosted(uint32(id))
return id, true, nil
}

// sendSenderPromise is a helper for sendCap that handles the senderPromise case.
func (c *lockedConn) sendSenderPromise(id exportID, client capnp.Client, d rpccp.CapDescriptor) {
// Send a promise, wait for the resolution asynchronously, then send
// a resolve message:
ee := c.lk.exports[id]
d.SetSenderPromise(uint32(id))
ctx, cancel := context.WithCancel(c.bgctx)
ee.cancel = cancel
waitRef := client.AddRef()
go func() {
defer cancel()
defer waitRef.Release()
// Logically we don't hold the lock anymore; it's held by the
// goroutine that spawned this one. So cast back to an unlocked
// Conn before trying to use it again:
unlockedConn := (*Conn)(c)

waitErr := waitRef.Resolve(ctx)
unlockedConn.withLocked(func(c *lockedConn) {
// Export was removed from the table at some point;
// remote peer is uninterested in the resolution, so
// drop the reference and we're done:
if c.lk.exports[id] != ee {
return
}

sendRef := waitRef.AddRef()
var (
resolvedID exportID
isExport bool
)
c.sendMessage(c.bgctx, func(m rpccp.Message) error {
res, err := m.NewResolve()
if err != nil {
return err
}
res.SetPromiseId(uint32(id))
if waitErr != nil {
ex, err := res.NewException()
if err != nil {
return err
}
return ex.MarshalError(waitErr)
}
desc, err := res.NewCap()
if err != nil {
return err
}
resolvedID, isExport, err = c.sendCap(desc, sendRef)
return err
}, func(err error) {
sendRef.Release()
if err != nil && isExport {
// release 1 ref of the thing it resolved to.
client, err := withLockedConn2(
unlockedConn,
func(c *lockedConn) (capnp.Client, error) {
return c.releaseExport(resolvedID, 1)
},
)
if err != nil {
c.er.ReportError(
exc.WrapError("releasing export due to failure to send resolve", err),
)
} else {
client.Release()
}
}
})
})
}()
}

// fillPayloadCapTable adds descriptors of payload's message's
// capabilities into payload's capability table and returns the
// reference counts that have been added to the exports table.
Expand Down
9 changes: 9 additions & 0 deletions rpc/internal/testcapnp/test.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ using Go = import "/go.capnp";
$Go.package("testcapnp");
$Go.import("capnproto.org/go/capnp/v3/rpc/internal/testcapnp");

interface Empty {
# Empty interface, handy for testing shutdown hooks and stuff that just
# needs an arbitrary capability.
}

interface EmptyProvider {
getEmpty @0 () -> (empty :Empty);
}

interface PingPong {
echoNum @0 (n :Int64) -> (n :Int64);
}
Expand Down
Loading