kgo: clear controller/coordinator caches on failed dials
If we are repeatedly unable to dial the controller or coordinator, it is
possible that the broker is just gone. We need to clear our internal cache
so that we refresh it and pick up a new controller / coordinator.

We only clear the cache after three consecutive dial failures, so that temporary dial errors do not needlessly invalidate it.

Closes #239.
twmb committed Dec 6, 2022
1 parent b845981 commit e2e80bf
Showing 1 changed file with 49 additions and 17 deletions.
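
Pieced together from the hunks below, here is a minimal, self-contained sketch of the mechanism (illustrative only; names such as dialCounter, dial, and cached are invented and this is not the committed code): a request loop keeps dialing the cached controller, and after three consecutive dial failures it forgets the cache so the next attempt re-resolves the broker.

package main

import (
	"errors"
	"fmt"
	"net"
)

// dialCounter mimics the consecutive-dial-failure tracking described above.
type dialCounter struct{ fails int8 }

// repeated reports true once three dial errors in a row have been seen,
// then resets, so that temporary dial errors do not clear the cache.
func (d *dialCounter) repeated(err error) bool {
	var op *net.OpError
	if errors.As(err, &op) && op.Op == "dial" {
		d.fails++
		if d.fails == 3 {
			d.fails = 0
			return true
		}
	}
	return false
}

func main() {
	cached := int32(7) // pretend broker 7 is the cached controller
	var d dialCounter

	// Simulate a controller that is completely gone: every dial fails.
	dial := func(id int32) error {
		return &net.OpError{Op: "dial", Err: fmt.Errorf("broker %d: connection refused", id)}
	}

	for attempt := 1; attempt <= 3; attempt++ {
		err := dial(cached)
		if err == nil {
			break
		}
		if d.repeated(err) {
			fmt.Printf("attempt %d: repeated dial failures, forgetting controller %d\n", attempt, cached)
			cached = -1 // unknown; the next request would re-load metadata
			continue
		}
		fmt.Printf("attempt %d: dial failed, retrying cached controller %d\n", attempt, cached)
	}
}

The committed change wires this same counting into the retriable request path through the parseRetryErr hook, as the hunks below show.
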
66 changes: 49 additions & 17 deletions pkg/kgo/client.go
@@ -810,11 +810,27 @@ type retriable struct {
 	// that can fail / do not need to retry forever.
 	limitRetries int
 
-	// parseRetryErr, if non-nil, can parse a retriable error out of the
-	// response and return it. This error is *not* returned from the
-	// request if the req cannot be retried due to timeout or retry limits,
-	// but it *can* allow a retry if neither limit is hit yet.
-	parseRetryErr func(kmsg.Response) error
+	// parseRetryErr, if non-nil, can delete stale cached brokers. We do
+	// *not* return the error from this function to the caller, but we do
+	// use it to potentially retry. It is not necessary, but also not
+	// harmful, to return the input error.
+	parseRetryErr func(kmsg.Response, error) error
 }
 
+type failDial struct{ fails int8 }
+
+// The controller and group/txn coordinators are cached. If dialing the broker
+// repeatedly fails, we need to forget our cache to force a re-load: the broker
+// may have completely died.
+func (d *failDial) isRepeatedDialFail(err error) bool {
+	if isDialErr(err) {
+		d.fails++
+		if d.fails == 3 {
+			d.fails = 0
+			return true
+		}
+	}
+	return false
+}
+
 func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
@@ -831,8 +847,8 @@ start:
 	var retryErr error
 	if err == nil {
 		resp, err = r.last.waitResp(ctx, req)
-		if err == nil && r.parseRetryErr != nil {
-			retryErr = r.parseRetryErr(resp)
+		if r.parseRetryErr != nil {
+			retryErr = r.parseRetryErr(resp, err)
 		}
 	}
 
@@ -1098,7 +1114,6 @@ func (cl *Client) controller(ctx context.Context) (*broker, error) {
 func (cl *Client) forgetControllerID(id int32) {
 	cl.controllerIDMu.Lock()
 	defer cl.controllerIDMu.Unlock()
-
 	if cl.controllerID == id {
 		cl.controllerID = unknownControllerID
 	}
@@ -1288,18 +1303,21 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error)
 	case errors.Is(err, kerr.CoordinatorNotAvailable),
 		errors.Is(err, kerr.CoordinatorLoadInProgress),
 		errors.Is(err, kerr.NotCoordinator):
-
-		cl.coordinatorsMu.Lock()
-		delete(cl.coordinators, coordinatorKey{
-			name: name,
-			typ:  typ,
-		})
-		cl.coordinatorsMu.Unlock()
+		cl.deleteStaleCoordinator(name, typ)
 		return true
 	}
 	return false
 }
 
+func (cl *Client) deleteStaleCoordinator(name string, typ int8) {
+	cl.coordinatorsMu.Lock()
+	defer cl.coordinatorsMu.Unlock()
+	delete(cl.coordinators, coordinatorKey{
+		name: name,
+		typ:  typ,
+	})
+}
+
 type brokerOrErr struct {
 	b   *broker
 	err error
@@ -1325,7 +1343,14 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response
 		cl.maybeDeleteMappedMetadata(topics...)
 	}
 
-	r.parseRetryErr = func(resp kmsg.Response) error {
+	var d failDial
+	r.parseRetryErr = func(resp kmsg.Response, err error) error {
+		if err != nil {
+			if d.isRepeatedDialFail(err) {
+				cl.forgetControllerID(r.last.meta.NodeID)
+			}
+			return err
+		}
 		var code int16
 		switch t := resp.(type) {
 		case *kmsg.CreateTopicsResponse:
@@ -1455,7 +1480,14 @@ func (cl *Client) handleReqWithCoordinator(
 	req kmsg.Request,
 ) (*broker, kmsg.Response, error) {
 	r := cl.retriableBrokerFn(coordinator)
-	r.parseRetryErr = func(resp kmsg.Response) error {
+	var d failDial
+	r.parseRetryErr = func(resp kmsg.Response, err error) error {
+		if err != nil {
+			if d.isRepeatedDialFail(err) {
+				cl.deleteStaleCoordinator(name, typ)
+			}
+			return err
+		}
 		var code int16
 		switch t := resp.(type) {
 		// TXN
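
For context, a hedged sketch of how this plays out for an application using the client (the seed address and group name are placeholders, and issuing DescribeGroups through kgo.Client.Request is just one example of a coordinator-routed request): before this commit, if the broker cached as the group coordinator disappeared entirely, such a request could keep failing with dial errors against the dead broker; with the change, three consecutive dial failures clear the cached coordinator so a retry can re-discover it.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	// Placeholder seed broker address.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// DescribeGroups is sent to the group's coordinator, which kgo caches.
	req := kmsg.NewPtrDescribeGroupsRequest()
	req.Groups = []string{"my-group"} // placeholder group

	// If the cached coordinator broker has died, dials to it fail; after
	// this commit, three consecutive dial failures clear the cached
	// coordinator so a retry can re-discover it.
	resp, err := cl.Request(context.Background(), req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Printf("coordinator answered with %T\n", resp)
}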
