fetching: export utilities for decompressing and parsing partition fetch responses #4

Merged
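This PR exports ProcessRespPartition and ProcessFetchPartitionOptions from pkg/kgo so that fetch responses obtained outside the client's normal consume path can be decompressed and parsed. Below is a minimal sketch of how the exported API might be called once merged; it is not code from this diff, and the raw-fetch plumbing, the kgo.ReadUncommitted() constructor, and the field values are assumptions for illustration.

```go
package fetchexample

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// parsePartition decodes one partition of a fetch response that was issued
// manually (e.g. with kmsg over a raw broker connection), using the
// utilities exported by this PR.
func parsePartition(topic string, partition int32, rp *kmsg.FetchResponseTopicPartition) {
	opts := kgo.ProcessFetchPartitionOptions{
		KeepControlRecords: false,                 // discard control records (the default behavior)
		Offset:             0,                     // parse every record returned in the response
		IsolationLevel:     kgo.ReadUncommitted(), // assumed kgo.IsolationLevel constructor
		Topic:              topic,
		Partition:          partition,
	}
	// Pass nil for observeMetrics if no per-batch metrics are needed.
	fp, nextOffset := kgo.ProcessRespPartition(opts, rp, nil)
	if fp.Err != nil {
		fmt.Println("partition error:", fp.Err)
	}
	for _, r := range fp.Records {
		fmt.Printf("offset %d: %s\n", r.Offset, r.Value)
	}
	// The returned offset is where consuming would continue; for compacted
	// partitions it may be larger than the last record's offset.
	fmt.Println("resume fetching at:", nextOffset)
}
```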
6 changes: 2 additions & 4 deletions pkg/kgo/client.go
@@ -78,8 +78,7 @@ type Client struct {
producer producer
consumer consumer

compressor *compressor
decompressor *decompressor
compressor *compressor

coordinatorsMu sync.Mutex
coordinators map[coordinatorKey]*coordinatorLoad
@@ -482,8 +481,7 @@ func NewClient(opts ...Opt) (*Client, error) {
bufPool: newBufPool(),
prsPool: newPrsPool(),

compressor: compressor,
decompressor: newDecompressor(),
compressor: compressor,

coordinators: make(map[coordinatorKey]*coordinatorLoad),

2 changes: 2 additions & 0 deletions pkg/kgo/compression.go
@@ -235,6 +235,8 @@ type decompressor struct {
unzstdPool sync.Pool
}

var defaultDecompressor = newDecompressor()

func newDecompressor() *decompressor {
d := &decompressor{
ungzPool: sync.Pool{
143 changes: 97 additions & 46 deletions pkg/kgo/source.go
@@ -11,9 +11,10 @@ import (
"sync"
"time"

"github.com/twmb/franz-go/pkg/kmsg"

"github.com/twmb/franz-go/pkg/kbin"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

type recordsPool struct{ p *sync.Pool }
@@ -115,6 +116,29 @@ func (s *source) removeCursor(rm *cursor) {
}
}

type ProcessFetchPartitionOptions struct {
// KeepControlRecords sets the parser to keep control messages and return
// them with fetches, overriding the default that discards them.
//
// Generally, control messages are not useful. This is the equivalent of the kgo.KeepControlRecords() option.
KeepControlRecords bool

// Offset is the minimum offset for which we'll parse records. Records with lower offsets will not be parsed or returned.
Offset int64

// IsolationLevel controls whether or not to return uncommitted records. See kgo.IsolationLevel.
IsolationLevel IsolationLevel

// Topic is used to populate the Topic field of each Record.
Topic string

// Partition is used to populate the Partition field of each Record.
Partition int32

// recordPool is for internal use only.
recordPool recordsPool
}

// cursor is where we are consuming from for an individual partition.
type cursor struct {
topic string
@@ -1091,7 +1115,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
continue
}

fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks, s.cl.cfg.recordsPool)
fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks, s.cl.cfg.recordsPool)
if fp.Err != nil {
if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
strip(topic, partition, fp.Err)
@@ -1268,20 +1292,52 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

// processRespPartition processes all records in all potentially compressed
// batches (or message sets).
func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks, recordsPool recordsPool) FetchPartition {
func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks, recordsPool recordsPool) (fp FetchPartition) {
if rp.ErrorCode == 0 {
o.hwm = rp.HighWatermark
}
opts := ProcessFetchPartitionOptions{
KeepControlRecords: br.cl.cfg.keepControl,
Offset: o.offset,
IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel},
Topic: o.from.topic,
Partition: o.from.partition,
recordPool: recordsPool,
}
observeMetrics := func(m FetchBatchMetrics) {
hooks.each(func(h Hook) {
if h, ok := h.(HookFetchBatchRead); ok {
h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
}
})
}
fp, o.offset = ProcessRespPartition(opts, rp, observeMetrics)
if len(fp.Records) > 0 {
lastRecord := fp.Records[len(fp.Records)-1]
// We adjust the offset separately because it may be larger than the offset of the last record for compacted partitions.
o.lastConsumedEpoch = lastRecord.LeaderEpoch
o.lastConsumedTime = lastRecord.Timestamp
}

return fp
}

// ProcessRespPartition processes all records in all potentially compressed batches (or message sets).
// It returns the parsed FetchPartition and the last offset of the records it processed; observeMetrics may be nil.
// This is useful when issuing manual Fetch requests for records.
// For a compacted partition, the last offset may be larger than the offset of the last record.
// If the partition response is truncated and the partition was compacted, then the last offset is the offset of the last record.
func ProcessRespPartition(o ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition, observeMetrics func(FetchBatchMetrics)) (FetchPartition, int64) {
fp := FetchPartition{
Partition: rp.Partition,
Err: kerr.ErrorForCode(rp.ErrorCode),
HighWatermark: rp.HighWatermark,
LastStableOffset: rp.LastStableOffset,
LogStartOffset: rp.LogStartOffset,
}
if rp.ErrorCode == 0 {
o.hwm = rp.HighWatermark
}

var aborter aborter
if br.cl.cfg.isolationLevel == 1 {
if o.IsolationLevel.level == 1 {
aborter = buildAborter(rp)
}

@@ -1372,10 +1428,10 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon

default:
fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length)
if next := offset + 1; next > o.offset {
o.offset = next
if next := offset + 1; next > o.Offset {
o.Offset = next
}
return fp
return fp, o.Offset
}

if !check() {
@@ -1390,30 +1446,27 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
case *kmsg.MessageV0:
m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length
m.CompressionType = uint8(t.Attributes) & 0b0000_0111
m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor)
m.NumRecords, m.UncompressedBytes = processV0OuterMessage(&o, &fp, t, defaultDecompressor)

case *kmsg.MessageV1:
m.CompressedBytes = int(length)
m.CompressionType = uint8(t.Attributes) & 0b0000_0111
m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor)
m.NumRecords, m.UncompressedBytes = processV1OuterMessage(&o, &fp, t, defaultDecompressor)

case *kmsg.RecordBatch:
m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
m.CompressionType = uint8(t.Attributes) & 0b0000_0111
m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor, recordsPool)
m.NumRecords, m.UncompressedBytes = processRecordBatch(&o, &fp, t, aborter, defaultDecompressor, o.recordPool)
}

if m.UncompressedBytes == 0 {
m.UncompressedBytes = m.CompressedBytes
}
hooks.each(func(h Hook) {
if h, ok := h.(HookFetchBatchRead); ok {
h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
}
})
if observeMetrics != nil {
observeMetrics(m)
}
}

return fp
return fp, o.Offset
}

type aborter map[int64][]int64
@@ -1476,7 +1529,8 @@ func readRawRecords(n int, in []byte) []kmsg.Record {
return rs
}

func (o *cursorOffsetNext) processRecordBatch(
func processRecordBatch(
o *ProcessFetchPartitionOptions,
fp *FetchPartition,
batch *kmsg.RecordBatch,
aborter aborter,
@@ -1488,7 +1542,7 @@ func (o *cursorOffsetNext) processRecordBatch(
return 0, 0
}
lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
if lastOffset < o.offset {
if lastOffset < o.Offset {
// If the last offset in this batch is less than what we asked
// for, we got a batch that we entirely do not need. We can
// avoid all work (although we should not get this batch).
@@ -1520,15 +1574,15 @@
// either advance offsets or will set to nextAskOffset.
nextAskOffset := lastOffset + 1
defer func() {
if numRecords == len(krecords) && o.offset < nextAskOffset {
o.offset = nextAskOffset
if numRecords == len(krecords) && o.Offset < nextAskOffset {
o.Offset = nextAskOffset
}
}()

abortBatch := aborter.shouldAbortBatch(batch)
for i := range krecords {
record := recordToRecord(
o.from.topic,
o.Topic,
fp.Partition,
batch,
&krecords[i],
@@ -1553,14 +1607,10 @@
// this easy, but if not, we decompress and process each inner message as
// either v0 or v1. We only expect the inner message to be v1, but technically
// a crazy pipeline could have v0 anywhere.
func (o *cursorOffsetNext) processV1OuterMessage(
fp *FetchPartition,
message *kmsg.MessageV1,
decompressor *decompressor,
) (int, int) {
func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, decompressor *decompressor) (int, int) {
compression := byte(message.Attributes & 0x0003)
if compression == 0 {
o.processV1Message(fp, message)
processV1Message(o, fp, message)
return 1, 0
}

@@ -1631,21 +1681,22 @@ out:
case *kmsg.MessageV0:
innerMessage.Offset = firstOffset + int64(i)
innerMessage.Attributes |= int8(compression)
if !o.processV0Message(fp, innerMessage) {
if !processV0Message(o, fp, innerMessage) {
return i, uncompressedBytes
}
case *kmsg.MessageV1:
innerMessage.Offset = firstOffset + int64(i)
innerMessage.Attributes |= int8(compression)
if !o.processV1Message(fp, innerMessage) {
if !processV1Message(o, fp, innerMessage) {
return i, uncompressedBytes
}
}
}
return len(innerMessages), uncompressedBytes
}

func (o *cursorOffsetNext) processV1Message(
func processV1Message(
o *ProcessFetchPartitionOptions,
fp *FetchPartition,
message *kmsg.MessageV1,
) bool {
@@ -1657,21 +1708,22 @@ func (o *cursorOffsetNext) processV1Message(
fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes)
return false
}
record := v1MessageToRecord(o.from.topic, fp.Partition, message)
record := v1MessageToRecord(o.Topic, fp.Partition, message)
o.maybeKeepRecord(fp, record, false)
return true
}

// Processes an outer v0 message. We expect inner messages to be entirely v0 as
// well, so this only tries v0 always.
func (o *cursorOffsetNext) processV0OuterMessage(
func processV0OuterMessage(
o *ProcessFetchPartitionOptions,
fp *FetchPartition,
message *kmsg.MessageV0,
decompressor *decompressor,
) (int, int) {
compression := byte(message.Attributes & 0x0003)
if compression == 0 {
o.processV0Message(fp, message)
processV0Message(o, fp, message)
return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return
}

@@ -1714,14 +1766,15 @@ func (o *cursorOffsetNext) processV0OuterMessage(
innerMessage := &innerMessages[i]
innerMessage.Attributes |= int8(compression)
innerMessage.Offset = firstOffset + int64(i)
if !o.processV0Message(fp, innerMessage) {
if !processV0Message(o, fp, innerMessage) {
return i, uncompressedBytes
}
}
return len(innerMessages), uncompressedBytes
}

func (o *cursorOffsetNext) processV0Message(
func processV0Message(
o *ProcessFetchPartitionOptions,
fp *FetchPartition,
message *kmsg.MessageV0,
) bool {
@@ -1733,7 +1786,7 @@ func (o *cursorOffsetNext) processV0Message(
fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes)
return false
}
record := v0MessageToRecord(o.from.topic, fp.Partition, message)
record := v0MessageToRecord(o.Topic, fp.Partition, message)
o.maybeKeepRecord(fp, record, false)
return true
}
@@ -1742,26 +1795,24 @@ func (o *cursorOffsetNext) processV0Message(
//
// If the record is being aborted or the record is a control record and the
// client does not want to keep control records, this does not keep the record.
func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) {
if record.Offset < o.offset {
func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) {
if record.Offset < o.Offset {
// We asked for offset 5, but that was in the middle of a
// batch; we got offsets 0 thru 4 that we need to skip.
return
}

// We only keep control records if specifically requested.
if record.Attrs.IsControl() {
abort = !o.from.keepControl
abort = !o.KeepControlRecords
}
if !abort {
fp.Records = append(fp.Records, record)
}

// The record offset may be much larger than our expected offset if the
// topic is compacted.
o.offset = record.Offset + 1
o.lastConsumedEpoch = record.LeaderEpoch
o.lastConsumedTime = record.Timestamp
o.Offset = record.Offset + 1
}

///////////////////////////////
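As a second sketch (same assumptions and imports as the example near the top, not code from this diff), the observeMetrics callback can stand in for the client's HookFetchBatchRead hook when parsing responses manually; the FetchBatchMetrics fields used below are the ones referenced in the diff above.

```go
// processWithMetrics parses one partition and tallies per-batch decompression
// stats through the observeMetrics callback accepted by ProcessRespPartition.
func processWithMetrics(opts kgo.ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition) kgo.FetchPartition {
	var batches, compressedBytes, uncompressedBytes int
	observe := func(m kgo.FetchBatchMetrics) {
		batches++
		compressedBytes += m.CompressedBytes
		uncompressedBytes += m.UncompressedBytes
	}
	fp, _ := kgo.ProcessRespPartition(opts, rp, observe)
	fmt.Printf("read %d batches: %d compressed bytes -> %d uncompressed bytes\n",
		batches, compressedBytes, uncompressedBytes)
	return fp
}
```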