Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom tracing port #479

Merged
merged 7 commits into from
Jan 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions proxy/src/main/java/com/wavefront/agent/ProxyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,12 @@ public class ProxyConfig extends Configuration {
" Defaults: none")
String deltaCountersAggregationListenerPorts = "";

@Parameter(names = {"--customTracingListenerPorts"},
description = "Comma-separated list of ports to listen on spans from level 1 SDK. Helps " +
"derive RED metrics and for the span and heartbeat for corresponding application at proxy." +
" Defaults: none")
protected String customTracingListenerPorts = "";

@Parameter()
List<String> unparsed_params;

Expand Down Expand Up @@ -1316,6 +1322,10 @@ public String getDeltaCountersAggregationListenerPorts() {
return deltaCountersAggregationListenerPorts;
}

public String getCustomTracingListenerPorts() {
return customTracingListenerPorts;
}

@JsonIgnore
public TimeProvider getTimeProvider() {
return timeProvider;
Expand Down Expand Up @@ -1393,6 +1403,9 @@ public void verifyAndInit() {
config.getLong("deltaCountersAggregationIntervalSeconds",
deltaCountersAggregationIntervalSeconds);

customTracingListenerPorts =
config.getString("customTracingListenerPorts", customTracingListenerPorts);

// Histogram: deprecated settings - fall back for backwards compatibility
if (config.isDefined("avgHistogramKeyBytes")) {
histogramMinuteAvgKeyBytes = histogramHourAvgKeyBytes = histogramDayAvgKeyBytes =
Expand Down
37 changes: 37 additions & 0 deletions proxy/src/main/java/com/wavefront/agent/PushAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.wavefront.agent.listeners.RelayPortUnificationHandler;
import com.wavefront.agent.listeners.WavefrontPortUnificationHandler;
import com.wavefront.agent.listeners.WriteHttpJsonPortUnificationHandler;
import com.wavefront.agent.listeners.tracing.CustomTracingPortUnificationHandler;
import com.wavefront.agent.listeners.tracing.JaegerPortUnificationHandler;
import com.wavefront.agent.listeners.tracing.JaegerTChannelCollectorHandler;
import com.wavefront.agent.listeners.tracing.TracePortUnificationHandler;
Expand Down Expand Up @@ -77,6 +78,7 @@
import com.wavefront.ingester.SpanDecoder;
import com.wavefront.ingester.SpanLogsDecoder;
import com.wavefront.ingester.TcpIngester;
import com.wavefront.internal.reporter.WavefrontInternalReporter;
import com.wavefront.metrics.ExpectedAgentMetric;
import com.wavefront.sdk.common.WavefrontSender;
import com.wavefront.sdk.entities.tracing.sampling.CompositeSampler;
Expand Down Expand Up @@ -332,6 +334,9 @@ protected void startListeners() {

csvToList(proxyConfig.getTraceListenerPorts()).forEach(strPort ->
startTraceListener(strPort, handlerFactory, compositeSampler));
csvToList(proxyConfig.getCustomTracingListenerPorts()).forEach(strPort ->
startCustomTracingListener(strPort, handlerFactory,
new InternalProxyWavefrontClient(handlerFactory, strPort), compositeSampler));
csvToList(proxyConfig.getTraceJaegerListenerPorts()).forEach(strPort -> {
PreprocessorRuleMetrics ruleMetrics = new PreprocessorRuleMetrics(
Metrics.newCounter(new TaggedMetricName("point.spanSanitize", "count", "port", strPort)),
Expand Down Expand Up @@ -524,6 +529,38 @@ healthCheckManager, new SpanDecoder("unknown"), new SpanLogsDecoder(),
logger.info("listening on port: " + strPort + " for trace data");
}

@VisibleForTesting
protected void startCustomTracingListener(final String strPort,
ReportableEntityHandlerFactory handlerFactory,
@Nullable WavefrontSender wfSender,
Sampler sampler) {
final int port = Integer.parseInt(strPort);
registerPrefixFilter(strPort);
registerTimestampFilter(strPort);
if (proxyConfig.isHttpHealthCheckAllPorts()) healthCheckManager.enableHealthcheck(port);
WavefrontInternalReporter wfInternalReporter = null;
if (wfSender != null) {
wfInternalReporter = new WavefrontInternalReporter.Builder().
prefixedWith("tracing.derived").withSource("custom_tracing").reportMinuteDistribution().
build(wfSender);
// Start the reporter
wfInternalReporter.start(1, TimeUnit.MINUTES);
}

ChannelHandler channelHandler = new CustomTracingPortUnificationHandler(strPort, tokenAuthenticator,
healthCheckManager, new SpanDecoder("unknown"), new SpanLogsDecoder(),
preprocessors.get(strPort), handlerFactory, sampler, proxyConfig.isTraceAlwaysSampleErrors(),
() -> entityProps.get(ReportableEntityType.TRACE).isFeatureDisabled(),
() -> entityProps.get(ReportableEntityType.TRACE_SPAN_LOGS).isFeatureDisabled(),
wfSender, wfInternalReporter, proxyConfig.getTraceDerivedCustomTagKeys());

startAsManagedThread(port, new TcpIngester(createInitializer(channelHandler, port,
proxyConfig.getTraceListenerMaxReceivedLength(), proxyConfig.getTraceListenerHttpBufferSize(),
proxyConfig.getListenerIdleConnectionTimeout()),
port).withChildChannelOptions(childChannelOptions), "listener-custom-trace-" + port);
logger.info("listening on port: " + strPort + " for custom trace data");
}

protected void startTraceJaegerListener(String strPort,
ReportableEntityHandlerFactory handlerFactory,
@Nullable WavefrontSender wfSender,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package com.wavefront.agent.listeners.tracing;

import com.google.common.annotations.VisibleForTesting;

import com.fasterxml.jackson.databind.JsonNode;
import com.wavefront.agent.auth.TokenAuthenticator;
import com.wavefront.agent.channel.HealthCheckManager;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.handlers.ReportableEntityHandler;
import com.wavefront.agent.handlers.ReportableEntityHandlerFactory;
import com.wavefront.agent.preprocessor.ReportableEntityPreprocessor;
import com.wavefront.data.ReportableEntityType;
import com.wavefront.ingester.ReportableEntityDecoder;
import com.wavefront.internal.reporter.WavefrontInternalReporter;
import com.wavefront.sdk.common.WavefrontSender;
import com.wavefront.sdk.entities.tracing.sampling.Sampler;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import io.netty.channel.ChannelHandler;
import wavefront.report.Annotation;
import wavefront.report.Span;
import wavefront.report.SpanLogs;

import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.ERROR_SPAN_TAG_KEY;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.reportHeartbeats;
import static com.wavefront.agent.listeners.tracing.SpanDerivedMetricsUtils.reportWavefrontGeneratedData;
import static com.wavefront.sdk.common.Constants.APPLICATION_TAG_KEY;
import static com.wavefront.sdk.common.Constants.CLUSTER_TAG_KEY;
import static com.wavefront.sdk.common.Constants.COMPONENT_TAG_KEY;
import static com.wavefront.sdk.common.Constants.NULL_TAG_VAL;
import static com.wavefront.sdk.common.Constants.SERVICE_TAG_KEY;
import static com.wavefront.sdk.common.Constants.SHARD_TAG_KEY;

/**
* Handler that process trace data sent from tier 1 SDK.
*
* @author djia@vmware.com
*/
@ChannelHandler.Sharable
public class CustomTracingPortUnificationHandler extends TracePortUnificationHandler {
private static final Logger logger = Logger.getLogger(
CustomTracingPortUnificationHandler.class.getCanonicalName());
@Nullable
private final WavefrontSender wfSender;
private final WavefrontInternalReporter wfInternalReporter;
private final ConcurrentMap<HeartbeatMetricKey, Boolean> discoveredHeartbeatMetrics;
private final Set<String> traceDerivedCustomTagKeys;

/**
* @param handle handle/port number.
* @param tokenAuthenticator {@link TokenAuthenticator} for incoming requests.
* @param healthCheckManager shared health check endpoint handler.
* @param traceDecoder trace decoders.
* @param spanLogsDecoder span logs decoders.
* @param preprocessor preprocessor.
* @param handlerFactory factory for ReportableEntityHandler objects.
* @param sampler sampler.
* @param alwaysSampleErrors always sample spans with error tag.
* @param traceDisabled supplier for backend-controlled feature flag for spans.
* @param spanLogsDisabled supplier for backend-controlled feature flag for span logs.
* @param wfSender sender to send trace to Wavefront.
* @param traceDerivedCustomTagKeys custom tags added to derived RED metrics.
*/
public CustomTracingPortUnificationHandler(
djia-vm-wf marked this conversation as resolved.
Show resolved Hide resolved
String handle, TokenAuthenticator tokenAuthenticator, HealthCheckManager healthCheckManager,
ReportableEntityDecoder<String, Span> traceDecoder,
ReportableEntityDecoder<JsonNode, SpanLogs> spanLogsDecoder,
@Nullable Supplier<ReportableEntityPreprocessor> preprocessor,
ReportableEntityHandlerFactory handlerFactory, Sampler sampler, boolean alwaysSampleErrors,
Supplier<Boolean> traceDisabled, Supplier<Boolean> spanLogsDisabled,
@Nullable WavefrontSender wfSender, @Nullable WavefrontInternalReporter wfInternalReporter,
Set<String> traceDerivedCustomTagKeys) {
this(handle, tokenAuthenticator, healthCheckManager, traceDecoder, spanLogsDecoder,
preprocessor, handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.TRACE, handle)),
handlerFactory.getHandler(HandlerKey.of(ReportableEntityType.TRACE_SPAN_LOGS, handle)),
sampler, alwaysSampleErrors, traceDisabled, spanLogsDisabled, wfSender, wfInternalReporter,
traceDerivedCustomTagKeys);
}

@VisibleForTesting
public CustomTracingPortUnificationHandler(
String handle, TokenAuthenticator tokenAuthenticator, HealthCheckManager healthCheckManager,
ReportableEntityDecoder<String, Span> traceDecoder,
ReportableEntityDecoder<JsonNode, SpanLogs> spanLogsDecoder,
@Nullable Supplier<ReportableEntityPreprocessor> preprocessor,
final ReportableEntityHandler<Span, String> handler,
final ReportableEntityHandler<SpanLogs, String> spanLogsHandler, Sampler sampler,
boolean alwaysSampleErrors, Supplier<Boolean> traceDisabled,
Supplier<Boolean> spanLogsDisabled, @Nullable WavefrontSender wfSender,
@Nullable WavefrontInternalReporter wfInternalReporter,
Set<String> traceDerivedCustomTagKeys) {
super(handle, tokenAuthenticator, healthCheckManager, traceDecoder, spanLogsDecoder,
preprocessor, handler, spanLogsHandler, sampler, alwaysSampleErrors, traceDisabled, spanLogsDisabled);
this.wfSender = wfSender;
this.wfInternalReporter = wfInternalReporter;
this.discoveredHeartbeatMetrics = new ConcurrentHashMap<>();
this.traceDerivedCustomTagKeys = traceDerivedCustomTagKeys;
}

@Override
protected void report(Span object) {
// report converted metrics/histograms from the span
String applicationName = NULL_TAG_VAL;
String serviceName = NULL_TAG_VAL;
String cluster = NULL_TAG_VAL;
String shard = NULL_TAG_VAL;
String componentTagValue = NULL_TAG_VAL;
String isError = "false";
List<Annotation> annotations = object.getAnnotations();
for (Annotation annotation : annotations) {
switch (annotation.getKey()) {
case APPLICATION_TAG_KEY:
applicationName = annotation.getValue();
continue;
case SERVICE_TAG_KEY:
serviceName = annotation.getValue();
case CLUSTER_TAG_KEY:
cluster = annotation.getValue();
continue;
case SHARD_TAG_KEY:
shard = annotation.getValue();
continue;
case COMPONENT_TAG_KEY:
componentTagValue = annotation.getValue();
break;
case ERROR_SPAN_TAG_KEY:
isError = annotation.getValue();
break;
}
}
if (applicationName.equals(NULL_TAG_VAL) || serviceName.equals(NULL_TAG_VAL)) {
logger.warning("Ingested spans discarded because span application/service name is " +
"missing.");
discardedSpans.inc();
return;
}
handler.report(object);

if (wfInternalReporter != null) {
discoveredHeartbeatMetrics.putIfAbsent(reportWavefrontGeneratedData(wfInternalReporter,
object.getName(), applicationName, serviceName, cluster, shard, object.getSource(),
componentTagValue, Boolean.parseBoolean(isError), object.getDuration(),
traceDerivedCustomTagKeys, annotations), true);
try {
reportHeartbeats("customTracing", wfSender, discoveredHeartbeatMetrics);
} catch (IOException e) {
logger.log(Level.WARNING, "Cannot report heartbeat metric to wavefront");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ public class TracePortUnificationHandler extends AbstractLineDelimitedHandler {

private static final ObjectMapper JSON_PARSER = new ObjectMapper();

private final ReportableEntityHandler<Span, String> handler;
protected final ReportableEntityHandler<Span, String> handler;
private final ReportableEntityHandler<SpanLogs, String> spanLogsHandler;
private final ReportableEntityDecoder<String, Span> decoder;
private final ReportableEntityDecoder<JsonNode, SpanLogs> spanLogsDecoder;
private final Supplier<ReportableEntityPreprocessor> preprocessorSupplier;
private final Sampler sampler;
private final boolean alwaysSampleErrors;
protected final boolean alwaysSampleErrors;
private final Supplier<Boolean> traceDisabled;
private final Supplier<Boolean> spanLogsDisabled;

private final Counter discardedSpans;
protected final Counter discardedSpans;
private final Counter discardedSpansBySampler;

public TracePortUnificationHandler(
Expand Down Expand Up @@ -175,12 +175,21 @@ protected void processLine(final ChannelHandlerContext ctx, @Nonnull String mess
boolean sampleError = alwaysSampleErrors && object.getAnnotations().stream().anyMatch(
t -> t.getKey().equals(ERROR_SPAN_TAG_KEY) && t.getValue().equals(ERROR_SPAN_TAG_VAL));
if (sampleError || sample(object)) {
handler.report(object);
report(object);
}
}
}

private boolean sample(Span object) {
/**
* Report span and derived metrics if needed.
*
* @param object span.
*/
protected void report(Span object) {
handler.report(object);
}

protected boolean sample(Span object) {
if (sampler.sample(object.getName(),
UUID.fromString(object.getTraceId()).getLeastSignificantBits(), object.getDuration())) {
return true;
Expand Down
Loading