Skip to content

Commit

Permalink
Clean up aggregator via helper methods
Browse files Browse the repository at this point in the history
Also get rid of forceFlushThreshold it does not need its own function
  • Loading branch information
eapache committed Aug 13, 2015
1 parent 85c7680 commit 765b3b4
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 46 deletions.
106 changes: 64 additions & 42 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"github.com/eapache/queue"
)

func forceFlushThreshold() int {
return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
}

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
Expand Down Expand Up @@ -491,6 +487,7 @@ func (pp *partitionProducer) updateLeader() error {
})
}

// one per broker, constructs both an aggregator and a flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
input := make(chan *ProducerMessage)
bridge := make(chan []*ProducerMessage)
Expand All @@ -514,28 +511,21 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
return input
}

// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
type aggregator struct {
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- []*ProducerMessage

buffer []*ProducerMessage
bufferBytes int
timer <-chan time.Time
}

func (a *aggregator) run() {
var (
timer <-chan time.Time
buffer []*ProducerMessage
flushTriggered chan<- []*ProducerMessage
bytesAccumulated int
defaultFlush bool
)

if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
defaultFlush = true
}
var output chan<- []*ProducerMessage

for {
select {
Expand All @@ -544,45 +534,77 @@ func (a *aggregator) run() {
goto shutdown
}

if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
(a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
(a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
if a.wouldOverflow(msg) {
Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
a.output <- buffer
timer = nil
buffer = nil
flushTriggered = nil
bytesAccumulated = 0
a.output <- a.buffer
a.reset()
output = nil
}

buffer = append(buffer, msg)
bytesAccumulated += msg.byteSize()
a.buffer = append(a.buffer, msg)
a.bufferBytes += msg.byteSize()

if defaultFlush ||
msg.flags&chaser == chaser ||
(a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
(a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
flushTriggered = a.output
} else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
timer = time.After(a.parent.conf.Producer.Flush.Frequency)
if a.readyToFlush(msg) {
output = a.output
} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
}
case <-timer:
flushTriggered = a.output
case flushTriggered <- buffer:
timer = nil
buffer = nil
flushTriggered = nil
bytesAccumulated = 0
case <-a.timer:
output = a.output
case output <- a.buffer:
a.reset()
output = nil
}
}

shutdown:
if len(buffer) > 0 {
a.output <- buffer
if len(a.buffer) > 0 {
a.output <- a.buffer
}
close(a.output)
}

func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
switch {
// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
return true
// Would we overflow the size-limit of a compressed message-batch?
case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
return true
// Would we overflow simply in number of messages?
case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
return true
default:
return false
}
}

func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
switch {
// If all three config values are 0, we always flush as-fast-as-possible
case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
return true
// If the messages is a chaser we must flush to maintain the state-machine
case msg.flags&chaser == chaser:
return true
// If we've passed the message trigger-point
case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
return true
// If we've passed the byte trigger-point
case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
return true
default:
return false
}
}

func (a *aggregator) reset() {
a.timer = nil
a.buffer = nil
a.bufferBytes = 0
}

// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
parent *asyncProducer
Expand Down
8 changes: 4 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ func (c *Config) Validate() error {
if c.Producer.RequiredAcks > 1 {
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
}
if c.Producer.MaxMessageBytes >= forceFlushThreshold() {
Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.")
if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
Logger.Println("Producer.MaxMessageBytes is larger than MaxRequestSize; it will be ignored.")
}
if c.Producer.Flush.Bytes >= forceFlushThreshold() {
Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.")
if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
Logger.Println("Producer.Flush.Bytes is larger than MaxRequestSize; it will be ignored.")
}
if c.Producer.Timeout%time.Millisecond != 0 {
Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
Expand Down

0 comments on commit 765b3b4

Please sign in to comment.