diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 7335674ecd9..620c79d556b 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -29,6 +29,8 @@ https://github.com/elastic/beats/compare/v5.0.0-rc1...master[Check the HEAD diff
 
 *Affecting all Beats*
 
+- Fix kafka output re-trying batches with too large events. {issue}2735[2735]
+
 *Metricbeat*
 
 *Packetbeat*
diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go
index 2bccf445f80..b0dae8d1841 100644
--- a/libbeat/outputs/kafka/client.go
+++ b/libbeat/outputs/kafka/client.go
@@ -29,9 +29,10 @@ type client struct {
 }
 
 type msgRef struct {
-	count int32
-	batch []outputs.Data
-	cb    func([]outputs.Data, error)
+	count  int32
+	total  int
+	failed []outputs.Data
+	cb     func([]outputs.Data, error)
 
 	err error
 }
@@ -104,9 +105,10 @@ func (c *client) AsyncPublishEvents(
 	debugf("publish events")
 
 	ref := &msgRef{
-		count: int32(len(data)),
-		batch: data,
-		cb:    cb,
+		count:  int32(len(data)),
+		total:  len(data),
+		failed: nil,
+		cb:     cb,
 	}
 
 	ch := c.producer.Input()
@@ -136,7 +138,7 @@ func (c *client) getEventMessage(data *outputs.Data) (*message, error) {
 		return msg, nil
 	}
 
-	msg.event = event
+	msg.data = *data
 
 	topic, err := c.topic.Select(event)
 	if err != nil {
@@ -188,7 +190,7 @@ func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
 
 	for errMsg := range ch {
 		msg := errMsg.Msg.Metadata.(*message)
-		msg.ref.fail(errMsg.Err)
+		msg.ref.fail(msg, errMsg.Err)
 	}
 }
 
@@ -196,8 +198,20 @@ func (r *msgRef) done() {
 	r.dec()
 }
 
-func (r *msgRef) fail(err error) {
-	r.err = err
+func (r *msgRef) fail(msg *message, err error) {
+	switch err {
+	case sarama.ErrInvalidMessage:
+		logp.Err("Kafka (topic=%v): dropping invalid message", msg.topic)
+
+	case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
+		logp.Err("Kafka (topic=%v): dropping too large message of size %v.",
+			msg.topic,
+			len(msg.key)+len(msg.value))
+
+	default:
+		r.failed = append(r.failed, msg.data)
+		r.err = err
+	}
 	r.dec()
 }
 
@@ -211,11 +225,18 @@ func (r *msgRef) dec() {
 
 	err := r.err
 	if err != nil {
-		eventsNotAcked.Add(int64(len(r.batch)))
+		failed := len(r.failed)
+		success := r.total - failed
+
+		eventsNotAcked.Add(int64(failed))
+		if success > 0 {
+			ackedEvents.Add(int64(success))
+		}
+
 		debugf("Kafka publish failed with: %v", err)
-		r.cb(r.batch, err)
+		r.cb(r.failed, err)
 	} else {
-		ackedEvents.Add(int64(len(r.batch)))
+		ackedEvents.Add(int64(r.total))
 		r.cb(nil, nil)
 	}
 }
diff --git a/libbeat/outputs/kafka/message.go b/libbeat/outputs/kafka/message.go
index 20001d65115..6e05ac4ec60 100644
--- a/libbeat/outputs/kafka/message.go
+++ b/libbeat/outputs/kafka/message.go
@@ -4,7 +4,6 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
-	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/outputs"
 )
 
@@ -20,7 +19,7 @@ type message struct {
 	hash      uint32
 	partition int32
 
-	event common.MapStr
+	data outputs.Data
 }
 
 var kafkaMessageKey interface{} = int(0)
diff --git a/libbeat/outputs/kafka/partition.go b/libbeat/outputs/kafka/partition.go
index 70e3091efdd..4316186b2be 100644
--- a/libbeat/outputs/kafka/partition.go
+++ b/libbeat/outputs/kafka/partition.go
@@ -223,7 +223,7 @@ func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner {
 
 		var err error
 		for _, field := range fields {
-			err = hashFieldValue(hasher, msg.event, field)
+			err = hashFieldValue(hasher, msg.data.Event, field)
 			if err != nil {
 				break
 			}
diff --git a/libbeat/outputs/kafka/partition_test.go b/libbeat/outputs/kafka/partition_test.go
index 361adab9861..9e42bc62c86 100644
--- a/libbeat/outputs/kafka/partition_test.go
+++ b/libbeat/outputs/kafka/partition_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/Shopify/sarama"
 
 	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/outputs"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -219,7 +220,7 @@ func partTestSimple(N int, makeKey bool) partTestScenario {
 		}
 
 		msg := &message{partition: -1}
-		msg.event = event
+		msg.data = outputs.Data{event, nil}
 		msg.topic = "test"
 		if makeKey {
 			msg.key = randASCIIBytes(10)
@@ -272,7 +273,7 @@ func partTestHashInvariant(N int) partTestScenario {
 		}
 
 		msg := &message{partition: -1}
-		msg.event = event
+		msg.data = outputs.Data{event, nil}
 		msg.topic = "test"
 		msg.key = randASCIIBytes(10)
 		msg.value = jsonEvent