Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

METRICS-1302: Added prefix support for resource labels. #14

Merged
merged 9 commits into from
Apr 24, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
Expand All @@ -32,12 +33,14 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -48,19 +51,31 @@ public class OpenCensusProtobufInputRowParser implements ByteBufferInputRowParse
private static final Logger LOG = new Logger(OpenCensusProtobufInputRowParser.class);

private static final String SEPARATOR = "-";
public static final String NAME = "name";
public static final String VALUE = "value";
public static final String TIMESTAMP_COLUMN = "timestamp";
private static final String DEFAULT_METRIC_DIMENSION = "name";
private static final String VALUE = "value";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String DEFAULT_RESOURCE_PREFIX = "resource.";
private final ParseSpec parseSpec;
private final List<String> dimensions;

private final String metricDimension;
private final String metricLabelPrefix;
private final String resourceLabelPrefix;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized you didn't add any @UsonProperty annotated getters for those properties, which means they will be left out when serializing to json. This is required for nodes to communicate the parser spec to each-other

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xvrl Created PR: #15
Added additional test cases and labels for distribution metric too.


@JsonCreator
public OpenCensusProtobufInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("metricDimension") String metricDimension,
@JsonProperty("metricLabelPrefix") String metricPrefix,
@JsonProperty("resourceLabelPrefix") String resourcePrefix
)
{
this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.metricDimension = Strings.isNullOrEmpty(metricDimension) ? DEFAULT_METRIC_DIMENSION : metricDimension;
this.metricLabelPrefix = StringUtils.nullToEmptyNonDruidDataString(metricPrefix);
this.resourceLabelPrefix = resourcePrefix != null ? resourcePrefix : DEFAULT_RESOURCE_PREFIX;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we handling resourceLabelPrefix and metricLabelPrefix differently? They should both be treated the same way. What we do here seems perfectly fine for metricsLabelPrefix as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the discussion we had I think we agreed to have resource. as default prefix for resource labels where metric labels will default to empty.

#14 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, we could do this by explicitly having DEFAULT_METRIC_PREFIX = "" to make it explicit what the defaults are, but if you want to leave it as is that's fine too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving it as it is for now. Will change if needed for some other default prefix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine, it was mostly a suggestion to make the pattern more obvious to someone reading the code.


LOG.info("Creating Open Census Protobuf parser with spec:" + parseSpec);
}

Expand All @@ -73,7 +88,11 @@ public ParseSpec getParseSpec()
@Override
public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
{
return new OpenCensusProtobufInputRowParser(parseSpec);
return new OpenCensusProtobufInputRowParser(
parseSpec,
metricDimension,
metricLabelPrefix,
resourceLabelPrefix);
}

@Override
Expand All @@ -88,22 +107,30 @@ public List<InputRow> parseBatch(ByteBuffer input)
throw new ParseException(e, "Protobuf message could not be parsed");
}

// Process metric descriptor labels map keys.
List<String> descriptorLabels = metric.getMetricDescriptor().getLabelKeysList().stream()
.map(s -> this.metricLabelPrefix + s.getKey())
.collect(Collectors.toList());

// Process resource labels map.
Map<String, Object> resourceLabelsMap = metric.getResource().getLabelsMap().entrySet().stream()
.collect(Collectors.toMap(entry -> this.resourceLabelPrefix + entry.getKey(),
Map.Entry::getValue));

final List<String> dimensions;

if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions;
} else {
Set<String> recordDimensions = metric.getMetricDescriptor().getLabelKeysList().stream()
.map(s -> s.getKey())
.collect(Collectors.toSet());
Set<String> recordDimensions = new HashSet<>(descriptorLabels);

// Add resource map key set to record dimensions.
recordDimensions.addAll(metric.getResource().getLabelsMap().keySet());
recordDimensions.addAll(resourceLabelsMap.keySet());

// NAME, VALUE dimensions will not be present in labelKeysList or Metric.Resource map as they
// are derived dimensions, which get populated while parsing data for timeSeries hence add
// them to recordDimensions.
recordDimensions.add(NAME);
// MetricDimension, VALUE dimensions will not be present in labelKeysList or Metric.Resource
// map as they are derived dimensions, which get populated while parsing data for timeSeries
// hence add them to recordDimensions.
recordDimensions.add(metricDimension);
recordDimensions.add(VALUE);

dimensions = Lists.newArrayList(
Expand All @@ -116,11 +143,11 @@ public List<InputRow> parseBatch(ByteBuffer input)
for (TimeSeries ts : metric.getTimeseriesList()) {

// Add common resourceLabels.
Map<String, Object> labels = new HashMap<>(metric.getResource().getLabelsMap());
Map<String, Object> labels = new HashMap<>(resourceLabelsMap);

// Add labels to record.
for (int i = 0; i < metric.getMetricDescriptor().getLabelKeysCount(); i++) {
labels.put(metric.getMetricDescriptor().getLabelKeys(i).getKey(), ts.getLabelValues(i).getValue());
labels.put(descriptorLabels.get(i), ts.getLabelValues(i).getValue());
}

// One row per timeSeries point.
Expand All @@ -132,29 +159,29 @@ public List<InputRow> parseBatch(ByteBuffer input)
case DOUBLE_VALUE:
Map<String, Object> doubleGauge = new HashMap<>();
doubleGauge.putAll(labels);
doubleGauge.put(NAME, metric.getMetricDescriptor().getName());
doubleGauge.put(metricDimension, metric.getMetricDescriptor().getName());
doubleGauge.put(VALUE, point.getDoubleValue());
addDerivedMetricsRow(doubleGauge, dimensions, rows);
break;
case INT64_VALUE:
HashMap<String, Object> intGauge = new HashMap<>();
intGauge.putAll(labels);
intGauge.put(VALUE, point.getInt64Value());
intGauge.put(NAME, metric.getMetricDescriptor().getName());
intGauge.put(metricDimension, metric.getMetricDescriptor().getName());
addDerivedMetricsRow(intGauge, dimensions, rows);
break;
case SUMMARY_VALUE:
// count
Map<String, Object> summaryCount = new HashMap<>();
summaryCount.putAll(labels);
summaryCount.put(NAME, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
summaryCount.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
summaryCount.put(VALUE, point.getSummaryValue().getCount().getValue());
addDerivedMetricsRow(summaryCount, dimensions, rows);

// sum
Map<String, Object> summarySum = new HashMap<>();
summarySum.putAll(labels);
summarySum.put(NAME, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
summarySum.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
summarySum.put(VALUE, point.getSummaryValue().getSnapshot().getSum().getValue());
addDerivedMetricsRow(summarySum, dimensions, rows);

Expand All @@ -163,13 +190,13 @@ public List<InputRow> parseBatch(ByteBuffer input)
case DISTRIBUTION_VALUE:
// count
Map<String, Object> distCount = new HashMap<>();
distCount.put(NAME, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
distCount.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
distCount.put(VALUE, point.getDistributionValue().getCount());
addDerivedMetricsRow(distCount, dimensions, rows);

// sum
Map<String, Object> distSum = new HashMap<>();
distSum.put(NAME, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
distSum.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
distSum.put(VALUE, point.getDistributionValue().getSum());
addDerivedMetricsRow(distSum, dimensions, rows);
// TODO: How to handle buckets ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testGaugeParse() throws Exception
{

//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec);
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Expand All @@ -128,7 +128,7 @@ public void testGaugeParse() throws Exception
public void testSummaryParse() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec);
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Expand Down Expand Up @@ -161,7 +161,7 @@ public void testSummaryParse() throws Exception
public void testDimensionsParseWithParseSpecDimensions() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpecWithDimensions);
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpecWithDimensions, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Expand Down Expand Up @@ -192,7 +192,7 @@ public void testDimensionsParseWithParseSpecDimensions() throws Exception
public void testDimensionsParseWithoutParseSpecDimensions() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec);
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Expand Down Expand Up @@ -221,6 +221,87 @@ public void testDimensionsParseWithoutParseSpecDimensions() throws Exception

}

@Test
public void testMetricNameOverride() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, "dimension_name", null, "");

Metric metric = summaryMetric(Timestamp.getDefaultInstance());
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));

Assert.assertEquals(2, rows.size());

InputRow row = rows.get(0);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "dimension_name", "metric_summary-count");
assertDimensionEquals(row, "foo_key", "foo_value");
assertDimensionEquals(row, "env_key", "env_val");

row = rows.get(1);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "dimension_name", "metric_summary-sum");
assertDimensionEquals(row, "foo_key", "foo_value");
assertDimensionEquals(row, "env_key", "env_val");
}

@Test
public void testDefaultPrefix() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, null);

Metric metric = summaryMetric(Timestamp.getDefaultInstance());
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));

Assert.assertEquals(2, rows.size());

InputRow row = rows.get(0);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "name", "metric_summary-count");
assertDimensionEquals(row, "foo_key", "foo_value");
assertDimensionEquals(row, "resource.env_key", "env_val");

row = rows.get(1);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "name", "metric_summary-sum");
assertDimensionEquals(row, "foo_key", "foo_value");
assertDimensionEquals(row, "resource.env_key", "env_val");
}

@Test
public void testCustomPrefix() throws Exception
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, "descriptor.", "custom.");

Metric metric = summaryMetric(Timestamp.getDefaultInstance());
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));

Assert.assertEquals(2, rows.size());

InputRow row = rows.get(0);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "name", "metric_summary-count");
assertDimensionEquals(row, "descriptor.foo_key", "foo_value");
assertDimensionEquals(row, "custom.env_key", "env_val");

row = rows.get(1);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "name", "metric_summary-sum");
assertDimensionEquals(row, "descriptor.foo_key", "foo_value");
assertDimensionEquals(row, "custom.env_key", "env_val");
}

private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
Expand Down