Skip to content

Commit

Permalink
*kgo.Records pooling support
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
  • Loading branch information
ortuman committed Sep 20, 2024
1 parent b77dd13 commit fc5a7a3
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
14 changes: 14 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type cfg struct {
partitions map[string]map[int32]Offset // partitions to directly consume from
regex bool

recordsPool recordsPool

////////////////////////////
// CONSUMER GROUP SECTION //
////////////////////////////
Expand Down Expand Up @@ -1347,6 +1349,18 @@ func ConsumeRegex() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.regex = true }}
}

// EnableRecordsPool sets the client to obtain the *kgo.Record objects from a pool,
// in order to minimize the number of allocations.
//
// By enabling this option, the records returned by PollFetches/PollRecords
// can be sent back to the pool via ReuseRecords method in order to be recycled.
//
// This option is particularly useful for use cases where the volume of generated records is very high,
// as it can negatively impact performance due to the extra GC overhead.
func EnableRecordsPool() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }}
}

// DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).
//
// A "fetch session" is is a way to reduce bandwidth for fetch requests &
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ type Record struct {
// producer hooks. It can also be set in a consumer hook to propagate
// enrichment to consumer clients.
Context context.Context

recordsPool recordsPool
}

// Release releases the record back to the pool.
// This should be called after the record is no longer needed.
func (r *Record) Release() {
r.recordsPool.put(r)
}

func (r *Record) userSize() int64 {
Expand Down
57 changes: 42 additions & 15 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,29 @@ import (
"github.com/twmb/franz-go/pkg/kmsg"
)

type recordsPool struct{ p *sync.Pool }

func newRecordsPool() recordsPool {
return recordsPool{
p: &sync.Pool{New: func() any { return &Record{} }},
}
}

func (p recordsPool) get() *Record {
if p.p == nil {
return &Record{}
}
return p.p.Get().(*Record)
}

func (p recordsPool) put(r *Record) {
if p.p == nil {
return
}
*r = Record{} // zero out the record
p.p.Put(r)
}

type readerFrom interface {
ReadFrom([]byte) error
}
Expand Down Expand Up @@ -1068,7 +1091,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
continue
}

fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks)
fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks, s.cl.cfg.recordsPool)
if fp.Err != nil {
if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
strip(topic, partition, fp.Err)
Expand Down Expand Up @@ -1245,7 +1268,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

// processRespPartition processes all records in all potentially compressed
// batches (or message sets).
func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition {
func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks, recordsPool recordsPool) FetchPartition {
fp := FetchPartition{
Partition: rp.Partition,
Err: kerr.ErrorForCode(rp.ErrorCode),
Expand Down Expand Up @@ -1377,7 +1400,7 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
case *kmsg.RecordBatch:
m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
m.CompressionType = uint8(t.Attributes) & 0b0000_0111
m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor)
m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor, recordsPool)
}

if m.UncompressedBytes == 0 {
Expand Down Expand Up @@ -1458,6 +1481,7 @@ func (o *cursorOffsetNext) processRecordBatch(
batch *kmsg.RecordBatch,
aborter aborter,
decompressor *decompressor,
recordsPool recordsPool,
) (int, int) {
if batch.Magic != 2 {
fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic)
Expand Down Expand Up @@ -1508,6 +1532,7 @@ func (o *cursorOffsetNext) processRecordBatch(
fp.Partition,
batch,
&krecords[i],
recordsPool,
)
o.maybeKeepRecord(fp, record, abortBatch)

Expand Down Expand Up @@ -1753,6 +1778,7 @@ func recordToRecord(
partition int32,
batch *kmsg.RecordBatch,
record *kmsg.Record,
recordsPool recordsPool,
) *Record {
h := make([]RecordHeader, 0, len(record.Headers))
for _, kv := range record.Headers {
Expand All @@ -1761,19 +1787,20 @@ func recordToRecord(
Value: kv.Value,
})
}
r := recordsPool.get()

r.Key = record.Key
r.Value = record.Value
r.Headers = h
r.Topic = topic
r.Partition = partition
r.Attrs = RecordAttrs{uint8(batch.Attributes)}
r.ProducerID = batch.ProducerID
r.ProducerEpoch = batch.ProducerEpoch
r.LeaderEpoch = batch.PartitionLeaderEpoch
r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
r.recordsPool = recordsPool

r := &Record{
Key: record.Key,
Value: record.Value,
Headers: h,
Topic: topic,
Partition: partition,
Attrs: RecordAttrs{uint8(batch.Attributes)},
ProducerID: batch.ProducerID,
ProducerEpoch: batch.ProducerEpoch,
LeaderEpoch: batch.PartitionLeaderEpoch,
Offset: batch.FirstOffset + int64(record.OffsetDelta),
}
if r.Attrs.TimestampType() == 0 {
r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
} else {
Expand Down

0 comments on commit fc5a7a3

Please sign in to comment.