-
Notifications
You must be signed in to change notification settings - Fork 594
Fixing bug in WebSink metrics collector #1714
Conversation
@@ -84,6 +84,9 @@ | |||
private boolean isFlatMetrics = true; | |||
private boolean includeTopologyName = true; | |||
private String topologyName; | |||
private long cacheMaxSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments - will be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small scratch. The rest looks good to me.
if (sourceObj instanceof Map) { | ||
@SuppressWarnings("unchecked") | ||
Map<String, Double> castObj = (Map<String, Double>) sourceObj; | ||
sourceCache = castObj; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we merge L202 and L203 into one line:
sourceCache = (Map<String, Double>) sourceObj;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly we cannot. Apparently you can not annotate a definition if the variable is not also assigned.
e.g.
@SuppressWarnings("unchecked")
Map<String, Double> sourceCache;
Object sourceObj = metricsCache.getIfPresent(source);
if (sourceObj instanceof Map) {
sourceCache = (Map<String, Double>) sourceObj;
...
breaks with warning: [unchecked] unchecked cast
and
@SuppressWarnings("unchecked")
sourceCache = (Map<String, Double>) sourceObj;
is not legal syntax
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good.
.expireAfterWrite(cacheTtlSec, TimeUnit.SECONDS) | ||
.<String, Double>build().asMap(); | ||
} | ||
sourceCache.putAll(processMetrics("", record.getMetrics())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this override the old metric of the same name in the sourceCache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it does. Its the standard .putAll
method from the Map interface.
public void putAll(Map<? extends K,? extends V> m)
Copies all of the mappings from the specified map to this map. These mappings will replace any mappings that this map had for any of the keys currently in the specified map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so it ignores the metric type and caches only the latest one metric
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you mean with ignoring the metric type.
Lets say we have two records record_1
and record_2
with this yaml representation:
record_1:
source: source_1
metrics:
metric_1: 1
metric_2: nop
metric_3: 3
record_2:
source: source_1
metrics:
metric_1: 2
metric_4: 4
And they are getting send to the WebSink in order.
The WebSink will publish these metrics on a REST endpoint returning json.
After "record_1" is processed the endpoint will return
"source_1": {
"metric_1": 1.0
"metric_3": 3.0
}
As you can see only numeric metrics are published, "metric_2" is omitted.
After "record_2" is processed the endpoint will return:
"source_1": {
"metric_1": 2.0,
"metric_3": 3.0,
"metric_4": 5.0,
}
"metric_1" was updated, "metric_3" was not changed, "metric_4" was added by the 2nd record.
The old behavior, before this change, would have returned:
"source_1": {
"metric_1": 2.0,
"metric_4": 5.0,
}
The point of this metrics sink is, that an external process queries periodically (once a min) the
REST endpoint to collect the metrics. If the two records would get processed right after one another,
there would be the chance that the metrics collector would never see "metric_3".
This unit test is verifying the behavior described:
https://github.com/tc-dc/heron/blob/ed0e25cb71dd882c159ef4bbcf1fe634a891667c/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/sink/WebSinkTest.java#L165
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see. thank you
looks good to me. 👍 |
note to team: this PR is merged into master, but we will also incorporate it in 0.14.6. I'll do that. |
This reverts commit f4e6b9c. This patch will however be included into 0.14.6 later.
I found a bug in the WebSink metrics collector, that I contributed a couple month ago.
If the metrics are getting exported in the hierarchical format, a new
MetricsRecord
would override and oldMetricsRecord
and theMetricsInfo
s associated to it. Now a newMetricsRecord
will merge theMetricsInfo
s of an oldMetricsRecord
. A cache with expiration is used.