From 6cadf4e0ea8e0824f0aa21bb1e211d9ef7f656b3 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Tue, 27 Dec 2022 18:15:58 -0500 Subject: [PATCH 1/7] Make use of syncutil.With --- rpc/rpc.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 29c2a677..809ae8b8 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -694,18 +694,25 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn return nil } - c.lk.Lock() - if c.lk.answers[id] != nil { - c.lk.Unlock() - releaseCall() - return rpcerr.Failedf("incoming call: answer ID %d reused", id) - } + var ( + err error + p parsedCall + parseErr error + ) + syncutil.With(&c.lk, func() { + if c.lk.answers[id] != nil { + rl.Add(releaseCall) + err = rpcerr.Failedf("incoming call: answer ID %d reused", id) + return + } - var p parsedCall - parseErr := c.parseCall(&p, call) // parseCall sets CapTable + parseErr = c.parseCall(&p, call) // parseCall sets CapTable + }) + if err != nil { + return err + } // Create return message. - c.lk.Unlock() ret, send, retReleaser, err := c.newReturn() if err != nil { err = rpcerr.Annotate(err, "incoming call") From 561756942fa69284bd6009cbfce5ff2def38d425 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Tue, 27 Dec 2022 18:28:23 -0500 Subject: [PATCH 2/7] Minor cleanup Giving this a name makes it easier to follow what's going on here. --- rpc/rpc.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 809ae8b8..53ca1848 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -663,6 +663,16 @@ func (c *Conn) handleBootstrap(ctx context.Context, id answerID) error { return err } +func makeIdempotent(f func()) func() { + called := false + return func() { + if !called { + called = true + f() + } + } +} + func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capnp.ReleaseFunc) error { rl := &releaseList{} defer rl.Release() @@ -744,14 +754,9 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn releaseCall() return nil } - released := false - releaseArgs := func() { - if released { - return - } - released = true - releaseCall() - } + + releaseArgs := makeIdempotent(releaseCall) + switch p.target.which { case rpccp.MessageTarget_Which_importedCap: ent := c.findExport(p.target.importedCap) From 28f46dece574b1e6382f92ed99dd416e5a62a240 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Tue, 27 Dec 2022 18:40:16 -0500 Subject: [PATCH 3/7] Factor out construction of Recv. --- rpc/rpc.go | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 53ca1848..2bf245e7 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -755,7 +755,12 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn return nil } - releaseArgs := makeIdempotent(releaseCall) + recv := capnp.Recv{ + Args: p.args, + Method: p.method, + ReleaseArgs: makeIdempotent(releaseCall), + Returner: ans, + } switch p.target.which { case rpccp.MessageTarget_Which_importedCap: @@ -773,12 +778,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) c.lk.Unlock() - pcall := ent.client.RecvCall(callCtx, capnp.Recv{ - Args: p.args, - Method: p.method, - ReleaseArgs: releaseArgs, - Returner: ans, - }) + pcall := ent.client.RecvCall(callCtx, recv) // Place PipelineCaller into answer. Since the receive goroutine is // the only one that uses answer.pcall, it's fine that there's a // time gap for this being set. @@ -837,12 +837,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) c.lk.Unlock() - pcall := tgt.RecvCall(callCtx, capnp.Recv{ - Args: p.args, - Method: p.method, - ReleaseArgs: releaseArgs, - Returner: ans, - }) + pcall := tgt.RecvCall(callCtx, recv) ans.setPipelineCaller(p.method, pcall) } else { // Results not ready, use pipeline caller. @@ -852,12 +847,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn tgt := tgtAns.pcall c.tasks.Add(1) // will be finished by answer.Return c.lk.Unlock() - pcall := tgt.PipelineRecv(callCtx, p.target.transform, capnp.Recv{ - Args: p.args, - Method: p.method, - ReleaseArgs: releaseArgs, - Returner: ans, - }) + pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv) tgtAns.pcalls.Done() ans.setPipelineCaller(p.method, pcall) } From 8aa94d585c99d59726d69bd78420b9dfe9532076 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Tue, 27 Dec 2022 18:48:01 -0500 Subject: [PATCH 4/7] Centralize remaining unlock in handleCall I don't love this, but at least it gets us to a place where the locking is reasonably simple. --- rpc/rpc.go | 63 +++++++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 2bf245e7..887f2237 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -737,7 +737,6 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn ret.SetReleaseParamCaps(false) // Find target and start call. - c.lk.Lock() ans := &answer{ c: c, id: id, @@ -745,13 +744,17 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn sendMsg: send, msgReleaser: retReleaser, } + c.lk.Lock() + defer c.lk.Unlock() + c.lk.answers[id] = ans if parseErr != nil { parseErr = rpcerr.Annotate(parseErr, "incoming call") ans.sendException(rl, parseErr) - c.lk.Unlock() - c.er.ReportError(parseErr) - releaseCall() + rl.Add(func() { + c.er.ReportError(parseErr) + releaseCall() + }) return nil } @@ -769,20 +772,22 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn ans.ret = rpccp.Return{} ans.sendMsg = nil ans.msgReleaser = nil - c.lk.Unlock() - retReleaser.Decr() - releaseCall() + rl.Add(func() { + retReleaser.Decr() + releaseCall() + }) return rpcerr.Failedf("incoming call: unknown export ID %d", id) } c.tasks.Add(1) // will be finished by answer.Return var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) - c.lk.Unlock() - pcall := ent.client.RecvCall(callCtx, recv) - // Place PipelineCaller into answer. Since the receive goroutine is - // the only one that uses answer.pcall, it's fine that there's a - // time gap for this being set. - ans.setPipelineCaller(p.method, pcall) + rl.Add(func() { + pcall := ent.client.RecvCall(callCtx, recv) + // Place PipelineCaller into answer. Since the receive goroutine is + // the only one that uses answer.pcall, it's fine that there's a + // time gap for this being set. + ans.setPipelineCaller(p.method, pcall) + }) return nil case rpccp.MessageTarget_Which_promisedAnswer: tgtAns := c.lk.answers[p.target.promisedAnswer] @@ -790,16 +795,16 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn ans.ret = rpccp.Return{} ans.sendMsg = nil ans.msgReleaser = nil - c.lk.Unlock() - retReleaser.Decr() - releaseCall() + rl.Add(func() { + retReleaser.Decr() + releaseCall() + }) return rpcerr.Failedf("incoming call: use of unknown or finished answer ID %d for promised answer target", p.target.promisedAnswer) } if tgtAns.flags.Contains(resultsReady) { if tgtAns.err != nil { ans.sendException(rl, tgtAns.err) - c.lk.Unlock() - releaseCall() + rl.Add(releaseCall) return nil } // tgtAns.results is guaranteed to stay alive because it hasn't @@ -810,8 +815,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn if err != nil { err = rpcerr.Failedf("incoming call: read results from target answer: %w", err) ans.sendException(rl, err) - c.lk.Unlock() - releaseCall() + rl.Add(releaseCall) c.er.ReportError(err) return nil } @@ -819,8 +823,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn if err != nil { // Not reporting, as this is the caller's fault. ans.sendException(rl, err) - c.lk.Unlock() - releaseCall() + rl.Add(releaseCall) return nil } iface := sub.Interface() @@ -836,9 +839,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn c.tasks.Add(1) // will be finished by answer.Return var callCtx context.Context callCtx, ans.cancel = context.WithCancel(c.bgctx) - c.lk.Unlock() - pcall := tgt.RecvCall(callCtx, recv) - ans.setPipelineCaller(p.method, pcall) + rl.Add(func() { + pcall := tgt.RecvCall(callCtx, recv) + ans.setPipelineCaller(p.method, pcall) + }) } else { // Results not ready, use pipeline caller. tgtAns.pcalls.Add(1) // will be finished by answer.Return @@ -846,10 +850,11 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn callCtx, ans.cancel = context.WithCancel(c.bgctx) tgt := tgtAns.pcall c.tasks.Add(1) // will be finished by answer.Return - c.lk.Unlock() - pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv) - tgtAns.pcalls.Done() - ans.setPipelineCaller(p.method, pcall) + rl.Add(func() { + pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv) + tgtAns.pcalls.Done() + ans.setPipelineCaller(p.method, pcall) + }) } return nil default: From 6da1fd2d63b667b4512525d26f7abddf907f6f10 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 28 Dec 2022 04:33:45 -0500 Subject: [PATCH 5/7] Minor stylistic change. Per Louis's suggestion --- rpc/rpc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 887f2237..f49fe56f 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -663,7 +663,7 @@ func (c *Conn) handleBootstrap(ctx context.Context, id answerID) error { return err } -func makeIdempotent(f func()) func() { +func idempotent(f func()) func() { called := false return func() { if !called { @@ -761,7 +761,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn recv := capnp.Recv{ Args: p.args, Method: p.method, - ReleaseArgs: makeIdempotent(releaseCall), + ReleaseArgs: idempotent(releaseCall), Returner: ans, } From 067def27d52a5b489c53e39b81ea83b2799d6f7a Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 28 Dec 2022 04:40:19 -0500 Subject: [PATCH 6/7] handleReturn: hierarchical locking --- rpc/rpc.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 3b767eba..67c76582 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -941,11 +941,14 @@ func parseTransform(list rpccp.PromisedAnswer_Op_List) ([]capnp.PipelineOp, erro } func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp.ReleaseFunc) error { + rl := &releaseList{} + defer rl.Release() c.lk.Lock() + defer c.lk.Unlock() + qid := questionID(ret.AnswerId()) if uint32(qid) >= uint32(len(c.lk.questions)) { - c.lk.Unlock() - release() + rl.Add(release) return rpcerr.Failedf("incoming return: question %d does not exist", qid) } // Pop the question from the table. Receiving the Return message @@ -954,8 +957,7 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp q := c.lk.questions[qid] c.lk.questions[qid] = nil if q == nil { - c.lk.Unlock() - release() + rl.Add(release) return rpcerr.Failedf("incoming return: question %d does not exist", qid) } canceled := q.flags&finished != 0 @@ -969,11 +971,9 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp if q.flags&finishSent != 0 { c.lk.questionID.remove(uint32(qid)) } - c.lk.Unlock() - release() + rl.Add(release) default: - c.lk.Unlock() - release() + rl.Add(release) go func() { <-q.finishMsgSend @@ -996,7 +996,6 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp // client or an error), so we save the ReleaseFunc for later: q.release = release } - c.lk.Unlock() // We're going to potentially block fulfilling some promises so fork // off a goroutine to avoid blocking the receive loop. go func() { From 96b62116b0f5ffe2341ab16645a26243400d6813 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 28 Dec 2022 04:44:18 -0500 Subject: [PATCH 7/7] handleDisembargo: hierarchical locking. This is the last place in the rpc package where we're unlocking c.lk, and it isn't either: - using synctuil.With() - in a defer statement, immediately after the Lock(). --- rpc/rpc.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 67c76582..2e8c45d4 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -1348,16 +1348,18 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo, release defer release() id := embargoID(d.Context().ReceiverLoopback()) - c.lk.Lock() - e := c.findEmbargo(id) + var e *embargo + syncutil.With(&c.lk, func() { + e = c.findEmbargo(id) + if e != nil { + // TODO(soon): verify target matches the right import. + c.lk.embargoes[id] = nil + c.lk.embargoID.remove(uint32(id)) + } + }) if e == nil { - c.lk.Unlock() return rpcerr.Failedf("incoming disembargo: received sender loopback for unknown ID %d", id) } - // TODO(soon): verify target matches the right import. - c.lk.embargoes[id] = nil - c.lk.embargoID.remove(uint32(id)) - c.lk.Unlock() e.lift() case rpccp.Disembargo_context_Which_senderLoopback: