kgo: ignore OOOSN where possible
See embedded comment. This precedes handling KIP-890.

Closes #805.
twmb committed Oct 10, 2024
1 parent f689efd commit 3548d1f
Showing 2 changed files with 34 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/kgo/metadata.go
@@ -504,6 +504,7 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti
failing: mp.loadErr != 0,
sink: mp.sns.sink,
topicPartitionData: td,
lastAckedOffset: -1,
}
} else {
p.cursor = &cursor{
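The -1 is a sentinel meaning no produce has been acknowledged yet on this partition; the truncation check added to sink.go below only applies once lastAckedOffset is non-negative. A minimal sketch of that invariant, using an illustrative stand-in type rather than the real recBuf:

// Sketch only; the type and constructor names are illustrative, not part of kgo.
type partitionAcks struct {
	lastAckedOffset int64 // BaseOffset + record count of the last acked batch; -1 before any ack
}

func newPartitionAcks() *partitionAcks {
	// Start below any valid offset so log-start-offset comparisons are
	// skipped until something has actually been acknowledged.
	return &partitionAcks{lastAckedOffset: -1}
}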
33 changes: 33 additions & 0 deletions pkg/kgo/sink.go
@@ -854,6 +854,20 @@ func (s *sink) handleReqRespBatch(
// handling, but KIP-360 demonstrated that resetting sequence
// numbers is fundamentally unsafe, so we treat it like OOOSN.
//
// KAFKA-5793 specifically mentions for OOOSN "when you get it,
// it should always mean data loss". Sometime after KIP-360,
// Kafka changed the client to remove all places
// UnknownProducerID was returned, and then started referring
// to OOOSN as retryable. KIP-890 definitively says OOOSN is
// retryable. However, the Kafka source as of 24-10-10 still
// only retries OOOSN for batches that are NOT the expected
// next batch (i.e., it's next + 1, for when there are multiple
// in flight). With KIP-890, we still just disregard whatever
// supposedly non-retryable / actually-is-retryable error is
// returned if the LogStartOffset is _after_ what we previously
// produced. Specifically, this is step (4) in the wiki link
// within KAFKA-5793.
//
// InvalidMapping is similar to UnknownProducerID, but occurs
// when the txnal coordinator timed out our transaction.
//
@@ -881,6 +895,22 @@ func (s *sink) handleReqRespBatch(
// txn coordinator requests, which have PRODUCER_FENCED vs
// TRANSACTION_TIMED_OUT.

if batch.owner.lastAckedOffset >= 0 && rp.LogStartOffset > batch.owner.lastAckedOffset {
s.cl.cfg.logger.Log(LogLevelInfo, "partition prefix truncation to after our last produce caused the broker to forget us; no loss occurred, bumping producer epoch and resetting sequence numbers",
"broker", logID(s.nodeID),
"topic", topic,
"partition", rp.Partition,
"producer_id", producerID,
"producer_epoch", producerEpoch,
"err", err,
)
s.cl.failProducerID(producerID, producerEpoch, errReloadProducerID)
if debug {
fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", rp.BaseOffset, nrec, err)
}
return true, false
}

if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
"broker", logID(s.nodeID),
Expand Down Expand Up @@ -951,6 +981,7 @@ func (s *sink) handleReqRespBatch(
)
} else {
batch.owner.okOnSink = true
batch.owner.lastAckedOffset = rp.BaseOffset + int64(len(batch.records))
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err)
didProduce = err == nil
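A concrete illustration of the bookkeeping, with assumed numbers rather than values from the commit: a 5-record batch acked at BaseOffset 100 sets lastAckedOffset to 105, so a later response reporting a LogStartOffset greater than 105 means everything we produced has already been deleted, and forgetting the producer state is harmless.

// Hypothetical values; mirrors the arithmetic above, not actual kgo state.
baseOffset, numRecords := int64(100), int64(5)
lastAcked := baseOffset + numRecords // 105: one past the last record the broker acked
logStartOffset := int64(106)
benign := lastAcked >= 0 && logStartOffset > lastAcked // true: prefix truncated past our data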
Expand Down Expand Up @@ -1222,6 +1253,8 @@ type recBuf struct {
// to drain.
inflight uint8

lastAckedOffset int64 // last ProduceResponse's BaseOffset + how many records we produced

topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated)

// seq is used for the seq in each record batch. It is incremented when
