Skip to content
This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Commit

Permalink
Improve tracing and logging for write errors.
Browse files Browse the repository at this point in the history
This adds error flags and exception messages to the spans for
metadata, suggest, and bigtable writes when the chain of futures
fails. The same exceptions are also logged. Previously some
exceptions, such as grpc errors, could be masked by sl4j settings.

The trace for a metric write was cleaned up to remove several
intermediary spans. These spans did not have useful information and
had no branching paths, so they were just clutter in the overall
trace.

Closes #724.
  • Loading branch information
hexedpackets committed Jan 8, 2021
1 parent 037a871 commit b2cb3e5
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ public class PubSubConsumer implements Consumer, LifeCycles {

@Inject
PubSubConsumer(
AsyncFramework async, Managed<Connection> connection,
@Named("consuming") AtomicInteger consuming, @Named("total") AtomicInteger total,
@Named("errors") AtomicLong errors, @Named("consumed") LongAdder consumed
AsyncFramework async,
Managed<Connection> connection,
@Named("consuming") AtomicInteger consuming,
@Named("total") AtomicInteger total,
@Named("errors") AtomicLong errors,
@Named("consumed") LongAdder consumed
) {
this.async = async;
this.connection = connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package com.spotify.heroic.consumer.pubsub;

import static io.opencensus.trace.AttributeValue.longAttributeValue;
import static io.opencensus.trace.AttributeValue.stringAttributeValue;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer replyConsumer

Span span = tracer.spanBuilder("PubSub.receiveMessage").startSpan();
span.putAttribute("id", stringAttributeValue(messageId));
span.putAttribute("message.size", longAttributeValue(bytes.length));

final FutureReporter.Context consumptionContext = reporter.reportConsumption();

Expand All @@ -79,24 +81,23 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer replyConsumer
consumer.consume(bytes).onDone(consumptionContext).onFinished(() -> {
reporter.reportMessageSize(bytes.length);
replyConsumer.ack();
span.end();
});
} catch (ConsumerSchemaValidationException e) {
reporter.reportConsumerSchemaError();
log.error("ID:{} - {}", messageId, e.getMessage(), e);
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString()));

// The message will never be processable, ack it to make it go away
replyConsumer.ack();
span.end();
} catch (Exception e) {
errors.incrementAndGet();
log.error("ID:{} - Failed to consume", messageId, e);
span.setStatus(Status.INTERNAL.withDescription(e.toString()));
reporter.reportMessageError();
replyConsumer.nack();
span.end();
} finally {
consumed.increment();
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import static io.opencensus.trace.AttributeValue.stringAttributeValue;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
Expand Down Expand Up @@ -51,8 +50,6 @@
import com.spotify.heroic.time.Clock;
import dagger.Component;
import eu.toolchain.async.AsyncFuture;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
Expand Down Expand Up @@ -87,59 +84,59 @@ public Consumer(Clock clock, IngestionGroup ingestion, ConsumerReporter reporter
@Override
public AsyncFuture<Void> consume(final byte[] message) throws ConsumerSchemaException {
final JsonNode tree;
final Span span = tracer.spanBuilder("ConsumerSchema.consume").startSpan();
var scope = tracer.spanBuilder("ConsumerSchema.consume").startScopedSpan();
var span = tracer.getCurrentSpan();
span.putAttribute("schema", stringAttributeValue("Spotify100"));

try (Scope ws = tracer.withSpan(span)) {
try {
tree = mapper.readTree(message);
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString()));
span.end();
throw new ConsumerSchemaValidationException("Invalid metric", e);
}

if (tree.getNodeType() != JsonNodeType.OBJECT) {
span.setStatus(
Status.INVALID_ARGUMENT.withDescription("Metric is not an object"));
span.end();
throw new ConsumerSchemaValidationException(
"Expected object, but got: " + tree.getNodeType());
}

final ObjectNode object = (ObjectNode) tree;

final JsonNode versionNode = object.remove("version");

if (versionNode == null) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Missing version"));
span.end();
throw new ConsumerSchemaValidationException(
"Missing version in received object");
}

final Version version;

try {
version = Version.parse(versionNode.asText());
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Bad version"));
span.end();
throw new ConsumerSchemaValidationException("Bad version: " + versionNode);
}

int major = version.getMajor();

if (major == 1) {
return handleVersion1(tree).onFinished(span::end);
} else if (major == 2) {
return handleVersion2(tree).onFinished(span::end);
}

span.setStatus(Status.INVALID_ARGUMENT.withDescription("Unsupported version"));
span.end();
throw new ConsumerSchemaValidationException("Unsupported version: " + version);
try {
tree = mapper.readTree(message);
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.toString()));
scope.close();
throw new ConsumerSchemaValidationException("Invalid metric", e);
}

if (tree.getNodeType() != JsonNodeType.OBJECT) {
span.setStatus(
Status.INVALID_ARGUMENT.withDescription("Metric is not an object"));
scope.close();
throw new ConsumerSchemaValidationException(
"Expected object, but got: " + tree.getNodeType());
}

final ObjectNode object = (ObjectNode) tree;

final JsonNode versionNode = object.remove("version");

if (versionNode == null) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Missing version"));
scope.close();
throw new ConsumerSchemaValidationException(
"Missing version in received object");
}

final Version version;

try {
version = Version.parse(versionNode.asText());
} catch (final Exception e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Bad version"));
scope.close();
throw new ConsumerSchemaValidationException("Bad version: " + versionNode);
}

span.putAttribute("metric.version", stringAttributeValue(version.toString()));
int major = version.getMajor();

if (major == 1) {
return handleVersion1(tree).onFinished(scope::close);
} else if (major == 2) {
return handleVersion2(tree).onFinished(scope::close);
}

span.setStatus(Status.INVALID_ARGUMENT.withDescription("Unsupported version"));
scope.close();
throw new ConsumerSchemaValidationException("Unsupported version: " + version);
}

private AsyncFuture<Void> handleVersion1(final JsonNode tree)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

package com.spotify.heroic.consumer.schemas;

import static io.opencensus.trace.AttributeValue.stringAttributeValue;

import com.google.common.collect.ImmutableList;
import com.spotify.heroic.common.Series;
import com.spotify.heroic.consumer.ConsumerSchema;
Expand All @@ -41,6 +43,10 @@
import dagger.Component;
import eu.toolchain.async.AsyncFramework;
import eu.toolchain.async.AsyncFuture;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -63,6 +69,7 @@ public static class Consumer implements ConsumerSchema.Consumer {
private final IngestionGroup ingestion;
private final ConsumerReporter reporter;
private final AsyncFramework async;
private static final Tracer tracer = Tracing.getTracer();

@Inject
public Consumer(
Expand All @@ -79,53 +86,64 @@ public Consumer(

@Override
public AsyncFuture<Void> consume(final byte[] message) throws ConsumerSchemaException {
var scope = tracer.spanBuilder("ConsumerSchema.consume").startScopedSpan();
var span = tracer.getCurrentSpan();
span.putAttribute("schema", stringAttributeValue("Spotify100Proto"));

final List<Spotify100.Metric> metrics;
try {
metrics = Spotify100.Batch.parseFrom(Snappy.uncompress(message)).getMetricList();
} catch (IOException e) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription(e.getMessage()));
scope.close();
throw new ConsumerSchemaValidationException("Invalid batch of metrics", e);
}

final List<AsyncFuture<Ingestion>> ingestions = new ArrayList<>();
for (Spotify100.Metric metric : metrics) {

if (metric.getTime() <= 0) {
span.setStatus(Status.INVALID_ARGUMENT.withDescription("Negative time set"));
scope.close();
throw new ConsumerSchemaValidationException(
"time: field must be a positive number: " + metric.toString());
}
reporter.reportMessageDrift(clock.currentTimeMillis() - metric.getTime());

final Series s = Series.of(metric.getKey(), metric.getTagsMap(),
final Series series = Series.of(metric.getKey(), metric.getTagsMap(),
metric.getResourceMap());


Spotify100.Value distributionTypeValue = metric.getDistributionTypeValue();
Point point = null;
if (!metric.hasDistributionTypeValue()) {
point = new Point(metric.getTime(), metric.getValue());
} else if (distributionTypeValue.getValueCase()
.equals(Spotify100.Value.ValueCase.DOUBLE_VALUE)) {
.equals(Spotify100.Value.ValueCase.DOUBLE_VALUE)
) {
point = new Point(metric.getTime(), distributionTypeValue.getDoubleValue());
} else if (distributionTypeValue.getValueCase()
.equals(Spotify100.Value.ValueCase.DISTRIBUTION_VALUE)) {
.equals(Spotify100.Value.ValueCase.DISTRIBUTION_VALUE)
) {
Distribution distribution = HeroicDistribution.
create(distributionTypeValue.getDistributionValue());
DistributionPoint distributionPoint =
DistributionPoint.create(distribution, metric.getTime());
final List<DistributionPoint> distributionPoints =
ImmutableList.of(distributionPoint);
ingestions
.add(ingestion.write(new Request(s,
.add(ingestion.write(new Request(series,
MetricCollection.distributionPoints(distributionPoints))));
}
if (point != null) {
List<Point> points = ImmutableList.of(point);
ingestions
.add(ingestion.write(new Request(s, MetricCollection.points(points))));
.add(ingestion.write(new Request(series, MetricCollection.points(points))));
}

}
reporter.reportMetricsIn(metrics.size());
var metricsSize = metrics.size();
reporter.reportMetricsIn(metricsSize);
span.putAttribute("metrics", AttributeValue.longAttributeValue(metricsSize));
// Return Void future, to not leak unnecessary information from the backend but just
// allow monitoring of when the consumption is done.
return async.collectAndDiscard(ingestions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import java.util.function.Supplier;

public class CoreIngestionGroup implements IngestionGroup {
private final Tracer tracer = Tracing.getTracer();
private static final Tracer tracer = Tracing.getTracer();

private final AsyncFramework async;
private final Supplier<Filter> filter;
Expand Down Expand Up @@ -127,16 +127,15 @@ protected AsyncFuture<Ingestion> syncWrite(final Request request) {
reporter.incrementConcurrentWrites();

try (Scope ws = tracer.withSpan(span)) {
return doWrite(request).onFinished(() -> {
return doWrite(request, span).onFinished(() -> {
writePermits.release();
reporter.decrementConcurrentWrites();
span.end();
});
}
}

protected AsyncFuture<Ingestion> doWrite(final Request request) {
final Span span = tracer.spanBuilder("CoreIngestionGroup.doWrite").startSpan();
protected AsyncFuture<Ingestion> doWrite(final Request request, final Span span) {
final List<AsyncFuture<Ingestion>> futures = new ArrayList<>();

final Supplier<DateRange> range = rangeSupplier(request);
Expand All @@ -146,7 +145,7 @@ protected AsyncFuture<Ingestion> doWrite(final Request request) {
.ifPresent(futures::add);
suggest.map(s -> doSuggestWrite(s, request, range.get(), span)).ifPresent(futures::add);

return async.collect(futures, Ingestion.reduce()).onFinished(span::end);
return async.collect(futures, Ingestion.reduce());
}

protected AsyncFuture<Ingestion> doMetricWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
import com.spotify.heroic.tracing.EndSpanFutureReporter;
import eu.toolchain.async.AsyncFramework;
import eu.toolchain.async.AsyncFuture;
import eu.toolchain.async.FutureDone;
import eu.toolchain.async.LazyTransform;
import eu.toolchain.async.StreamCollector;
import eu.toolchain.async.FutureDone;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import java.util.ArrayList;
Expand All @@ -71,12 +71,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
Expand Down Expand Up @@ -472,7 +472,7 @@ public AsyncFuture<FetchData.Result> fetch(

@Override
public AsyncFuture<WriteMetric> write(final WriteMetric.Request request) {
return write(request, io.opencensus.trace.Tracing.getTracer().getCurrentSpan());
return write(request);
}

@Override
Expand Down
Loading

0 comments on commit b2cb3e5

Please sign in to comment.