diff --git a/broker.go b/broker.go index 3feb6a7217..82d866e5ef 100644 --- a/broker.go +++ b/broker.go @@ -37,6 +37,7 @@ type Broker struct { incomingByteRate metrics.Meter requestRate metrics.Meter + fetchRate metrics.Meter requestSize metrics.Histogram requestLatency metrics.Histogram outgoingByteRate metrics.Meter @@ -45,6 +46,7 @@ type Broker struct { requestsInFlight metrics.Counter brokerIncomingByteRate metrics.Meter brokerRequestRate metrics.Meter + brokerFetchRate metrics.Meter brokerRequestSize metrics.Histogram brokerRequestLatency metrics.Histogram brokerOutgoingByteRate metrics.Meter @@ -208,6 +210,7 @@ func (b *Broker) Open(conf *Config) error { // Create or reuse the global metrics shared between brokers b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry) b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry) + b.fetchRate = metrics.GetOrRegisterMeter("fetch-rate", conf.MetricRegistry) b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry) b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry) b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) @@ -472,6 +475,15 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { // Fetch returns a FetchResponse or error func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { + defer func() { + if b.fetchRate != nil { + b.fetchRate.Mark(1) + } + if b.brokerFetchRate != nil { + b.brokerFetchRate.Mark(1) + } + }() + response := new(FetchResponse) err := b.sendAndReceive(request, response) @@ -1600,6 +1612,7 @@ func (b *Broker) updateThrottleMetric(throttleTime time.Duration) { func (b *Broker) registerMetrics() { b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate") b.brokerRequestRate = b.registerMeter("request-rate") + b.brokerFetchRate = b.registerMeter("fetch-rate") b.brokerRequestSize = b.registerHistogram("request-size") b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms") b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate") diff --git a/broker_test.go b/broker_test.go index 9310d8b904..52a4e4baed 100644 --- a/broker_test.go +++ b/broker_test.go @@ -123,7 +123,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { pendingNotify <- brokerMetrics{bytesRead, bytesWritten} }) broker := NewBroker(mb.Addr()) - // Set the broker id in order to validate local broker metrics + // Set the broker id in order to validate local broujhjker metrics broker.id = 0 conf := NewTestConfig() conf.ApiVersionsRequest = false @@ -132,6 +132,9 @@ func TestSimpleBrokerCommunication(t *testing.T) { if err != nil { t.Fatal(err) } + if _, err := broker.Connected(); err != nil { + t.Error(err) + } tt.runner(t, broker) // Wait up to 500 ms for the remote broker to process the request and // notify us about the metrics diff --git a/fetch_request.go b/fetch_request.go index 4da5a1d2dc..c194ea47f9 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -95,6 +95,8 @@ const ( ) func (r *FetchRequest) encode(pe packetEncoder) (err error) { + metricRegistry := pe.metricRegistry() + pe.putInt32(-1) // ReplicaID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) @@ -128,6 +130,7 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { return err } } + getOrRegisterTopicMeter("fetch-rate", topic, metricRegistry).Mark(1) } if r.Version >= 7 { err = pe.putArrayLength(len(r.forgotten)) diff --git a/functional_producer_test.go b/functional_producer_test.go index eec20ff363..d4ca0c29ac 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -307,7 +307,7 @@ func testProducingMessages(t *testing.T, config *Config) { safeClose(t, producer) // Validate producer metrics before using the consumer minus the offset request - validateMetrics(t, client) + validateProducerMetrics(t, client) master, err := NewConsumerFromClient(client) if err != nil { @@ -332,6 +332,9 @@ func testProducingMessages(t *testing.T, config *Config) { } } } + + validateConsumerMetrics(t, client) + safeClose(t, consumer) safeClose(t, client) } @@ -397,7 +400,7 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) { closeProducer(t, producer) } -func validateMetrics(t *testing.T, client Client) { +func validateProducerMetrics(t *testing.T, client Client) { // Get the broker used by test1 topic var broker *Broker if partitions, err := client.Partitions("test.1"); err != nil { @@ -486,6 +489,36 @@ func validateMetrics(t *testing.T, client Client) { metricValidators.run(t, client.Config().MetricRegistry) } +func validateConsumerMetrics(t *testing.T, client Client) { + // Get the broker used by test1 topic + var broker *Broker + if partitions, err := client.Partitions("test.1"); err != nil { + t.Error(err) + } else { + for _, partition := range partitions { + if b, err := client.Leader("test.1", partition); err != nil { + t.Error(err) + } else { + if broker != nil && b != broker { + t.Fatal("Expected only one broker, got at least 2") + } + broker = b + } + } + } + + metricValidators := newMetricValidators() + + // at least 1 global fetch request for the given topic + metricValidators.registerForGlobalAndTopic("test_1", minCountMeterValidator("fetch-rate", 1)) + + // and at least 1 fetch request to the lead broker + metricValidators.registerForBroker(broker, minCountMeterValidator("fetch-rate", 1)) + + // Run the validators + metricValidators.run(t, client.Config().MetricRegistry) +} + // Benchmarks func BenchmarkProducerSmall(b *testing.B) { diff --git a/metrics_test.go b/metrics_test.go index 7572f5b909..ee7aea3746 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -68,6 +68,7 @@ func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metr } func (m metricValidators) run(t *testing.T, r metrics.Registry) { + t.Helper() for _, metricValidator := range m { metric := r.Get(metricValidator.name) if metric == nil { @@ -82,6 +83,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) return &metricValidator{ name: name, validator: func(t *testing.T, metric interface{}) { + t.Helper() if meter, ok := metric.(metrics.Meter); !ok { t.Errorf("Expected meter metric for '%s', got %T", name, metric) } else { @@ -93,6 +95,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter)) func countMeterValidator(name string, expectedCount int) *metricValidator { return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + t.Helper() count := meter.Count() if count != int64(expectedCount) { t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count) diff --git a/sarama.go b/sarama.go index 963515a73c..70fa96bcac 100644 --- a/sarama.go +++ b/sarama.go @@ -72,6 +72,9 @@ Consumer related metrics: | consumer-group-join-failed- | counter | Total count of consumer group join failures | | consumer-group-sync-total- | counter | Total count of consumer group sync attempts | | consumer-group-sync-failed- | counter | Total count of consumer group sync failures | + | fetch-rate | meter | Fetch requests/second sent to all brokers | + | fetch-rate-for-broker- | meter | Fetch requests/second sent to a given broker | + | fetch-rate-for-topic- | meter | Fetch requests/second sent for a given topic | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ */