
chore: add ThroughputMetricsReporter for consumed/produced metrics from KIP-846 #9187

Merged
A. Sophie Blee-Goldman (ableegoldman) merged 13 commits into confluentinc:master from ableegoldman:add-ThroughputMetricsReporter
Jun 14, 2022

Conversation


@ableegoldman A. Sophie Blee-Goldman (ableegoldman) commented Jun 8, 2022

Introduces a ThroughputMetricsReporter to report the consumed/produced total throughput metrics added to Streams in KIP-846.

Metrics are aggregated to spit out a total sum of records & bytes consumed/produced per topic, per query
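
For reference, the overall shape is roughly the sketch below. Only the MetricsReporter interface methods and the class name are real here; the body comments are illustrative, not the exact code in this PR:

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Sketch: listens for the KIP-846 throughput metrics reported by Streams and
// re-registers them as per-query, per-topic totals in the ksql metrics registry
public class ThroughputMetricsReporter implements MetricsReporter {

  @Override
  public synchronized void configure(final Map<String, ?> configMap) {
    // nothing is registered up front -- metrics are added lazily in metricChange
  }

  @Override
  public void init(final List<KafkaMetric> metrics) {
    metrics.forEach(this::metricChange);
  }

  @Override
  public synchronized void metricChange(final KafkaMetric metric) {
    // fold the new/updated Streams metric into the per-query, per-topic aggregate
  }

  @Override
  public synchronized void metricRemoval(final KafkaMetric metric) {
    // drop the metric from its aggregate and clean up any now-empty maps
  }

  @Override
  public void close() {
  }
}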

}

@Override
public synchronized void configure(final Map<String, ?> configMap) {
Contributor Author

I do things slightly differently than in either of the two existing reporters: rather than registering the metrics during configuration/initialization and then updating them as they come in via metricChange, I just wait and only add them then (i.e. in metricChange)
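
i.e. roughly the sketch below -- THROUGHPUT_METRIC_NAMES, extractQueryId, and addThroughputMetric are placeholders rather than the exact fields/helpers in this class, and the "topic" tag key is also an assumption:

@Override
public synchronized void metricChange(final KafkaMetric metric) {
  // ignore everything except the KIP-846 throughput metrics
  if (!THROUGHPUT_METRIC_NAMES.contains(metric.metricName().name())) {
    return;
  }
  // parse the query id out of the thread-id tag (see the regex discussion below)
  final String queryId = extractQueryId(metric.metricName());
  final String topic = metric.metricName().tags().get("topic");
  // register (or update) the per-query, per-topic aggregate the first time we
  // see a metric for this combination -- nothing was set up in configure()
  addThroughputMetric(queryId, topic, metric);
}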

final String queryId,
final String topic
) {
final MetricName metricName = getMetricNameWithQueryIdTag(queryId, metric.metricName());
Contributor Author

The MetricName for our ksql metrics is mostly the same as the original Streams metric, except that we add the queryId in the tags
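
Something like the sketch below (assuming java.util.HashMap and org.apache.kafka.common.MetricName; the "query-id" tag key is a guess at the exact key, the idea is just copy-the-tags-and-add-one):

private static MetricName getMetricNameWithQueryIdTag(
    final String queryId,
    final MetricName streamsMetricName
) {
  // same name/group/description as the Streams metric, plus a query-id tag
  final Map<String, String> newTags = new HashMap<>(streamsMetricName.tags());
  newTags.put("query-id", queryId);
  return new MetricName(
      streamsMetricName.name(),
      streamsMetricName.group(),
      streamsMetricName.description(),
      newTags
  );
}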

Contributor

It seems the finer-grained metric is only used for aggregating into the per-topic, per-query ones associated with the queryTopicThroughputMetricTags, and we already organize them inside the query-id nested map. So do we still need to change the metric name to attach the query id?

Contributor Author

Not sure I understand -- note that we're not technically "changing" or removing the original metrics, we're just adding new metrics that aggregate over all the taskId & processorNode tags, and each new metric is given a query tag as well

Although that reminds me, the ksql metrics' MetricName should not include the taskId and processorNode tags -- I'll update the code to remove that. Thanks

Contributor Author

Ah, I think I see what you're saying -- once I fix the above then yes, there's no reason to add the queryId to the MetricName of the original metrics from Streams; only the actual MetricName of the aggregated throughput total needs to have this tag. I'll update that as well


metricRegistry.addMetric(
    metricName,
    (config, now) -> registeredMetrics.get(queryId).get(topic).get(metricName).getValue()
Contributor Author

Ultimately we just take the metrics coming out of Streams and then register new metrics which are (a) tagged with the queryId, and (b) aggregated (summed) over all values for that particular throughput metric at that topic from that query.

Essentially we're adding up the records/bytes counts across all the different tasks, and across the sink nodes within those tasks if there happens to be more than one producing to a given topic (possible within Streams; not sure if you can actually do this in ksql, but that doesn't matter here)
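
In other words, each registered ksql metric is just a running sum over a bag of underlying Streams metrics, something like this sketch (not the exact contents of the real ThroughputTotalMetric, and it assumes the underlying metric values are numeric):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;

// Sketch: sums the current values of every Streams metric feeding one
// query/topic/metric-type combination (i.e. across taskIds and sink nodes)
public class ThroughputTotalMetric {
  private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

  void add(final MetricName name, final KafkaMetric metric) {
    metrics.put(name, metric);
  }

  void remove(final MetricName name) {
    metrics.remove(name);
  }

  boolean isEmpty() {
    return metrics.isEmpty();
  }

  double getValue() {
    // assumes each underlying metric reports a numeric value
    return metrics.values().stream()
        .mapToDouble(m -> ((Number) m.metricValue()).doubleValue())
        .sum();
  }
}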

Contributor

Made a pass on the PR. A meta question is whether the aggregation logic is really achieving what we want to do here.

mkSet(RECORDS_CONSUMED, BYTES_CONSUMED, RECORDS_PRODUCED, BYTES_PRODUCED);
private static final Pattern NAMED_TOPOLOGY_PATTERN = Pattern.compile("(.*?)__\\d*_\\d*");
private static final Pattern QUERY_ID_PATTERN =
Pattern.compile("(?<=query_|transient_)(.*?)(?=-)");

Contributor

Is this an agreed-upon query-id naming contract that we would guarantee forever? If yes let's document it publicly somewhere.

Contributor

I think you might need to check the query ID -- I think there might be a hyphen instead of an underscore for the new version

Contributor Author

Oh I just copied this over from the StorageUtilizationMetrics -- so that probably means we need to fix that or else those will be broken again?

Contributor Author

Is this an agreed-upon query-id naming contract that we would guarantee forever?

Definitely no 🙂 -- however we should consider documenting that, i.e. that things like our query naming scheme are not part of the public contract. At least, I think they should not be -- maybe if the name were something more predictable, but right now it's built up from a lot of internal implementation details, for example the query number concatenated with what I presume to be a global counter, our abbreviations and/or classification of the query type (e.g. CTAS, CSAS, transient), and so on.

Same goes for the mapping of query to kafka consumer group -- just as we have broken it with the shared runtimes engine, we may want to evolve this even further in the future, or even abstract some or possibly all queries away from consumer groups entirely in the far future. In fact, we're about to break it again when we get around to consolidating down to a single runtime

Contributor

Sounds great. In that case I'd like us to at least have a rough upgrade path sketched out for when we change the internal query id generation rules, e.g. which classes need to be updated, whether any other code repos outside ksql need to be updated, and if so, in which order the changes should be pushed, etc.

Contributor Author

I think you might need to check the query ID -- I think there might be a hyphen instead of an underscore for the new version

Oh I just copied this over from the StorageUtilizationMetrics -- so that probably means we need to fix that or else those will be broken again?

Walker Carlson (@wcarlson5) btw in case you see that I reverted this and are wondering about it, the existing regex with the underscore is still correct -- this regex is actually used to parse the queryId from the thread-id when it's not a shared runtime; for shared runtimes we use a separate regex for parsing the taskId
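
e.g. this standalone snippet shows what the pattern pulls out of a thread id -- the thread id string below is made up for illustration only, since as discussed above the real naming scheme is an internal detail:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class QueryIdPatternExample {
  // same pattern as in the reporter: grab everything between "query_" or
  // "transient_" and the next hyphen
  private static final Pattern QUERY_ID_PATTERN =
      Pattern.compile("(?<=query_|transient_)(.*?)(?=-)");

  public static void main(final String[] args) {
    // hypothetical thread id, for illustration only
    final String threadId =
        "_confluent-ksql-default_query_CSAS_OUTPUT_0-abc123-StreamThread-1";
    final Matcher matcher = QUERY_ID_PATTERN.matcher(threadId);
    if (matcher.find()) {
      System.out.println(matcher.group(1)); // prints CSAS_OUTPUT_0
    }
  }
}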

newMetric = registeredMetrics.get(queryId).get(topic).get(metricName);
newMetric.add(metricName, metric);
} else {
newMetric = registeredMetrics.get(queryId).get(topic).get(metricName);

Contributor

Typo copy-paste? Should this be newMetric = new ThroughputTotalMetric...?


Assuming it is indeed a typo, then I'm not sure I understand the logic here: it seems the goal is to use a single ThroughputTotalMetric per query, per topic, with differentiated bytes and records, to aggregate all metrics across different task-id / processor-node-id / thread-id values, right? But from the code it seems this goal isn't achieved, since each metricName (with the whole tag map) would be getting its own ThroughputTotalMetric and hence the inner map of the ThroughputTotalMetric would always contain a single element.


Plus, I'm wondering if this is a bit over-complicated as a way to achieve that logic while also being able to remove the ThroughputTotalMetric when all of its associated finer-grained metrics are gone.

Would it be more efficient to just maintain a private static final Map<String, Map<String, Map<String, ThroughputTotalMetric>>> registeredMetrics where the inner-most map's keys are just bytes and records, since the ThroughputTotalMetric itself maintains the map of its associated finer-grained metrics? And then upon metricRemoval we can just check if the ThroughputTotalMetric's inner map is empty and, if so, remove that metric from the registeredMetrics?
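
Roughly what I have in mind, as a sketch -- the field and helper names are just illustrative, it assumes ConcurrentHashMap, and it builds on ThroughputTotalMetric keeping its own map of the finer-grained metrics:

// queryId -> topic -> throughput metric name (e.g. "bytes-consumed-total")
// -> the aggregate, which itself holds the finer-grained Streams metrics
private static final Map<String, Map<String, Map<String, ThroughputTotalMetric>>>
    registeredMetrics = new ConcurrentHashMap<>();

private static void addThroughputMetric(
    final String queryId,
    final String topic,
    final KafkaMetric metric
) {
  registeredMetrics
      .computeIfAbsent(queryId, q -> new ConcurrentHashMap<>())
      .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
      .computeIfAbsent(metric.metricName().name(), n -> new ThroughputTotalMetric())
      .add(metric.metricName(), metric);
}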

Contributor Author

Should this be newMetric = new ThroughputTotalMetric...

haha whoops -- yes that was definitely a typo, thanks.

each metricName (with the whole tag map) would be getting its own ThroughputTotalMetric and hence the inner map of the ThroughputTotalMetric would always contain a single element.

Ah ok, I think something I changed at the last minute did indeed introduce a bug, and that's what's causing this confusion -- the innermost map in registeredMetrics was supposed to be keyed by the metricName.name(), eg bytes-consumed-total, records-produced-total, etc. I tried to simplify things and make the code easier to understand by changing this to the plain MetricName, but that's not actually the same thing, since there may be multiple metrics for a given metric-type & topic & query, corresponding to different taskId and processorNodeId values in the tags. So yes, the way it is now would indeed result in just a single metric per ThroughputTotalMetric, ie non-aggregated metrics. I'll change it back to the way it was, ie keyed by metricName.name(), which I think is what you were getting at with this suggestion?

just maintain a private static final Map<String, Map<String, Map<String, ThroughputTotalMetric>>> registeredMetrics where the inner-most map's keys are just bytes and records

However, I'm not quite seeing the second part of your suggestion -- are you suggesting we don't check & clean up the outer maps when we've removed the last ThroughputTotalMetric in the topic map, or the last topic entry in the query map? It may be a bit long-winded, but I do think we need to maintain these maps or else we'll leak memory over time as queries come and go (rough sketch below)

And then upon metricRemoval we can just check if the ThroughputTotalMetric's inner map is empty and, if so, remove that metric from the registeredMetrics?
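
i.e. the removal path I'm picturing looks roughly like this -- again just a sketch, building on the nested-map layout and ThroughputTotalMetric sketches above, with the extra outer-map cleanup included:

private static synchronized void removeThroughputMetric(
    final String queryId,
    final String topic,
    final KafkaMetric metric
) {
  final Map<String, Map<String, ThroughputTotalMetric>> topicMetrics =
      registeredMetrics.get(queryId);
  if (topicMetrics == null) {
    return;
  }
  final Map<String, ThroughputTotalMetric> throughputMetrics = topicMetrics.get(topic);
  if (throughputMetrics == null) {
    return;
  }
  final ThroughputTotalMetric total = throughputMetrics.get(metric.metricName().name());
  if (total == null) {
    return;
  }
  total.remove(metric.metricName());

  // clean up empty maps as we go, otherwise entries for old queries pile up
  // forever as queries come and go
  if (total.isEmpty()) {
    // (would also deregister the aggregated ksql metric from the registry here)
    throughputMetrics.remove(metric.metricName().name());
  }
  if (throughputMetrics.isEmpty()) {
    topicMetrics.remove(topic);
  }
  if (topicMetrics.isEmpty()) {
    registeredMetrics.remove(queryId);
  }
}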

Contributor Author

Ok, I'm actually going to leave the innermost map key type as a MetricName rather than a String (MetricName.name()), just because I think it helps add clarity around the meaning of each String/key -- I just need to fix the MetricName to remove the taskId and processorNodeId tags, and then we should correctly end up adding and aggregating over each of these metrics for a given queryId-topic-metricType combination in the ThroughputTotalMetric.

Lmk if that makes sense or if you still have questions -- can also hop on a quick call tomorrow to go over this stuff if there's anything left to resolve


Contributor

Made a second pass, overall lgtm except one minor comment about metrics registry removal.

Would be good to have Rohan (@rodesai) take another look.

@ableegoldman A. Sophie Blee-Goldman (ableegoldman) force-pushed the add-ThroughputMetricsReporter branch 2 times, most recently from 86fecf9 to be45c24 on June 13, 2022 09:48
@guozhangwang
Contributor

Made a final pass, LGTM.

@ableegoldman
Contributor Author

Test failures are unrelated, and have already been fixed upstream -- they'll stop failing once the AK sync goes through (I opened a PR for that here)

@ableegoldman A. Sophie Blee-Goldman (ableegoldman) merged commit b832626 into confluentinc:master Jun 14, 2022
return Objects.equals(last.accumulateAndGet(
    now,
    (l, n) -> n.isAfter(l.plusSeconds(intervalSeconds)) ? n : l
), now);

Contributor Author

hey Rohan (@rodesai) I just threw this fix into my PR when I saw the compiler warning, but now we're suddenly seeing failures in RocksDBMetricsCollectorTest.shouldNotUpdateIfWithinInterval due to

org.mockito.exceptions.verification.TooManyActualInvocations: 

kafkaMetric.metricValue();
Wanted 1 time:
-> at org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:54)
But was 3 times:
-> at io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector$AggregatedMetric.update(RocksDBMetricsCollector.java:229)
-> at io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector$AggregatedMetric.update(RocksDBMetricsCollector.java:229)
-> at io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector$AggregatedMetric.update(RocksDBMetricsCollector.java:229)
	at io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollectorTest.shouldNotUpdateIfWithinInterval(RocksDBMetricsCollectorTest.java:256)

The thing is, I can't seem to reproduce it locally after a thousand-plus runs, and it also didn't fail on the build for my PR. Although after looking at the implementation of accumulateAndGet I see that it was actually fine before, I still don't understand why this change could be incorrect, much less nondeterministically so. Any thoughts on what's going on here?
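
For what it's worth, here's a tiny standalone snippet of the reference-vs-value subtlety I mean with accumulateAndGet -- not the actual collector code, and not necessarily the cause of the flake:

import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public final class AccumulateAndGetExample {
  public static void main(final String[] args) {
    final AtomicReference<Instant> last =
        new AtomicReference<>(Instant.ofEpochSecond(100));

    // a "now" within the interval but equal in value to the stored Instant
    final Instant now = Instant.ofEpochSecond(100);
    final Instant result = last.accumulateAndGet(
        now,
        (l, n) -> n.isAfter(l.plusSeconds(10)) ? n : l
    );

    // accumulateAndGet returns whatever the accumulator produced, so reference
    // equality tells us whether `now` actually replaced the stored value...
    System.out.println(result == now);               // false: the old Instant was kept
    // ...whereas Objects.equals only compares the Instant values
    System.out.println(Objects.equals(result, now)); // true: the values happen to be equal
  }
}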
