client: fix request buffer pool, re-add pnr pool
Previously, we were not keeping the inflated buffer: we would put the
original buffer back into our pool. This fixes that, which drastically
reduces garbage for produce requests.
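
For illustration, a minimal Go sketch of the pattern being fixed (not the actual kgo code; bufPool, appendRequest, and the sizes are assumptions). The point is that appending a request may reallocate the buffer taken from the pool, so the slice returned by the append, not the original one, is what must be put back:

// Minimal sketch of the buffer pool fix; illustrative only, not kgo code.
package main

import (
	"fmt"
	"sync"
)

// bufPool hands out reusable request buffers (assumed name and sizes).
var bufPool = sync.Pool{New: func() interface{} { return make([]byte, 0, 128) }}

// appendRequest stands in for a request formatter: it appends the serialized
// request and may reallocate (inflate) the buffer it was given.
func appendRequest(dst, payload []byte) []byte { return append(dst, payload...) }

func main() {
	payload := make([]byte, 4096) // larger than the pooled capacity

	// Old pattern: the original small buffer goes back to the pool, so the
	// inflated buffer that append allocated becomes garbage on every request.
	orig := bufPool.Get().([]byte)
	inflated := appendRequest(orig[:0], payload)
	bufPool.Put(orig) // wrong slice header: `inflated` is dropped

	// Fixed pattern: keep the slice returned by the append and put that back,
	// so the inflated buffer is reused for the next produce request.
	buf := appendRequest(bufPool.Get().([]byte)[:0], payload)
	bufPool.Put(buf)

	fmt.Println(cap(inflated), cap(buf)) // both at least 4096
}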

Second, after the multiple-producer improvements, I'm more confident in
adding back a pnr pool similar to the one that originally existed. The
only difference now is that the pnr pool is scoped per client rather
than globally, so it is safer in the face of races. After the previous
reworks, I'm also more confident in the pool itself.
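
For illustration, a rough sketch of a per-client slice pool in the spirit of the pnr pool added below (record, clientPool, and the capacity of 10 are assumed names and values, not the kgo API). Scoping the pool to one client means a pooled slice can never migrate between clients, and zeroing entries before returning a slice keeps the pooled backing array from pinning record payloads:

// Illustrative per-client pool of record slices; not the kgo implementation.
package main

import (
	"fmt"
	"sync"
)

type record struct{ key, value []byte }

// clientPool would live on the client, one pool per client, rather than as a
// package-global pool shared by every client in the process.
type clientPool struct{ p *sync.Pool }

func newClientPool() clientPool {
	return clientPool{p: &sync.Pool{New: func() interface{} { r := make([]record, 0, 10); return &r }}}
}

func (c clientPool) get() []record  { return (*c.p.Get().(*[]record))[:0] }
func (c clientPool) put(s []record) { c.p.Put(&s) }

func main() {
	pool := newClientPool() // created alongside the client
	batch := pool.get()
	batch = append(batch, record{key: []byte("k"), value: []byte("v")})

	// Zero entries before putting the slice back (mirrors records[i] = noPNR)
	// so the pooled array does not keep the records' payloads alive.
	for i := range batch {
		batch[i] = record{}
	}
	pool.put(batch)

	fmt.Println(cap(pool.get()) > 0) // the zeroed slice is available for reuse
}
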
twmb committed May 30, 2021
1 parent a0d712e commit fec2a18
Showing 3 changed files with 29 additions and 8 deletions.
8 changes: 4 additions & 4 deletions pkg/kgo/broker.go
@@ -850,17 +850,17 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
 		}
 	}
 
-	buf := cxn.cl.bufPool.get()
-	defer cxn.cl.bufPool.put(buf)
-	buf = cxn.cl.reqFormatter.AppendRequest(
-		buf[:0],
+	buf := cxn.cl.reqFormatter.AppendRequest(
+		cxn.cl.bufPool.get()[:0],
 		req,
 		cxn.corrID,
 	)
 
 	_, wt := cxn.cl.connTimeoutFn(req)
 	bytesWritten, writeErr, writeWait, timeToWrite, readEnqueue = cxn.writeConn(ctx, buf, wt, enqueuedForWritingAt)
 
+	cxn.cl.bufPool.put(buf)
+
 	cxn.cl.cfg.hooks.each(func(h Hook) {
 		if h, ok := h.(HookBrokerWrite); ok {
 			h.OnWrite(cxn.b.meta, req.Key(), bytesWritten, writeWait, timeToWrite, writeErr)
2 changes: 2 additions & 0 deletions pkg/kgo/client.go
@@ -63,6 +63,7 @@ type Client struct {
 	connTimeoutFn func(kmsg.Request) (time.Duration, time.Duration)
 
 	bufPool bufPool // for brokers to share underlying reusable request buffers
+	pnrPool pnrPool // for sinks to reuse []promisedNumberedRecord
 
 	controllerIDMu sync.Mutex
 	controllerID   int32
@@ -162,6 +163,7 @@ func NewClient(opts ...Opt) (*Client, error) {
 		connTimeoutFn: connTimeoutBuilder(cfg.connTimeoutOverhead),
 
 		bufPool: newBufPool(),
+		pnrPool: newPnrPool(),
 
 		decompressor: newDecompressor(),
 
27 changes: 23 additions & 4 deletions pkg/kgo/sink.go
@@ -840,7 +840,9 @@ func (cl *Client) finishBatch(batch *recBatch, producerID int64, producerEpoch i
 		pnr.Attrs = RecordAttrs{uint8(attrs)}
 
 		cl.finishRecordPromise(pnr.promisedRec, err)
+		records[i] = noPNR
 	}
+	cl.pnrPool.put(records)
 }

// handleRetryBatches sets any first-buf-batch to failing and triggers a
@@ -1131,11 +1133,15 @@ func (recBuf *recBuf) failAllRecords(err error) {
 			// modifications to this batch because the recBuf is already
 			// locked.
 			batch.mu.Lock()
-			for _, pnr := range batch.records {
-				recBuf.cl.finishRecordPromise(pnr.promisedRec, err)
-			}
+			records := batch.records
 			batch.records = nil
 			batch.mu.Unlock()
+
+			for i, pnr := range records {
+				recBuf.cl.finishRecordPromise(pnr.promisedRec, err)
+				records[i] = noPNR
+			}
+			recBuf.cl.pnrPool.put(records)
 		}
 		recBuf.resetBatchDrainIdx()
 		recBuf.batches = nil
@@ -1175,6 +1181,8 @@ type promisedNumberedRecord struct {
 	promisedRec
 }
 
+var noPNR = promisedNumberedRecord{}
+
 // recBatch is the type used for buffering records before they are written.
 type recBatch struct {
 	owner *recBuf // who owns us
@@ -1261,13 +1269,24 @@ func (recBuf *recBuf) newRecordBatch() *recBatch {
 		4 // record array length
 	return &recBatch{
 		owner:      recBuf,
-		records:    make([]promisedNumberedRecord, 0, 10),
+		records:    recBuf.cl.pnrPool.get()[:0],
 		wireLength: recordBatchOverhead,
 
 		canFailFromLoadErrs: true, // until we send this batch, we can fail it
 	}
 }
 
+type pnrPool struct{ p *sync.Pool }
+
+func newPnrPool() pnrPool {
+	return pnrPool{
+		p: &sync.Pool{New: func() interface{} { r := make([]promisedNumberedRecord, 10); return &r }},
+	}
+}
+
+func (p pnrPool) get() []promisedNumberedRecord  { return (*p.p.Get().(*[]promisedNumberedRecord))[:0] }
+func (p pnrPool) put(s []promisedNumberedRecord) { p.p.Put(&s) }
+
 // isOwnersFirstBatch returns if the batch in a recBatch is the first batch in
 // a records. We only ever want to update batch / buffer logic if the batch is
 // the first in the buffer.
