From bce9b89ba7ee5b39a0399331baf8c294105dab09 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 8 Aug 2024 17:28:14 +0200 Subject: [PATCH 1/2] fetching: export utilities for decompressing and parsing partition fetch responses ### Background In grafana/mimir we are working towards making fetch requests ourselves. The primary reason behind that is that individual requests to the kafka backend are slow, so doing them sequentially per partition becomes the bottleneck in our application. So we want to fetch records in parallel to speed up the consumption. One difficulty I met when issuing `FetchRequest`s ourselves is that parsing the response is non-trivial. That's why I'm proposing to export these functions for downstream projects to use. Alternatively, I can also try contributing the concurrent fetching logic. But I believe that is much more nuanced and with more tradeoffs around fetched bytes and latency. So I wasn't sure whether it's a good fit for a general purpose library. I'm open to discussing this further. ### What this PR does Moves `(*kgo.cursorOffsetNext).processRespPartition` from being a method to being a standalone function - `kgo.processRespPartition`. There were also little changes necessary to make the interface suitable for public use (like removing the `*broker` parameter). ### Side effects To minimize the necessary changes and the API surface of the package I opted to use a single global decompressor for all messages. Previously, there would be one decompressor per client and that decompressor would be passed down to `(*cursorOffsetNext).processRespPartition`. My understanding is that using different pooled readers (lz4, zst, gzip) shouldn't have a negative impact on performance because usage patterns do not affect the behaviour of the reader (for example, a consistent size of decompressed data doesn't make the reader more or less efficient). I have not thoroughly verified or tested this - Let me know if you think that's important. 
An alternative to this is to also export the `decompressor` along with `newDecompressor()` and the auxiliary types for decompression. --- pkg/kgo/client.go | 6 +- pkg/kgo/compression.go | 2 + pkg/kgo/source.go | 139 ++++++++++++++++++++++++++--------------- 3 files changed, 94 insertions(+), 53 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 775a22e6..197c3933 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -78,8 +78,7 @@ type Client struct { producer producer consumer consumer - compressor *compressor - decompressor *decompressor + compressor *compressor coordinatorsMu sync.Mutex coordinators map[coordinatorKey]*coordinatorLoad @@ -482,8 +481,7 @@ func NewClient(opts ...Opt) (*Client, error) { bufPool: newBufPool(), prsPool: newPrsPool(), - compressor: compressor, - decompressor: newDecompressor(), + compressor: compressor, coordinators: make(map[coordinatorKey]*coordinatorLoad), diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index fe8ad645..81d9d8a7 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -235,6 +235,8 @@ type decompressor struct { unzstdPool sync.Pool } +var defaultDecompressor = newDecompressor() + func newDecompressor() *decompressor { d := &decompressor{ ungzPool: sync.Pool{ diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0c475d14..cdef8a3a 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -92,6 +92,26 @@ func (s *source) removeCursor(rm *cursor) { } } +type ProcessFetchPartitionOptions struct { + // KeepControlRecords sets the parser to keep control messages and return + // them with fetches, overriding the default that discards them. + // + // Generally, control messages are not useful. It is the same as kgo.KeepControlRecords(). + KeepControlRecords bool + + // Offset is the minimum offset for which we'll parse records. Records with lower offsets will not be parsed or returned. + Offset int64 + + // IsolationLevel controls whether or not to return uncommitted records. 
See kgo.IsolationLevel. + IsolationLevel IsolationLevel + + // Topic is used to populate the Topic field of each Record. + Topic string + + // Partition is used to populate the Partition field of each Record. + Partition int32 +} + // cursor is where we are consuming from for an individual partition. type cursor struct { topic string @@ -1068,7 +1088,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks) + fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks) if fp.Err != nil { if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving { strip(topic, partition, fp.Err) @@ -1245,7 +1265,41 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // processRespPartition processes all records in all potentially compressed // batches (or message sets). -func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition { +func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) { + if rp.ErrorCode == 0 { + o.hwm = rp.HighWatermark + } + opts := ProcessFetchPartitionOptions{ + KeepControlRecords: br.cl.cfg.keepControl, + Offset: o.offset, + IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, + Topic: o.from.topic, + Partition: o.from.partition, + } + observeMetrics := func(m FetchBatchMetrics) { + hooks.each(func(h Hook) { + if h, ok := h.(HookFetchBatchRead); ok { + h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) + } + }) + } + fp, o.offset = ProcessRespPartition(opts, rp, observeMetrics) + if len(fp.Records) > 0 { + lastRecord := fp.Records[len(fp.Records)-1] + // We adjust the offset separately because it may be larger than the offset of the last record for compacted partitions. 
+ o.lastConsumedEpoch = lastRecord.LeaderEpoch + o.lastConsumedTime = lastRecord.Timestamp + } + + return fp +} + +// ProcessRespPartition processes all records in all potentially compressed batches (or message sets). +// ProcessRespPartition returns the FetchPartition and the last offset of records processed. observeMetrics can be nil. +// This is useful when issuing manual Fetch requests for records. +// In case of a compacted partition, the last offset may be larger than the offset of the last record. +// If the partition response is truncated and the partition was compacted, then the last offset is the offset of the last record. +func ProcessRespPartition(o ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition, observeMetrics func(FetchBatchMetrics)) (FetchPartition, int64) { fp := FetchPartition{ Partition: rp.Partition, Err: kerr.ErrorForCode(rp.ErrorCode), @@ -1253,12 +1307,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon LastStableOffset: rp.LastStableOffset, LogStartOffset: rp.LogStartOffset, } - if rp.ErrorCode == 0 { - o.hwm = rp.HighWatermark - } var aborter aborter - if br.cl.cfg.isolationLevel == 1 { + if o.IsolationLevel.level == 1 { aborter = buildAborter(rp) } @@ -1349,10 +1400,10 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon default: fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length) - if next := offset + 1; next > o.offset { - o.offset = next + if next := offset + 1; next > o.Offset { + o.Offset = next } - return fp + return fp, o.Offset } if !check() { @@ -1367,30 +1418,27 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon case *kmsg.MessageV0: m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = 
o.processV0OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV0OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.MessageV1: m.CompressedBytes = int(length) m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV1OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.RecordBatch: m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor) + m.NumRecords, m.UncompressedBytes = processRecordBatch(&o, &fp, t, aborter, defaultDecompressor) } if m.UncompressedBytes == 0 { m.UncompressedBytes = m.CompressedBytes } - hooks.each(func(h Hook) { - if h, ok := h.(HookFetchBatchRead); ok { - h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) - } - }) + if observeMetrics != nil { + observeMetrics(m) + } } - - return fp + return fp, o.Offset } type aborter map[int64][]int64 @@ -1453,7 +1501,8 @@ func readRawRecords(n int, in []byte) []kmsg.Record { return rs } -func (o *cursorOffsetNext) processRecordBatch( +func processRecordBatch( + o *ProcessFetchPartitionOptions, fp *FetchPartition, batch *kmsg.RecordBatch, aborter aborter, @@ -1464,7 +1513,7 @@ func (o *cursorOffsetNext) processRecordBatch( return 0, 0 } lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta) - if lastOffset < o.offset { + if lastOffset < o.Offset { // If the last offset in this batch is less than what we asked // for, we got a batch that we entirely do not need. We can // avoid all work (although we should not get this batch). @@ -1496,15 +1545,15 @@ func (o *cursorOffsetNext) processRecordBatch( // either advance offsets or will set to nextAskOffset. 
nextAskOffset := lastOffset + 1 defer func() { - if numRecords == len(krecords) && o.offset < nextAskOffset { - o.offset = nextAskOffset + if numRecords == len(krecords) && o.Offset < nextAskOffset { + o.Offset = nextAskOffset } }() abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( - o.from.topic, + o.Topic, fp.Partition, batch, &krecords[i], @@ -1528,14 +1577,10 @@ func (o *cursorOffsetNext) processRecordBatch( // this easy, but if not, we decompress and process each inner message as // either v0 or v1. We only expect the inner message to be v1, but technically // a crazy pipeline could have v0 anywhere. -func (o *cursorOffsetNext) processV1OuterMessage( - fp *FetchPartition, - message *kmsg.MessageV1, - decompressor *decompressor, -) (int, int) { +func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, decompressor *decompressor) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV1Message(fp, message) + processV1Message(o, fp, message) return 1, 0 } @@ -1606,13 +1651,13 @@ out: case *kmsg.MessageV0: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } case *kmsg.MessageV1: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV1Message(fp, innerMessage) { + if !processV1Message(o, fp, innerMessage) { return i, uncompressedBytes } } @@ -1620,7 +1665,8 @@ out: return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV1Message( +func processV1Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, ) bool { @@ -1632,21 +1678,17 @@ func (o *cursorOffsetNext) processV1Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - 
record := v1MessageToRecord(o.from.topic, fp.Partition, message) + record := v1MessageToRecord(o.Topic, fp.Partition, message) o.maybeKeepRecord(fp, record, false) return true } // Processes an outer v0 message. We expect inner messages to be entirely v0 as // well, so this only tries v0 always. -func (o *cursorOffsetNext) processV0OuterMessage( - fp *FetchPartition, - message *kmsg.MessageV0, - decompressor *decompressor, -) (int, int) { +func processV0OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, decompressor *decompressor) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV0Message(fp, message) + processV0Message(o, fp, message) return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return } @@ -1689,14 +1731,15 @@ func (o *cursorOffsetNext) processV0OuterMessage( innerMessage := &innerMessages[i] innerMessage.Attributes |= int8(compression) innerMessage.Offset = firstOffset + int64(i) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } } return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV0Message( +func processV0Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, ) bool { @@ -1708,7 +1751,7 @@ func (o *cursorOffsetNext) processV0Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v0MessageToRecord(o.from.topic, fp.Partition, message) + record := v0MessageToRecord(o.Topic, fp.Partition, message) o.maybeKeepRecord(fp, record, false) return true } @@ -1717,8 +1760,8 @@ func (o *cursorOffsetNext) processV0Message( // // If the record is being aborted or the record is a control record and the // client does not want to keep control records, this does not keep the record. 
-func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { - if record.Offset < o.offset { +func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { + if record.Offset < o.Offset { // We asked for offset 5, but that was in the middle of a // batch; we got offsets 0 thru 4 that we need to skip. return @@ -1726,7 +1769,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // We only keep control records if specifically requested. if record.Attrs.IsControl() { - abort = !o.from.keepControl + abort = !o.KeepControlRecords } if !abort { fp.Records = append(fp.Records, record) @@ -1734,9 +1777,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // The record offset may be much larger than our expected offset if the // topic is compacted. - o.offset = record.Offset + 1 - o.lastConsumedEpoch = record.LeaderEpoch - o.lastConsumedTime = record.Timestamp + o.Offset = record.Offset + 1 } /////////////////////////////// From ceadc28d3bd974eb03f53c89b5ae4f961161090b Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 8 Aug 2024 18:01:55 +0200 Subject: [PATCH 2/2] Restore multiline processV0OuterMessage --- pkg/kgo/source.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index cdef8a3a..85586a0a 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1685,7 +1685,12 @@ func processV1Message( // Processes an outer v0 message. We expect inner messages to be entirely v0 as // well, so this only tries v0 always. 
-func processV0OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, decompressor *decompressor) (int, int) { +func processV0OuterMessage( + o *ProcessFetchPartitionOptions, + fp *FetchPartition, + message *kmsg.MessageV0, + decompressor *decompressor, +) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { processV0Message(o, fp, message)