Skip to content

Commit

Permalink
certexchange: cancel request context when returning (#512)
Browse files Browse the repository at this point in the history
Because the request is async and we may abort for some reason (e.g.,
invalid certs), make sure to actually cancel the underlying request
before returning.

Also return early without spawning a goroutine if we're only asking for
a single power table.
  • Loading branch information
Stebalien authored Jul 23, 2024
1 parent a800e25 commit dff5722
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 22 deletions.
74 changes: 52 additions & 22 deletions certexchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res
// Reset the stream if the parent context is canceled. We keep the returned stop function
// (`unbindReset`) so that exit paths can unregister this callback and reset the stream
// synchronously instead of waiting for the callback to run asynchronously.
context.AfterFunc(ctx, func() { _ = stream.Reset() })
unbindReset := context.AfterFunc(ctx, func() { _ = stream.Reset() })

if deadline, ok := ctx.Deadline(); ok {
// Not all transports support deadlines.
Expand Down Expand Up @@ -95,6 +95,20 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res
return nil, nil, err
}

// If we aren't expecting any certificates, return immediately. We may _only_ want the power
// table.
if req.Limit == 0 {
// Reset immediately instead of waiting for it to get run async (better cleanup
// behavior).
if unbindReset() {
_ = stream.Reset()
}

ch := make(chan *certs.FinalityCertificate)
close(ch)
return &resp, ch, nil
}

ch := make(chan *certs.FinalityCertificate, 1)
// copy this in case the caller decides to re-use the request object...
request := *req
Expand All @@ -107,6 +121,13 @@ func (c *Client) Request(ctx context.Context, p peer.ID, req *Request) (_rh *Res
if perr := recover(); perr != nil {
log.Errorf("panicked while receiving certificates from peer %s: %v\n%s", p, perr, string(debug.Stack()))
}

// Reset immediately instead of waiting for it to get run async (better cleanup
// behavior). Also, because I don't fully trust AfterFunc.
if unbindReset() {
_ = stream.Reset()
}

cancelReq()
close(ch)
}()
Expand Down Expand Up @@ -152,34 +173,43 @@ func FindInitialPowerTable(ctx context.Context, c Client, powerTableCID cid.Cid,
ticker := clk.Ticker(ecPeriod / 2)
defer ticker.Stop()

// pollOne asks a single peer for the initial power table and reports whether the
// peer returned a table whose CID matches powerTableCID.
//
// It returns (table, true) on a match and (nil, false) when the peer does not
// advertise the fetch protocol, the request fails, the CID cannot be computed, or
// the CID does not match. Failures are logged and are not returned to the caller.
pollOne := func(ctx context.Context, p peer.ID) (gpbft.PowerEntries, bool) {
	// Skip peers that don't advertise the certificate exchange protocol.
	targetProtocol := FetchProtocolName(c.NetworkName)
	proto, err := c.Host.Peerstore().FirstSupportedProtocol(p, targetProtocol)
	if err != nil || proto != targetProtocol {
		return nil, false
	}

	// Scope the request to this call: canceling on return releases the
	// underlying request even when we bail out early below.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Only the response header (which carries the power table) is needed; the
	// certificate channel is ignored.
	rh, _, err := c.Request(ctx, p, &request)
	if err != nil {
		log.Infow("requesting initial power table", "error", err, "peer", p)
		return nil, false
	}

	ptCID, err := certs.MakePowerTableCID(rh.PowerTable)
	if err != nil {
		log.Infow("computing initial power table CID", "error", err, "peer", p)
		return nil, false
	}

	// Reject tables that don't hash to the CID we were asked to find.
	if !bytes.Equal(ptCID, powerTableCID.Bytes()) {
		log.Infow("peer returned mismatching power table", "peer", p)
		return nil, false
	}
	return rh.PowerTable, true
}

for {
for _, p := range c.Host.Network().Peers() {
if ctx.Err() != nil {
return nil, ctx.Err()
}

targetProtocol := FetchProtocolName(c.NetworkName)
if proto, err := c.Host.Peerstore().FirstSupportedProtocol(p, targetProtocol); err != nil ||
proto != targetProtocol {

continue
}

rh, _, err := c.Request(ctx, p, &request)
if err != nil {
log.Infow("requesting initial power table", "error", err, "peer", p)
continue
}
ptCID, err := certs.MakePowerTableCID(rh.PowerTable)
if err != nil {
log.Infow("computing iniital power table CID", "error", err)
continue
}
if !bytes.Equal(ptCID, powerTableCID.Bytes()) {
log.Infow("peer returned missmatching power table", "peer", p)
continue
if res, found := pollOne(ctx, p); found {
return res, nil
}
return rh.PowerTable, nil
}

log.Infof("could not find anyone with initial power table, retrying after sleep")
Expand Down
5 changes: 5 additions & 0 deletions certexchange/polling/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (p *Poller) CatchUp(ctx context.Context) (uint64, error) {
// 2. An error if something went wrong internally (e.g., the certificate store returned an error).
func (p *Poller) Poll(ctx context.Context, peer peer.ID) (*PollResult, error) {
res := new(PollResult)

// Cancel this context on exit in case we exit early before the request finishes.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
// Requests take time, so always try to catch-up between requests in case there has
// been some "local" action from the GPBFT instance.
Expand Down

0 comments on commit dff5722

Please sign in to comment.