Skip to content

Commit

Permalink
kgo: do not return response ErrorCode's as shard errors
Browse files Browse the repository at this point in the history
See embedded comment. Not having this logic was making it look like
issuing a request failed with something like NOT_LEADER_FOR_PARTITION
(if there were enough retries) -- this is not a failed request, and
_some_ of the response may be successful.
  • Loading branch information
twmb committed Jan 21, 2024
1 parent ee1f011 commit 012cd7c
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2250,8 +2250,12 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
}

resp, err := broker.waitResp(ctx, myIssue.req)
var errIsFromResp bool
if err == nil {
err = sharder.onResp(myUnderlyingReq, resp) // perform some potential cleanup, and potentially receive an error to retry
if ke := (*kerr.Error)(nil); errors.As(err, &ke) {
errIsFromResp = true
}
}

// If we failed to issue the request, we *maybe* will retry.
Expand Down Expand Up @@ -2279,6 +2283,14 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
return
}

// If we pulled an error out of the response body in an attempt
// to possibly retry, the request was NOT an error that we want
// to bubble as a shard error. The request was successful, we
// have a response. Before we add the shard, strip the error.
// The end user can parse the response ErrorCode.
if errIsFromResp {
err = nil
}
addShard(shard(broker, myUnderlyingReq, resp, err)) // the error was not retryable
}()
}
Expand Down

0 comments on commit 012cd7c

Please sign in to comment.