-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose broker metrics with go-metrics #701
Conversation
- add MetricRegistry configuration parameter that defaults to metrics.DefaultRegistry - provide the following metrics: - incoming-byte-rate meter (global and per registered broker) - request-rate meter (global and per registered broker) - request-size histogram (global and per registered broker) - outgoing-byte-rate meter (global and per registered broker) - response-rate meter (global and per registered broker) - response-size histogram (global and per registered broker) - add metrics flag to kafka-console-producer to output metrics - add bytes read to decodeRequest - store request and response size in MockBroker history - add unit tests and example - functional tests in functional_producer_test - documentation of metrics in main package
- close MockBroker in the for loop before validating metrics - do not add expectation for ProduceRequest (NoResponse)
- use Logger instead of Logf in unit test to stay consistent - add MmockBroker.WaitForExpectations for graceful shutdown
Sorry for the delays in responding to this - between vacations and some internal deadlines there hasn't been a lot of time for Sarama recently. Hopefully I'll be able to review this properly next week. |
@@ -338,6 +371,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, | |||
return nil, err | |||
} | |||
|
|||
b.updateOutgoingCommunicationMetrics(len(buf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to move this down below the SetWriteDeadline
else you could miscount in the (admittedly exceptional) case where that fails
Thanks for all your work on this! Most of the code review points are pretty minor, it's just the mockbroker changes I'm not really convinced of. Apologies again for the delay. |
break | ||
} | ||
b.lock.Lock() | ||
requestResponse.ResponseSize = len(resHeader) + len(encodedRes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By using a reference to a RequestResponse
I am able to update the ResponseSize
field without keeping the index of the entry that was added to the slice and then updating the entry later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't even have to keep the index though do you? It's always just the last element in the slice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be if the responses are served in serial order but I do not know if this is always the case (third parties using the MockBroker
concurrently).
But the cleaner implementation through a handler should discard that detail I believe.
- update*CommunicationMetrics even when a Read/Write fails - use MockBroker notifier for waiting for both expectations and metrics - add documentation about disabling metrics gathering - use METRICS_DISABLE env variable for disabling metrics in benchmarks - use constants for exponentially decaying reservoir for histograms - fix typo in main documentation
Here are the results from the benchmark when running them twice on the original master commit, against the PR with the no-op implementation and then with the effective implementation:
Results are very similar so it probably comes down to I/O and caching on my VM, that's why I believe the overall impact is minimal. |
if err != nil { | ||
b.updateIncomingCommunicationMetrics(bytesReadHeader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could just do this immediately after calling ReadFull
rather than in all three error branches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes but then I would have to call it twice for the regular case where we read the header and then read the body.
This would result in the same metric for byte rate but will double the rate for response rate metric and mess up with response size histogram metric too.
That being said I think using defer
with a totalBytesRead
variable might simplify the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot that this code is running inside a for
loop so using defer
would require wrapping the for
code block inside an function, not sure if it makes it easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, OK I missed the fact it would double-count packets in the success case. This is fine as-is.
Glad to hear that, I think this feature is gonna lead to more optimizations and easier tuning in the future as we will be able to quantify those. |
MetricRegistry
configuration parameter that defaults tometrics.DefaultRegistry
incoming-byte-rate
meter (global and per registered broker)request-rate
meter (global and per registered broker)request-size
histogram (global and per registered broker)outgoing-byte-rate
meter (global and per registered broker)response-rate meter
(global and per registered broker)response-size histogram
(global and per registered broker)metrics
flag tokafka-console-producer
to output metricsdecodeRequest
MockBroker
for unit testingfunctional_producer_test
Related issue #683 and original PR #688.
I'll submit a PR for producer related metrics once this one is merged (should be shorter 😉).