Skip to content

Commit

Permalink
feat(metrics): track consumer fetch request rates
Browse files Browse the repository at this point in the history
In additional to the global request rates, split out the fetch request
rates for consumers.

- fetch-rate
- fetch-rate-for-broker-X
- fetch-rate-for-topic-Y
  • Loading branch information
dnwe committed Aug 2, 2022
1 parent 9a76925 commit 359272f
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 3 deletions.
13 changes: 13 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Broker struct {

incomingByteRate metrics.Meter
requestRate metrics.Meter
fetchRate metrics.Meter
requestSize metrics.Histogram
requestLatency metrics.Histogram
outgoingByteRate metrics.Meter
Expand All @@ -45,6 +46,7 @@ type Broker struct {
requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerFetchRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerRequestLatency metrics.Histogram
brokerOutgoingByteRate metrics.Meter
Expand Down Expand Up @@ -208,6 +210,7 @@ func (b *Broker) Open(conf *Config) error {
// Create or reuse the global metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.fetchRate = metrics.GetOrRegisterMeter("fetch-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
Expand Down Expand Up @@ -472,6 +475,15 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {

// Fetch returns a FetchResponse or error
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
defer func() {
if b.fetchRate != nil {
b.fetchRate.Mark(1)
}
if b.brokerFetchRate != nil {
b.brokerFetchRate.Mark(1)
}
}()

response := new(FetchResponse)

err := b.sendAndReceive(request, response)
Expand Down Expand Up @@ -1600,6 +1612,7 @@ func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
func (b *Broker) registerMetrics() {
b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
b.brokerRequestRate = b.registerMeter("request-rate")
b.brokerFetchRate = b.registerMeter("fetch-rate")
b.brokerRequestSize = b.registerHistogram("request-size")
b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
Expand Down
5 changes: 4 additions & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
})
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
// Set the broker id in order to validate local broujhjker metrics
broker.id = 0
conf := NewTestConfig()
conf.ApiVersionsRequest = false
Expand All @@ -132,6 +132,9 @@ func TestSimpleBrokerCommunication(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err := broker.Connected(); err != nil {
t.Error(err)
}
tt.runner(t, broker)
// Wait up to 500 ms for the remote broker to process the request and
// notify us about the metrics
Expand Down
3 changes: 3 additions & 0 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ const (
)

func (r *FetchRequest) encode(pe packetEncoder) (err error) {
metricRegistry := pe.metricRegistry()

pe.putInt32(-1) // ReplicaID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
Expand Down Expand Up @@ -128,6 +130,7 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
return err
}
}
getOrRegisterTopicMeter("fetch-rate", topic, metricRegistry).Mark(1)
}
if r.Version >= 7 {
err = pe.putArrayLength(len(r.forgotten))
Expand Down
37 changes: 35 additions & 2 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, producer)

// Validate producer metrics before using the consumer minus the offset request
validateMetrics(t, client)
validateProducerMetrics(t, client)

master, err := NewConsumerFromClient(client)
if err != nil {
Expand All @@ -332,6 +332,9 @@ func testProducingMessages(t *testing.T, config *Config) {
}
}
}

validateConsumerMetrics(t, client)

safeClose(t, consumer)
safeClose(t, client)
}
Expand Down Expand Up @@ -397,7 +400,7 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
closeProducer(t, producer)
}

func validateMetrics(t *testing.T, client Client) {
func validateProducerMetrics(t *testing.T, client Client) {
// Get the broker used by test1 topic
var broker *Broker
if partitions, err := client.Partitions("test.1"); err != nil {
Expand Down Expand Up @@ -486,6 +489,36 @@ func validateMetrics(t *testing.T, client Client) {
metricValidators.run(t, client.Config().MetricRegistry)
}

func validateConsumerMetrics(t *testing.T, client Client) {
// Get the broker used by test1 topic
var broker *Broker
if partitions, err := client.Partitions("test.1"); err != nil {
t.Error(err)
} else {
for _, partition := range partitions {
if b, err := client.Leader("test.1", partition); err != nil {
t.Error(err)
} else {
if broker != nil && b != broker {
t.Fatal("Expected only one broker, got at least 2")
}
broker = b
}
}
}

metricValidators := newMetricValidators()

// at least 1 global fetch request for the given topic
metricValidators.registerForGlobalAndTopic("test_1", minCountMeterValidator("fetch-rate", 1))

// and at least 1 fetch request to the lead broker
metricValidators.registerForBroker(broker, minCountMeterValidator("fetch-rate", 1))

// Run the validators
metricValidators.run(t, client.Config().MetricRegistry)
}

// Benchmarks

func BenchmarkProducerSmall(b *testing.B) {
Expand Down
3 changes: 3 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (m *metricValidators) registerForAllBrokers(broker *Broker, validator *metr
}

func (m metricValidators) run(t *testing.T, r metrics.Registry) {
t.Helper()
for _, metricValidator := range m {
metric := r.Get(metricValidator.name)
if metric == nil {
Expand All @@ -82,6 +83,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter))
return &metricValidator{
name: name,
validator: func(t *testing.T, metric interface{}) {
t.Helper()
if meter, ok := metric.(metrics.Meter); !ok {
t.Errorf("Expected meter metric for '%s', got %T", name, metric)
} else {
Expand All @@ -93,6 +95,7 @@ func meterValidator(name string, extraValidator func(*testing.T, metrics.Meter))

func countMeterValidator(name string, expectedCount int) *metricValidator {
return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
t.Helper()
count := meter.Count()
if count != int64(expectedCount) {
t.Errorf("Expected meter metric '%s' count = %d, got %d", name, expectedCount, count)
Expand Down
3 changes: 3 additions & 0 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ Consumer related metrics:
| consumer-group-join-failed-<GroupID> | counter | Total count of consumer group join failures |
| consumer-group-sync-total-<GroupID> | counter | Total count of consumer group sync attempts |
| consumer-group-sync-failed-<GroupID> | counter | Total count of consumer group sync failures |
| fetch-rate | meter | Fetch requests/second sent to all brokers |
| fetch-rate-for-broker-<broker> | meter | Fetch requests/second sent to a given broker |
| fetch-rate-for-topic-<topic> | meter | Fetch requests/second sent for a given topic |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
*/
Expand Down

0 comments on commit 359272f

Please sign in to comment.