Skip to content

Commit

Permalink
kgo.Client: add UpdateFetchMaxBytes
Browse files Browse the repository at this point in the history
One final knob to control how much a user fetches and thus keeps in
memory.

Closes #375.
  • Loading branch information
twmb committed Mar 13, 2023
1 parent b0fa1a0 commit 3a7f35e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
12 changes: 12 additions & 0 deletions pkg/kgo/atomic_maybe_work.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kgo

import "sync/atomic"

const (
stateUnstarted = iota
stateWorking
Expand Down Expand Up @@ -62,3 +64,13 @@ func (l *workLoop) maybeFinish(again bool) bool {
func (l *workLoop) hardFinish() {
l.state.Store(stateUnstarted)
}

// lazyI32 is used in a few places where we want atomics _sometimes_. Some
// uses do not need to be atomic (notably, setup), and we do not want the
// noCopy guard.
//
// Specifically, this is used for a few int32 settings in the config.
type lazyI32 int32

func (v *lazyI32) store(s int32) { atomic.StoreInt32((*int32)(v), s) }
func (v *lazyI32) load() int32 { return atomic.LoadInt32((*int32)(v)) }
8 changes: 4 additions & 4 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ type cfg struct {

maxWait int32
minBytes int32
maxBytes int32
maxPartBytes int32
maxBytes lazyI32
maxPartBytes lazyI32
resetOffset Offset
isolationLevel int8
keepControl bool
Expand Down Expand Up @@ -1132,7 +1132,7 @@ func FetchMaxWait(wait time.Duration) ConsumerOpt {
// recommended to set this option so that decompression does not eat all of
// your RAM.
func FetchMaxBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxBytes = b }}
return consumerOpt{func(cfg *cfg) { cfg.maxBytes = lazyI32(b) }}
}

// FetchMinBytes sets the minimum amount of bytes a broker will try to send
Expand All @@ -1154,7 +1154,7 @@ func FetchMinBytes(b int32) ConsumerOpt {
//
// This corresponds to the Java max.partition.fetch.bytes setting.
func FetchMaxPartitionBytes(b int32) ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }}
return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = lazyI32(b) }}
}

// MaxConcurrentFetches sets the maximum number of fetch requests to allow in
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,14 @@ func (cl *Client) AllowRebalance() {
cl.consumer.allowRebalance()
}

// UpdateFetchMaxBytes updates the max bytes that a fetch request will ask for
// and the max partition bytes that a fetch request will ask for each
// partition.
func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
cl.cfg.maxBytes.store(maxBytes)
cl.cfg.maxPartBytes.store(maxPartBytes)
}

// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ func (s *source) createReq() *fetchRequest {
req := &fetchRequest{
maxWait: s.cl.cfg.maxWait,
minBytes: s.cl.cfg.minBytes,
maxBytes: s.cl.cfg.maxBytes,
maxPartBytes: s.cl.cfg.maxPartBytes,
maxBytes: s.cl.cfg.maxBytes.load(),
maxPartBytes: s.cl.cfg.maxPartBytes.load(),
rack: s.cl.cfg.rack,
isolationLevel: s.cl.cfg.isolationLevel,
preferLagFn: s.cl.cfg.preferLagFn,
Expand Down

0 comments on commit 3a7f35e

Please sign in to comment.