Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send promise resolution messages #471

Merged
merged 16 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,7 @@ func (ans *answer) prepareSendException(c *lockedConn, rl *releaseList, ex error
ans.c.er.ReportError(exc.WrapError("send exception", err))
ans.sendMsg = nil
} else {
e.SetType(rpccp.Exception_Type(exc.TypeOf(ex)))
if err := e.SetReason(ex.Error()); err != nil {
if err := e.MarshalError(ex); err != nil {
zenhack marked this conversation as resolved.
Show resolved Hide resolved
ans.c.er.ReportError(exc.WrapError("send exception", err))
ans.sendMsg = nil
}
Expand Down
80 changes: 63 additions & 17 deletions rpc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ type exportID uint32

// expent is an entry in a Conn's export table.
type expent struct {
client capnp.Client
wireRefs uint32
client capnp.Client
wireRefs uint32
lthibault marked this conversation as resolved.
Show resolved Hide resolved
isPromise bool
}

// A key for use in a client's Metadata, whose value is the export
Expand Down Expand Up @@ -154,26 +155,71 @@ func (c *lockedConn) sendCap(d rpccp.CapDescriptor, client capnp.Client) (_ expo
state.Metadata.Lock()
defer state.Metadata.Unlock()
id, ok := c.findExportID(state.Metadata)
var ee *expent
if ok {
ent := c.lk.exports[id]
ent.wireRefs++
d.SetSenderHosted(uint32(id))
return id, true, nil
ee = c.lk.exports[id]
ee.wireRefs++
} else {
// Not already present; allocate an export id for it:
ee = &expent{
client: client.AddRef(),
wireRefs: 1,
isPromise: state.IsPromise,
}
id = exportID(c.lk.exportID.next())
if int64(id) == int64(len(c.lk.exports)) {
c.lk.exports = append(c.lk.exports, ee)
} else {
c.lk.exports[id] = ee
}
c.setExportID(state.Metadata, id)
}
if ee.isPromise {
// Send a promise, wait for the resolution asynchronously, then send
lthibault marked this conversation as resolved.
Show resolved Hide resolved
// a resolve message:
d.SetSenderPromise(uint32(id))
waitRef := client.AddRef()
go func() {
unlockedConn := (*Conn)(c)
lthibault marked this conversation as resolved.
Show resolved Hide resolved
waitErr := waitRef.Resolve(c.bgctx)
zenhack marked this conversation as resolved.
Show resolved Hide resolved
unlockedConn.withLocked(func(c *lockedConn) {
// Export was removed from the table at some point;
// remote peer is uninterested in the resolution, so
// drop the reference and we're done:
if c.lk.exports[id] != ee {
go waitRef.Release()
return
}

// Not already present; allocate an export id for it:
ee := &expent{
client: client.AddRef(),
wireRefs: 1,
}
id = exportID(c.lk.exportID.next())
if int64(id) == int64(len(c.lk.exports)) {
c.lk.exports = append(c.lk.exports, ee)
c.sendMessage(c.bgctx, func(m rpccp.Message) error {
res, err := m.NewResolve()
if err != nil {
return err
}
res.SetPromiseId(uint32(id))
if waitErr != nil {
ex, err := res.NewException()
if err != nil {
return err
}
return ex.MarshalError(waitErr)
}
desc, err := res.NewCap()
if err != nil {
return err
}
_, _, err = c.sendCap(desc, waitRef)
return err
}, func(err error) {
if err != nil {
waitRef.Release()
}
})
})
}()
} else {
c.lk.exports[id] = ee
d.SetSenderHosted(uint32(id))
}
c.setExportID(state.Metadata, id)
d.SetSenderHosted(uint32(id))
return id, true, nil
}

Expand Down
10 changes: 10 additions & 0 deletions std/capnp/rpc/exception.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package rpc

import "capnproto.org/go/capnp/v3/exc"

// MarshalError fills in the fields of e according to err. Returns a non-nil
// error if marshalling fails.
func (e Exception) MarshalError(err error) error {
e.SetType(Exception_Type(exc.TypeOf(err)))
return e.SetReason(err.Error())
}