Skip to content

Commit

Permalink
feat(metrics): track consumer-fetch-response-size
Browse files Browse the repository at this point in the history
- add metrics registry to decode func
- add metric to track the size of fetch response
  • Loading branch information
dnwe committed Aug 10, 2022
1 parent 1d4bdc2 commit 1d63590
Show file tree
Hide file tree
Showing 17 changed files with 66 additions and 36 deletions.
4 changes: 2 additions & 2 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,9 +607,9 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti
// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
userDataV1 := &StickyAssignorUserDataV1{}
if err := decode(userDataBytes, userDataV1); err != nil {
if err := decode(userDataBytes, userDataV1, nil); err != nil {
userDataV0 := &StickyAssignorUserDataV0{}
if err := decode(userDataBytes, userDataV0); err != nil {
if err := decode(userDataBytes, userDataV0, nil); err != nil {
return nil, err
}
return userDataV0, nil
Expand Down
16 changes: 8 additions & 8 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
return
}

if err := versionedDecode(packets, res, request.version()); err != nil {
if err := versionedDecode(packets, res, request.version(), b.conf.MetricRegistry); err != nil {
// Malformed response
cb(nil, err)
return
Expand Down Expand Up @@ -1023,13 +1023,13 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
return nil
}

return handleResponsePromise(req, res, promise)
return b.handleResponsePromise(req, res, promise)
}

func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
func (b *Broker) handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise) error {
select {
case buf := <-promise.packets:
return versionedDecode(buf, res, req.version())
return versionedDecode(buf, res, req.version(), b.conf.MetricRegistry)
case err := <-promise.errors:
return err
}
Expand Down Expand Up @@ -1121,7 +1121,7 @@ func (b *Broker) responseReceiver() {
}

decodedHeader := responseHeader{}
err = versionedDecode(header, &decodedHeader, response.headerVersion)
err = versionedDecode(header, &decodedHeader, response.headerVersion, b.conf.MetricRegistry)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func (b *Broker) authenticateViaSASLv1() error {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom)
handshakeErr = b.handleResponsePromise(handshakeRequest, handshakeResponse, prom)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
Expand All @@ -1202,7 +1202,7 @@ func (b *Broker) authenticateViaSASLv1() error {
Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
return nil, authErr
}
authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom)
authErr = b.handleResponsePromise(authenticateRequest, authenticateResponse, prom)
if authErr != nil {
Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
return nil, authErr
Expand Down Expand Up @@ -1280,7 +1280,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
res := &SaslHandshakeResponse{}

err = versionedDecode(payload, res, 0)
err = versionedDecode(payload, res, 0, b.conf.MetricRegistry)
if err != nil {
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
return err
Expand Down
8 changes: 4 additions & 4 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {
}

meta2 := new(ConsumerGroupMemberMetadata)
err = decode(buf, meta2)
err = decode(buf, meta2, nil)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
Expand All @@ -69,10 +69,10 @@ func TestConsumerGroupMemberMetadata(t *testing.T) {

func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV1, meta); err != nil {
if err := decode(groupMemberMetadataV1, meta, nil); err != nil {
t.Error("Failed to decode V1 data", err)
}
if err := decode(groupMemberMetadataV1Bad, meta); err != nil {
if err := decode(groupMemberMetadataV1Bad, meta, nil); err != nil {
t.Error("Failed to decode V1 'bad' data", err)
}
}
Expand All @@ -94,7 +94,7 @@ func TestConsumerGroupMemberAssignment(t *testing.T) {
}

amt2 := new(ConsumerGroupMemberAssignment)
err = decode(buf, amt2)
err = decode(buf, amt2, nil)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
Expand Down
2 changes: 1 addition & 1 deletion consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestConsumerMetadataResponseError(t *testing.T) {
testEncodable(t, "", response, consumerMetadataResponseError)

decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0, nil); err != nil {
t.Error("could not decode: ", err)
}

Expand Down
4 changes: 2 additions & 2 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAs
return nil, nil
}
assignment := new(ConsumerGroupMemberAssignment)
err := decode(gmd.MemberAssignment, assignment)
err := decode(gmd.MemberAssignment, assignment, nil)
return assignment, err
}

Expand All @@ -261,6 +261,6 @@ func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMeta
return nil, nil
}
metadata := new(ConsumerGroupMemberMetadata)
err := decode(gmd.MemberMetadata, metadata)
err := decode(gmd.MemberMetadata, metadata, nil)
return metadata, err
}
14 changes: 10 additions & 4 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ type versionedDecoder interface {

// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder) error {
func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
if buf == nil {
return nil
}

helper := realDecoder{raw: buf}
helper := realDecoder{
raw: buf,
registry: metricRegistry,
}
err := in.decode(&helper)
if err != nil {
return err
Expand All @@ -75,12 +78,15 @@ func decode(buf []byte, in decoder) error {
return nil
}

func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
if buf == nil {
return nil
}

helper := realDecoder{raw: buf}
helper := realDecoder{
raw: buf,
registry: metricRegistry,
}
err := in.decode(&helper, version)
if err != nil {
return err
Expand Down
11 changes: 11 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"sort"
"time"

"github.com/rcrowley/go-metrics"
)

const invalidPreferredReplicaID = -1
Expand Down Expand Up @@ -60,6 +62,12 @@ type FetchResponseBlock struct {
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
metricRegistry := pd.metricRegistry()
var sizeMetric metrics.Histogram
if metricRegistry != nil {
sizeMetric = getOrRegisterHistogram("consumer-fetch-response-size", metricRegistry)
}

tmp, err := pd.getInt16()
if err != nil {
return err
Expand Down Expand Up @@ -115,6 +123,9 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
if err != nil {
return err
}
if sizeMetric != nil {
sizeMetric.Update(int64(recordsSize))
}

recordsDecoder, err := pd.getSubset(int(recordsSize))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata
members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
for _, member := range r.Members {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(member.Metadata, meta); err != nil {
if err := decode(member.Metadata, meta, nil); err != nil {
return nil, err
}
members[member.MemberId] = *meta
Expand Down
2 changes: 1 addition & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestMessageDecodingVersion1(t *testing.T) {

func TestMessageDecodingUnknownVersions(t *testing.T) {
message := Message{Version: 2}
err := decode(emptyV2Message, &message)
err := decode(emptyV2Message, &message, nil)
if err == nil {
t.Error("Decoding did not produce an error for an unknown magic byte")
}
Expand Down
5 changes: 5 additions & 0 deletions packet_decoder.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "github.com/rcrowley/go-metrics"

// PacketDecoder is the interface providing helpers for reading with Kafka's encoding rules.
// Types implementing Decoder only need to worry about calling methods like GetString,
// not about how a string is represented in Kafka.
Expand Down Expand Up @@ -40,6 +42,9 @@ type packetDecoder interface {
// Stacks, see PushDecoder
push(in pushDecoder) error
pop() error

// To record metrics when provided
metricRegistry() metrics.Registry
}

// PushDecoder is the interface for decoding fields like CRCs and lengths where the validity
Expand Down
13 changes: 10 additions & 3 deletions real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sarama
import (
"encoding/binary"
"math"

"github.com/rcrowley/go-metrics"
)

var (
Expand All @@ -15,9 +17,10 @@ var (
)

type realDecoder struct {
raw []byte
off int
stack []pushDecoder
raw []byte
off int
stack []pushDecoder
registry metrics.Registry
}

// primitives
Expand Down Expand Up @@ -459,3 +462,7 @@ func (rd *realDecoder) pop() error {

return in.check(rd.off, rd.raw)
}

func (rd *realDecoder) metricRegistry() metrics.Registry {
return rd.registry
}
2 changes: 1 addition & 1 deletion record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
}

b.recordsLen = len(recBuffer)
err = decode(recBuffer, recordsArray(b.Records))
err = decode(recBuffer, recordsArray(b.Records), nil)
if errors.Is(err, ErrInsufficientData) {
b.PartialTrailingRecord = true
b.Records = nil
Expand Down
8 changes: 4 additions & 4 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func TestLegacyRecords(t *testing.T) {
set = &MessageSet{}
r = Records{}

err = decode(exp, set)
err = decode(exp, set, nil)
if err != nil {
t.Fatal(err)
}
err = decode(buf, &r)
err = decode(buf, &r, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -110,11 +110,11 @@ func TestDefaultRecords(t *testing.T) {
batch = &RecordBatch{}
r = Records{}

err = decode(exp, batch)
err = decode(exp, batch, nil)
if err != nil {
t.Fatal(err)
}
err = decode(buf, &r)
err = decode(buf, &r, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func decodeRequest(r io.Reader) (*request, int, error) {
bytesRead += len(encodedReq)

req := &request{}
if err := decode(encodedReq, req); err != nil {
if err := decode(encodedReq, req, nil); err != nil {
return nil, bytesRead, err
}

Expand Down
6 changes: 3 additions & 3 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func testEncodable(t *testing.T, name string, in encoder, expect []byte) {
}

func testDecodable(t *testing.T, name string, out decoder, in []byte) {
err := decode(in, out)
err := decode(in, out, nil)
if err != nil {
t.Error("Decoding", name, "failed:", err)
}
}

func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []byte, version int16) {
err := versionedDecode(in, out, version)
err := versionedDecode(in, out, version, nil)
if err != nil {
t.Error("Decoding", name, "version", version, "failed:", err)
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func testResponse(t *testing.T, name string, res protocolBody, expected []byte)
}

decoded := reflect.New(reflect.TypeOf(res).Elem()).Interface().(versionedDecoder)
if err := versionedDecode(encoded, decoded, res.version()); err != nil {
if err := versionedDecode(encoded, decoded, res.version(), nil); err != nil {
t.Error("Decoding", name, "failed:", err)
}

Expand Down
1 change: 1 addition & 0 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Consumer related metrics:
| consumer-fetch-rate | meter | Fetch requests/second sent to all brokers |
| consumer-fetch-rate-for-broker-<broker> | meter | Fetch requests/second sent to a given broker |
| consumer-fetch-rate-for-topic-<topic> | meter | Fetch requests/second sent for a given topic |
| consumer-fetch-response-size | histogram | Distribution of the fetch response size in bytes |
| consumer-group-join-total-<GroupID> | counter | Total count of consumer group join attempts |
| consumer-group-join-failed-<GroupID> | counter | Total count of consumer group join failures |
| consumer-group-sync-total-<GroupID> | counter | Total count of consumer group sync attempts |
Expand Down
2 changes: 1 addition & 1 deletion sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type SyncGroupResponse struct {

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
assignment := new(ConsumerGroupMemberAssignment)
err := decode(r.MemberAssignment, assignment)
err := decode(r.MemberAssignment, assignment, nil)
return assignment, err
}

Expand Down

0 comments on commit 1d63590

Please sign in to comment.