Skip to content

Commit

Permalink
Merge pull request #381 from zenhack/bundle-transport-returns
Browse files Browse the repository at this point in the history
Cleanup: bundle return values from transport
  • Loading branch information
zenhack authored Dec 14, 2022
2 parents 9d4bfd6 + 67ac5cf commit 48494e4
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 192 deletions.
10 changes: 5 additions & 5 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,25 @@ func errorAnswer(c *Conn, id answerID, err error) *answer {
// all references to it are dropped; the caller is responsible for one reference. This will not
// happen before the message is sent, as the returned send function retains a reference.
func (c *Conn) newReturn(ctx context.Context) (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ error) {
msg, send, releaseMsg, err := c.transport.NewMessage()
outMsg, err := c.transport.NewMessage()
if err != nil {
return rpccp.Return{}, nil, nil, rpcerr.Failedf("create return: %w", err)
}
ret, err := msg.NewReturn()
ret, err := outMsg.Message.NewReturn()
if err != nil {
releaseMsg()
outMsg.Release()
return rpccp.Return{}, nil, nil, rpcerr.Failedf("create return: %w", err)
}

// Before releasing the message, we need to wait both until it is sent and
// until the local vat is done with it. We therefore implement a simple
// ref-counting mechanism such that 'release' must be called twice before
// 'releaseMsg' is called.
releaser := rc.NewReleaser(2, releaseMsg)
releaser := rc.NewReleaser(2, outMsg.Release)

return ret, func() {
c.sender.Send(asyncSend{
send: send,
send: outMsg.Send,
release: releaser.Decr,
onSent: func(err error) {
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions rpc/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/flowcontrol"
"capnproto.org/go/capnp/v3/rpc/internal/testcapnp"
rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc"
"capnproto.org/go/capnp/v3/rpc/transport"
)

// measureTransport is a wrapper around another transport, and measures the
Expand All @@ -28,15 +28,15 @@ type measuringTransport struct {
inUse, maxInUse uint64
}

func (t *measuringTransport) RecvMessage() (rpccp.Message, capnp.ReleaseFunc, error) {
msg, release, err := t.Transport.RecvMessage()
func (t *measuringTransport) RecvMessage() (transport.IncomingMessage, error) {
inMsg, err := t.Transport.RecvMessage()
if err != nil {
return msg, release, err
return inMsg, err
}

size, err := capnp.Struct(msg).Message().TotalSize()
size, err := capnp.Struct(inMsg.Message).Message().TotalSize()
if err != nil {
return msg, release, err
return inMsg, err
}

t.mu.Lock()
Expand All @@ -46,14 +46,14 @@ func (t *measuringTransport) RecvMessage() (rpccp.Message, capnp.ReleaseFunc, er
}
t.mu.Unlock()

oldRelease := release
release = capnp.ReleaseFunc(func() {
oldRelease := inMsg.Release
inMsg.Release = capnp.ReleaseFunc(func() {
oldRelease()
t.mu.Lock()
defer t.mu.Unlock()
t.inUse -= size
})
return msg, release, err
return inMsg, err
}

// Test that attaching a fixed-size FlowLimiter results in actually limiting the
Expand Down
Loading

0 comments on commit 48494e4

Please sign in to comment.