Skip to content

Commit

Permalink
Merge pull request #746 from slaunay/enhancement/producer-metrics
Browse files Browse the repository at this point in the history
Expose producer metrics with go-metrics
  • Loading branch information
eapache authored Nov 22, 2016
2 parents 0c2a04e + 124e7c6 commit 3f392a5
Show file tree
Hide file tree
Showing 17 changed files with 200 additions and 36 deletions.
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
}

req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
buf, err := encode(req, b.conf.MetricRegistry)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(meta)
buf, err := encode(meta, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
Expand All @@ -56,7 +56,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
UserData: []byte{0x01, 0x02, 0x03},
}

buf, err := encode(amt)
buf, err := encode(amt, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
Expand Down
11 changes: 8 additions & 3 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package sarama

import "fmt"
import (
"fmt"

"github.com/rcrowley/go-metrics"
)

// Encoder is the interface that wraps the basic Encode method.
// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
type encoder interface {
encode(pe packetEncoder) error
}

// Encode takes an Encoder and turns it into bytes.
func encode(e encoder) ([]byte, error) {
// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
if e == nil {
return nil, nil
}
Expand All @@ -27,6 +31,7 @@ func encode(e encoder) ([]byte, error) {
}

realEnc.raw = make([]byte, prepEnc.length)
realEnc.registry = metricRegistry
err = e.encode(&realEnc)
if err != nil {
return nil, err
Expand Down
22 changes: 22 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func validateMetrics(t *testing.T, client Client) {

metricValidators := newMetricValidators()
noResponse := client.Config().Producer.RequiredAcks == NoResponse
compressionEnabled := client.Config().Producer.Compression != CompressionNone

// We read at least 1 byte from the broker
metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
Expand All @@ -203,6 +204,27 @@ func validateMetrics(t *testing.T, client Client) {
metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))

// We send at least 1 batch
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
if compressionEnabled {
// We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
} else {
// We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
}

// We send exactly TestBatchSize messages
metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
// We send at least one record per request
metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))

// We receive at least 1 byte from the broker
metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
if noResponse {
Expand Down
2 changes: 1 addition & 1 deletion join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
}

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
bin, err := encode(metadata)
bin, err := encode(metadata, nil)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Message struct {
Timestamp time.Time // the timestamp of the message (version 1+ only)

compressedCache []byte
compressedSize int // used for computing the compression ratio metrics
}

func (m *Message) encode(pe packetEncoder) error {
Expand Down Expand Up @@ -77,6 +78,8 @@ func (m *Message) encode(pe packetEncoder) error {
default:
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
}
// Keep in mind the compressed payload size for metric gathering
m.compressedSize = len(payload)
}

if err = pe.putBytes(payload); err != nil {
Expand Down Expand Up @@ -121,6 +124,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

// Required for deep equal assertion during tests but might be useful
// for future metrics about the compression ratio in fetch requests
m.compressedSize = len(m.Value)

switch m.Codec {
case CompressionNone:
// nothing to do
Expand Down
15 changes: 15 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"fmt"
"strings"

"github.com/rcrowley/go-metrics"
)
Expand Down Expand Up @@ -34,3 +35,17 @@ func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) m
func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
}

func getMetricNameForTopic(name string, topic string) string {
// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
// cf. KAFKA-1902 and KAFKA-2337
return fmt.Sprintf(name+"-for-topic-%s", strings.Replace(topic, ".", "_", -1))
}

func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter {
return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r)
}

func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r)
}
14 changes: 14 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (m *metricValidators) registerForBroker(broker *Broker, validator *metricVa
m.register(&metricValidator{getMetricNameForBroker(validator.name, broker), validator.validator})
}

func (m *metricValidators) registerForGlobalAndTopic(topic string, validator *metricValidator) {
m.register(&metricValidator{validator.name, validator.validator})
m.register(&metricValidator{getMetricNameForTopic(validator.name, topic), validator.validator})
}

func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metricValidator) {
m.register(validator)
m.registerForBroker(broker, validator)
Expand Down Expand Up @@ -156,3 +161,12 @@ func minValHistogramValidator(name string, minMin int) *metricValidator {
}
})
}

func maxValHistogramValidator(name string, maxMax int) *metricValidator {
return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
max := int(histogram.Max())
if max > maxMax {
t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max)
}
})
}
2 changes: 1 addition & 1 deletion mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
}
Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)

encodedRes, err := encode(res)
encodedRes, err := encode(res, nil)
if err != nil {
b.serverError(err)
break
Expand Down
8 changes: 8 additions & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "github.com/rcrowley/go-metrics"

// PacketEncoder is the interface providing helpers for writing with Kafka's encoding rules.
// Types implementing Encoder only need to worry about calling methods like PutString,
// not about how a string is represented in Kafka.
Expand All @@ -19,9 +21,15 @@ type packetEncoder interface {
putInt32Array(in []int32) error
putInt64Array(in []int64) error

// Provide the current offset to record the batch size metric
offset() int

// Stacks, see PushEncoder
push(in pushEncoder)
pop() error

// To record metrics when provided
metricRegistry() metrics.Registry
}

// PushEncoder is the interface for encoding fields like CRCs and lengths where the value
Expand Down
11 changes: 11 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sarama
import (
"fmt"
"math"

"github.com/rcrowley/go-metrics"
)

type prepEncoder struct {
Expand Down Expand Up @@ -99,6 +101,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error {
return nil
}

func (pe *prepEncoder) offset() int {
return pe.length
}

// stackable

func (pe *prepEncoder) push(in pushEncoder) {
Expand All @@ -108,3 +114,8 @@ func (pe *prepEncoder) push(in pushEncoder) {
func (pe *prepEncoder) pop() error {
return nil
}

// we do not record metrics during the prep encoder pass
func (pe *prepEncoder) metricRegistry() metrics.Registry {
return nil
}
50 changes: 50 additions & 0 deletions produce_request.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "github.com/rcrowley/go-metrics"

// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
// it must see before responding. Any of the constants defined here are valid. On broker versions
// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
Expand Down Expand Up @@ -30,6 +32,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if err != nil {
return err
}
metricRegistry := pe.metricRegistry()
var batchSizeMetric metrics.Histogram
var compressionRatioMetric metrics.Histogram
if metricRegistry != nil {
batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
}

totalRecordCount := int64(0)
for topic, partitions := range r.msgSets {
err = pe.putString(topic)
if err != nil {
Expand All @@ -39,7 +50,13 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if err != nil {
return err
}
topicRecordCount := int64(0)
var topicCompressionRatioMetric metrics.Histogram
if metricRegistry != nil {
topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
}
for id, msgSet := range partitions {
startOffset := pe.offset()
pe.putInt32(id)
pe.push(&lengthField{})
err = msgSet.encode(pe)
Expand All @@ -50,8 +67,41 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if err != nil {
return err
}
if metricRegistry != nil {
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better be safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histogram do not support decimal values, let's multiple it by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
}
}
if topicRecordCount > 0 {
getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
totalRecordCount += topicRecordCount
}
}
if totalRecordCount > 0 {
metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
// and sent as the payload of a single fake "message" with the appropriate codec
// set and no key. When the server sees a message with a compression codec, it
// decompresses the payload and treats the result as its message set.
payload, err := encode(set.setToSend)
payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
Expand All @@ -98,6 +98,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
Codec: ps.parent.conf.Producer.Compression,
Key: nil,
Value: payload,
Set: set.setToSend, // Provide the underlying message set for accurate metrics
}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
compMsg.Version = 1
Expand Down
22 changes: 18 additions & 4 deletions real_encoder.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package sarama

import "encoding/binary"
import (
"encoding/binary"

"github.com/rcrowley/go-metrics"
)

type realEncoder struct {
raw []byte
off int
stack []pushEncoder
raw []byte
off int
stack []pushEncoder
registry metrics.Registry
}

// primitives
Expand Down Expand Up @@ -98,6 +103,10 @@ func (re *realEncoder) putInt64Array(in []int64) error {
return nil
}

func (re *realEncoder) offset() int {
return re.off
}

// stacks

func (re *realEncoder) push(in pushEncoder) {
Expand All @@ -113,3 +122,8 @@ func (re *realEncoder) pop() error {

return in.run(re.off, re.raw)
}

// we do record metrics during the real encoder pass
func (re *realEncoder) metricRegistry() metrics.Registry {
return re.registry
}
Loading

0 comments on commit 3f392a5

Please sign in to comment.