make errUnknown{Controller,Coordinator} retryable, improve error wording
The two errors were not retryable, so if they were encountered, we
failed requests immediately.

We also improve the wording for errUnknownController to indicate that -1
means the controller is not ready, and we change all "Kafka" to "broker"
to be more broker-implementation agnostic.
twmb committed Oct 6, 2022
1 parent ebdbe77 commit f35ef66
Showing 6 changed files with 30 additions and 17 deletions.
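
Why retryability matters here: once these errors are classified as retryable, a request that hits them can back off and be reissued rather than failing outright. A minimal, illustrative sketch of that backoff-and-retry shape follows; the helper names, attempt count, and backoff values are hypothetical, not franz-go's actual retry machinery.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady stands in for a retryable condition such as the cluster
// reporting controller -1; the real error types live in pkg/kgo/errors.go.
var errNotReady = errors.New("cluster not ready")

// isRetriable is a stand-in for a classification check like
// isRetriableBrokerErr in the diff below.
func isRetriable(err error) bool { return errors.Is(err, errNotReady) }

// issueWithRetry retries retryable failures with doubling backoff.
func issueWithRetry(do func() error, attempts int, backoff time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = do(); err == nil || !isRetriable(err) {
			return err // success, or an error we must not retry
		}
		time.Sleep(backoff) // hope the cluster has stabilized
		backoff *= 2
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	err := issueWithRetry(func() error {
		if calls++; calls < 3 {
			return fmt.Errorf("load controller: %w", errNotReady)
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println(err, "after", calls, "calls") // <nil> after 3 calls
}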
4 changes: 2 additions & 2 deletions pkg/kgo/broker.go
@@ -696,14 +696,14 @@ start:
// Post, Kafka replies with all versions.
if rawResp[1] == 35 {
if maxVersion == 0 {
return errors.New("Kafka replied with UNSUPPORTED_VERSION to an ApiVersions request of version 0")
return errors.New("broker replied with UNSUPPORTED_VERSION to an ApiVersions request of version 0")
}
srawResp := string(rawResp)
if srawResp == "\x00\x23\x00\x00\x00\x00" ||
// EventHubs erroneously replies with v1, so we check
// for that as well.
srawResp == "\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00" {
- cxn.cl.cfg.logger.Log(LogLevelDebug, "kafka does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
+ cxn.cl.cfg.logger.Log(LogLevelDebug, "broker does not know our ApiVersions version, downgrading to version 0 and retrying", "broker", logID(cxn.b.meta.NodeID))
maxVersion = 0
goto start
}
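
The raw-byte check above reads the error code straight off the wire: assuming, as the equality checks suggest, that the correlation ID has already been stripped, the response body starts with a big-endian int16 error code, and 0x0023 is 35, UNSUPPORTED_VERSION. A small sketch of that decode:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Error code (2 bytes, big-endian) followed by an empty api-keys
	// array: this is the v0-shaped reply the code above matches.
	rawResp := []byte("\x00\x23\x00\x00\x00\x00")
	errCode := int16(binary.BigEndian.Uint16(rawResp[:2]))
	fmt.Println(errCode, errCode == 35) // 35 true
}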
6 changes: 3 additions & 3 deletions pkg/kgo/consumer_group.go
@@ -1785,7 +1785,7 @@ func (g *groupConsumer) updateCommitted(
}
if g.uncommitted == nil || // just in case
len(req.Topics) != len(resp.Topics) { // bad kafka
- g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics)), "group", g.cfg.group)
+ g.cfg.logger.Log(LogLevelError, fmt.Sprintf("broker replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics)), "group", g.cfg.group)
return
}

@@ -1806,7 +1806,7 @@ func (g *groupConsumer) updateCommitted(
if topic == nil || // just in case
reqTopic.Topic != respTopic.Topic || // bad kafka
len(reqTopic.Partitions) != len(respTopic.Partitions) { // same
- g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions)), "group", g.cfg.group)
+ g.cfg.logger.Log(LogLevelError, fmt.Sprintf("broker replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions)), "group", g.cfg.group)
continue
}

@@ -1828,7 +1828,7 @@ func (g *groupConsumer) updateCommitted(
continue
}
if reqPart.Partition != respPart.Partition { // bad kafka
- g.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition), "group", g.cfg.group)
+ g.cfg.logger.Log(LogLevelError, fmt.Sprintf("broker replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition), "group", g.cfg.group)
continue
}
if respPart.ErrorCode != 0 {
21 changes: 17 additions & 4 deletions pkg/kgo/errors.go
@@ -71,6 +71,16 @@ func isRetriableBrokerErr(err error) bool {
if errors.Is(err, errCorrelationIDMismatch) {
return true
}
+ // We sometimes load the controller before issuing requests, and the
+ // cluster may not yet be ready and will return -1 for the controller.
+ // We can backoff and retry and hope the cluster has stabilized.
+ if ce := (*errUnknownController)(nil); errors.As(err, &ce) {
+ return true
+ }
+ // Same thought for a non-existing coordinator.
+ if ce := (*errUnknownCoordinator)(nil); errors.As(err, &ce) {
+ return true
+ }
var tempErr interface{ Temporary() bool }
if errors.As(err, &tempErr) {
return tempErr.Temporary()
@@ -213,7 +223,10 @@ type errUnknownController struct {
}

func (e *errUnknownController) Error() string {
return fmt.Sprintf("Kafka replied that the controller broker is %d,"+
if e.id == -1 {
return "broker replied that the controller broker is not available"
}
return fmt.Sprintf("broker replied that the controller broker is %d,"+
" but did not reply with that broker in the broker list", e.id)
}

@@ -225,15 +238,15 @@ type errUnknownCoordinator struct {
func (e *errUnknownCoordinator) Error() string {
switch e.key.typ {
case coordinatorTypeGroup:
return fmt.Sprintf("Kafka replied that group %s has broker coordinator %d,"+
return fmt.Sprintf("broker replied that group %s has broker coordinator %d,"+
" but did not reply with that broker in the broker list",
e.key.name, e.coordinator)
case coordinatorTypeTxn:
return fmt.Sprintf("Kafka replied that txn id %s has broker coordinator %d,"+
return fmt.Sprintf("broker replied that txn id %s has broker coordinator %d,"+
" but did not reply with that broker in the broker list",
e.key.name, e.coordinator)
default:
return fmt.Sprintf("Kafka replied to an unknown coordinator key %s (type %d) that it has a broker coordinator %d,"+
return fmt.Sprintf("broker replied to an unknown coordinator key %s (type %d) that it has a broker coordinator %d,"+
" but did not reply with that broker in the broker list", e.key.name, e.key.typ, e.coordinator)
}
}
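
A note on the errors.As pattern used in the retryability checks above: initializing the target to a typed nil pointer is enough, since errors.As matches by concrete type anywhere in the wrap chain and fills in the value it finds. A self-contained sketch with a stand-in type (myErr is hypothetical; the real types are the unexported ones above):

package main

import (
	"errors"
	"fmt"
)

// myErr mimics an unexported pointer-receiver error type like
// errUnknownController.
type myErr struct{ id int32 }

func (e *myErr) Error() string { return fmt.Sprintf("unknown controller %d", e.id) }

func main() {
	err := fmt.Errorf("request failed: %w", &myErr{id: -1})
	// A typed nil works as the starting value; errors.As matches on
	// the *myErr type and sets me to the wrapped concrete value.
	if me := (*myErr)(nil); errors.As(err, &me) {
		fmt.Println("retryable, controller id:", me.id) // -1
	}
}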
2 changes: 1 addition & 1 deletion pkg/kgo/group_balancer.go
@@ -327,7 +327,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
for _, b := range g.cfg.balancers {
ours = append(ours, b.ProtocolName())
}
- g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("%s could not find Kafka-chosen balancer", from), "kafka_choice", proto, "our_set", strings.Join(ours, ", "))
+ g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("%s could not find broker-chosen balancer", from), "kafka_choice", proto, "our_set", strings.Join(ours, ", "))
return nil, fmt.Errorf("unable to balance: none of our balancers have a name equal to the balancer chosen for balancing (%s)", proto)
}

12 changes: 6 additions & 6 deletions pkg/kgo/sink.go
@@ -455,7 +455,7 @@ func (s *sink) issueTxnReq(
for _, topic := range resp.Topics {
topicBatches, ok := req.batches[topic.Topic]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with topic in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic)
s.cl.cfg.logger.Log(LogLevelError, "broker replied with topic in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic)
continue
}
for _, partition := range topic.Partitions {
@@ -470,7 +470,7 @@ func (s *sink) issueTxnReq(

batch, ok := topicBatches[partition.Partition]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka replied with partition in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic, "partition", partition.Partition)
s.cl.cfg.logger.Log(LogLevelError, "broker replied with partition in AddPartitionsToTxnResponse that was not in request", "topic", topic.Topic, "partition", partition.Partition)
continue
}

@@ -604,7 +604,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
topic := rTopic.Topic
partitions, ok := req.batches[topic]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
s.cl.cfg.logger.Log(LogLevelError, "broker erroneously replied with topic in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", topic)
delete(req.metrics, topic)
continue // should not hit this
}
@@ -618,7 +618,7 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
partition := rPartition.Partition
batch, ok := partitions[partition]
if !ok {
s.cl.cfg.logger.Log(LogLevelError, "Kafka erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
s.cl.cfg.logger.Log(LogLevelError, "broker erroneously replied with partition in produce request that we did not produce to", "broker", logID(s.nodeID), "topic", rTopic.Topic, "partition", partition)
delete(tmetrics, partition)
continue // should not hit this
}
@@ -655,8 +655,8 @@ func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response
}

if len(req.batches) > 0 {
s.cl.cfg.logger.Log(LogLevelError, "Kafka did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
s.handleRetryBatches(req.batches, 0, true, false, "kafka did not reply to all topics in produce request")
s.cl.cfg.logger.Log(LogLevelError, "broker did not reply to all topics / partitions in the produce request! reenqueuing missing partitions", "broker", logID(s.nodeID))
s.handleRetryBatches(req.batches, 0, true, false, "broker did not reply to all topics in produce request")
}
if len(reqRetry) > 0 {
s.handleRetryBatches(reqRetry, 0, true, true, "produce request had retry batches")
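
The handleReqResp changes above keep the sink's reconcile-and-requeue behavior: every topic/partition the broker answers is matched against (and removed from) the request's batch map, and anything left over is re-enqueued. A simplified, self-contained sketch of that bookkeeping; all names here are illustrative, not the real sink types:

package main

import "fmt"

// reconcile deletes every partition the response answered from pending;
// whatever remains was never acknowledged and should be retried.
func reconcile(pending map[string]map[int32]string, resp map[string][]int32) map[string]map[int32]string {
	for topic, parts := range resp {
		tp, ok := pending[topic]
		if !ok {
			continue // topic we did not produce to: log and skip
		}
		for _, p := range parts {
			delete(tp, p)
		}
		if len(tp) == 0 {
			delete(pending, topic)
		}
	}
	return pending // leftovers get re-enqueued for a retry
}

func main() {
	pending := map[string]map[int32]string{"events": {0: "batch0", 1: "batch1"}}
	left := reconcile(pending, map[string][]int32{"events": {0}})
	fmt.Println(left) // map[events:map[1:batch1]]: partition 1 is retried
}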
2 changes: 1 addition & 1 deletion pkg/kgo/txn.go
@@ -342,7 +342,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
} else {
defer func() {
if committed {
s.cl.cfg.logger.Log(LogLevelDebug, "sleeping 200ms before allowing a rebalance to continue to give Kafka a chance to write txn markers and avoid duplicates")
s.cl.cfg.logger.Log(LogLevelDebug, "sleeping 200ms before allowing a rebalance to continue to give the brokers a chance to write txn markers and avoid duplicates")
go func() {
time.Sleep(200 * time.Millisecond)
s.failMu.Unlock()
