Skip to content

Commit

Permalink
Hot reload for preprocessor rules (#398)
Browse files Browse the repository at this point in the history
  • Loading branch information
basilisk487 authored and sushantdewan123 committed Jul 17, 2019
1 parent 4679749 commit bfdc755
Show file tree
Hide file tree
Showing 29 changed files with 784 additions and 525 deletions.
25 changes: 14 additions & 11 deletions proxy/src/main/java/com/wavefront/agent/AbstractAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.wavefront.agent.config.LogsIngestionConfig;
import com.wavefront.agent.config.ReportableConfig;
import com.wavefront.agent.logsharvesting.InteractiveLogsTester;
import com.wavefront.agent.preprocessor.AgentPreprocessorConfiguration;
import com.wavefront.agent.preprocessor.PreprocessorConfigManager;
import com.wavefront.agent.preprocessor.PointLineBlacklistRegexFilter;
import com.wavefront.agent.preprocessor.PointLineWhitelistRegexFilter;
import com.wavefront.agent.preprocessor.PreprocessorRuleMetrics;
Expand Down Expand Up @@ -64,7 +64,6 @@
import org.jboss.resteasy.spi.ResteasyProviderFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
Expand Down Expand Up @@ -710,7 +709,7 @@ public abstract class AbstractAgent {
protected final List<PostPushDataTimedTask> managedTasks = new ArrayList<>();
protected final List<ExecutorService> managedExecutors = new ArrayList<>();
protected final List<Runnable> shutdownTasks = new ArrayList<>();
protected final AgentPreprocessorConfiguration preprocessors = new AgentPreprocessorConfiguration();
protected PreprocessorConfigManager preprocessors = new PreprocessorConfigManager();
protected ValidationConfiguration validationConfiguration = null;
protected RecyclableRateLimiter pushRateLimiter = null;
protected TokenAuthenticator tokenAuthenticator = TokenAuthenticatorBuilder.create().
Expand Down Expand Up @@ -835,18 +834,28 @@ private void addPreprocessorFilters(String commaDelimitedPorts, String whitelist
Metrics.newCounter(new TaggedMetricName("validationRegex", "points-checked", "port", strPort))
);
if (blacklist != null) {
preprocessors.forPort(strPort).forPointLine().addFilter(
preprocessors.getSystemPreprocessor(strPort).forPointLine().addFilter(
new PointLineBlacklistRegexFilter(blacklistRegex, ruleMetrics));
}
if (whitelist != null) {
preprocessors.forPort(strPort).forPointLine().addFilter(
preprocessors.getSystemPreprocessor(strPort).forPointLine().addFilter(
new PointLineWhitelistRegexFilter(whitelist, ruleMetrics));
}
}
}
}

private void initPreprocessors() throws IOException {
try {
preprocessors = new PreprocessorConfigManager(preprocessorConfigFile);
} catch (FileNotFoundException ex) {
throw new RuntimeException("Unable to load preprocessor rules - file does not exist: " +
preprocessorConfigFile);
}
if (preprocessorConfigFile != null) {
logger.info("Preprocessor configuration loaded from " + preprocessorConfigFile);
}

// convert blacklist/whitelist fields to filters for full backwards compatibility
// blacklistRegex and whitelistRegex are applied to pushListenerPorts, graphitePorts and picklePorts
String allPorts = StringUtils.join(new String[]{
Expand All @@ -859,12 +868,6 @@ private void initPreprocessors() throws IOException {

// opentsdbBlacklistRegex and opentsdbWhitelistRegex are applied to opentsdbPorts only
addPreprocessorFilters(opentsdbPorts, opentsdbWhitelistRegex, opentsdbBlacklistRegex);

if (preprocessorConfigFile != null) {
FileInputStream stream = new FileInputStream(preprocessorConfigFile);
preprocessors.loadFromStream(stream);
logger.info("Preprocessor configuration loaded from " + preprocessorConfigFile);
}
}

// Returns null on any exception, and logs the exception.
Expand Down
38 changes: 20 additions & 18 deletions proxy/src/main/java/com/wavefront/agent/PushAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected void startListeners() {
graphiteFieldsToRemove);
Iterable<String> ports = Splitter.on(",").omitEmptyStrings().trimResults().split(graphitePorts);
for (String strPort : ports) {
preprocessors.forPort(strPort).forPointLine().addTransformer(0, graphiteFormatter);
preprocessors.getSystemPreprocessor(strPort).forPointLine().addTransformer(0, graphiteFormatter);
startGraphiteListener(strPort, handlerFactory, null);
logger.info("listening on port: " + strPort + " for graphite metrics");
}
Expand Down Expand Up @@ -392,8 +392,8 @@ protected void startJsonListener(String strPort, ReportableEntityHandlerFactory
final int port = Integer.parseInt(strPort);
registerTimestampFilter(strPort);

ChannelHandler channelHandler = new JsonMetricsPortUnificationHandler(strPort, tokenAuthenticator,
handlerFactory, prefix, hostname, preprocessors.forPort(strPort));
ChannelHandler channelHandler = new JsonMetricsPortUnificationHandler(strPort,
tokenAuthenticator, handlerFactory, prefix, hostname, preprocessors.get(strPort));

startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, pushListenerMaxReceivedLength,
pushListenerHttpBufferSize, listenerIdleConnectionTimeout), port).withChildChannelOptions(childChannelOptions),
Expand All @@ -407,7 +407,7 @@ protected void startWriteHttpJsonListener(String strPort, ReportableEntityHandle
registerTimestampFilter(strPort);

ChannelHandler channelHandler = new WriteHttpJsonPortUnificationHandler(strPort, tokenAuthenticator,
handlerFactory, hostname, preprocessors.forPort(strPort));
handlerFactory, hostname, preprocessors.get(strPort));

startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, pushListenerMaxReceivedLength,
pushListenerHttpBufferSize, listenerIdleConnectionTimeout), port).withChildChannelOptions(childChannelOptions),
Expand All @@ -424,7 +424,7 @@ protected void startOpenTsdbListener(final String strPort, ReportableEntityHandl
new OpenTSDBDecoder("unknown", customSourceTags));

ChannelHandler channelHandler = new OpenTSDBPortUnificationHandler(strPort, tokenAuthenticator, openTSDBDecoder,
handlerFactory, preprocessors.forPort(strPort), hostnameResolver);
handlerFactory, preprocessors.get(strPort), hostnameResolver);

startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, pushListenerMaxReceivedLength,
pushListenerHttpBufferSize, listenerIdleConnectionTimeout), port).withChildChannelOptions(childChannelOptions),
Expand All @@ -444,7 +444,7 @@ protected void startDataDogListener(final String strPort, ReportableEntityHandle

ChannelHandler channelHandler = new DataDogPortUnificationHandler(strPort, handlerFactory,
dataDogProcessSystemMetrics, dataDogProcessServiceChecks, httpClient, dataDogRequestRelayTarget,
preprocessors.forPort(strPort));
preprocessors.get(strPort));

startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, pushListenerMaxReceivedLength,
pushListenerHttpBufferSize, listenerIdleConnectionTimeout), port).withChildChannelOptions(childChannelOptions),
Expand All @@ -464,7 +464,7 @@ protected void startPickleListener(String strPort, PointHandler pointHandler, Gr
// Set up a custom handler
ChannelHandler channelHandler = new ChannelByteArrayHandler(
new PickleProtocolDecoder("unknown", customSourceTags, formatter.getMetricMangler(), port),
pointHandler, preprocessors.forPort(strPort));
pointHandler, preprocessors.get(strPort));

// create a class to use for StreamIngester to get a new FrameDecoder
// for each request (not shareable since it's storing how many bytes
Expand Down Expand Up @@ -495,7 +495,7 @@ protected void startTraceListener(final String strPort, ReportableEntityHandlerF
registerTimestampFilter(strPort);

ChannelHandler channelHandler = new TracePortUnificationHandler(strPort, tokenAuthenticator,
new SpanDecoder("unknown"), new SpanLogsDecoder(), preprocessors.forPort(strPort), handlerFactory, sampler,
new SpanDecoder("unknown"), new SpanLogsDecoder(), preprocessors.get(strPort), handlerFactory, sampler,
traceAlwaysSampleErrors);

startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, traceListenerMaxReceivedLength,
Expand All @@ -522,7 +522,7 @@ protected void startTraceJaegerListener(
server.
makeSubChannel("jaeger-collector", Connection.Direction.IN).
register("Collector::submitBatches", new JaegerThriftCollectorHandler(strPort, handlerFactory,
wfSender, traceDisabled, preprocessors.forPort(strPort), sampler,
wfSender, traceDisabled, preprocessors.get(strPort), sampler,
traceAlwaysSampleErrors, traceJaegerApplicationName, traceDerivedCustomTagKeys));
server.listen().channel().closeFuture().sync();
server.shutdown(false);
Expand All @@ -544,8 +544,8 @@ protected void startTraceZipkinListener(
Sampler sampler) {
final int port = Integer.parseInt(strPort);
ChannelHandler channelHandler = new ZipkinPortUnificationHandler(strPort, handlerFactory, wfSender, traceDisabled,
preprocessors.forPort(strPort), sampler, traceAlwaysSampleErrors,
traceZipkinApplicationName, traceDerivedCustomTagKeys);
preprocessors.get(strPort), sampler, traceAlwaysSampleErrors, traceZipkinApplicationName,
traceDerivedCustomTagKeys);
startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, traceListenerMaxReceivedLength,
traceListenerHttpBufferSize, listenerIdleConnectionTimeout), port).withChildChannelOptions(childChannelOptions),
"listener-zipkin-trace-" + port);
Expand All @@ -567,7 +567,7 @@ ReportableEntityType.POINT, getDecoderInstance(),
ReportableEntityType.SOURCE_TAG, new ReportSourceTagDecoder(),
ReportableEntityType.HISTOGRAM, new ReportPointDecoderWrapper(new HistogramDecoder("unknown")));
WavefrontPortUnificationHandler wavefrontPortUnificationHandler = new WavefrontPortUnificationHandler(strPort,
tokenAuthenticator, decoders, handlerFactory, hostAnnotator, preprocessors.forPort(strPort));
tokenAuthenticator, decoders, handlerFactory, hostAnnotator, preprocessors.get(strPort));
startAsManagedThread(new TcpIngester(createInitializer(wavefrontPortUnificationHandler, strPort,
pushListenerMaxReceivedLength, pushListenerHttpBufferSize, listenerIdleConnectionTimeout), port).
withChildChannelOptions(childChannelOptions), "listener-graphite-" + port);
Expand All @@ -584,7 +584,7 @@ protected void startRelayListener(String strPort, ReportableEntityHandlerFactory
ReportableEntityType.POINT, getDecoderInstance(),
ReportableEntityType.HISTOGRAM, new ReportPointDecoderWrapper(new HistogramDecoder("unknown")));
ChannelHandler channelHandler = new RelayPortUnificationHandler(strPort, tokenAuthenticator, decoders,
handlerFactory, preprocessors.forPort(strPort));
handlerFactory, preprocessors.get(strPort));
startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, pushListenerMaxReceivedLength,
pushListenerHttpBufferSize, listenerIdleConnectionTimeout), port).
withChildChannelOptions(childChannelOptions), "listener-relay-" + port);
Expand Down Expand Up @@ -623,7 +623,7 @@ protected void startRawLogsIngestionListener(int port, LogsIngester logsIngester
String strPort = String.valueOf(port);

ChannelHandler channelHandler = new RawLogsIngesterPortUnificationHandler(strPort, logsIngester, hostnameResolver,
tokenAuthenticator, preprocessors.forPort(strPort));
tokenAuthenticator, preprocessors.get(strPort));

startAsManagedThread(new TcpIngester(createInitializer(channelHandler, strPort, rawLogsMaxReceivedLength,
rawLogsHttpBufferSize, listenerIdleConnectionTimeout), port).withChildChannelOptions(childChannelOptions),
Expand Down Expand Up @@ -772,8 +772,9 @@ private void startHistogramListener(
"listener-plaintext-histogram-" + port);
}

private static ChannelInitializer createInitializer(ChannelHandler channelHandler, String strPort, int messageMaxLength,
int httpRequestBufferSize, int idleTimeout) {
private static ChannelInitializer createInitializer(ChannelHandler channelHandler, String strPort,
int messageMaxLength, int httpRequestBufferSize,
int idleTimeout) {
ChannelHandler idleStateEventHandler = new IdleStateEventHandler(
Metrics.newCounter(new TaggedMetricName("listeners", "connections.idle.closed", "port", strPort)));
ChannelHandler connectionTracker = new ConnectionTrackingHandler(
Expand All @@ -793,13 +794,14 @@ public void initChannel(SocketChannel ch) {
}

private void registerTimestampFilter(String strPort) {
preprocessors.forPort(strPort).forReportPoint().addFilter(
preprocessors.getSystemPreprocessor(strPort).forReportPoint().addFilter(
new ReportPointTimestampInRangeFilter(dataBackfillCutoffHours, dataPrefillCutoffHours));
}

private void registerPrefixFilter(String strPort) {
if (prefix != null && !prefix.isEmpty()) {
preprocessors.forPort(strPort).forReportPoint().addTransformer(new ReportPointAddPrefixTransformer(prefix));
preprocessors.getSystemPreprocessor(strPort).forReportPoint().
addTransformer(new ReportPointAddPrefixTransformer(prefix));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -36,18 +37,18 @@ public class ChannelByteArrayHandler extends SimpleChannelInboundHandler<byte[]>
private final PointHandler pointHandler;

@Nullable
private final ReportableEntityPreprocessor preprocessor;
private final Supplier<ReportableEntityPreprocessor> preprocessorSupplier;
private final GraphiteDecoder recoder;

/**
* Constructor.
*/
public ChannelByteArrayHandler(Decoder<byte[]> decoder,
final PointHandler pointHandler,
@Nullable final ReportableEntityPreprocessor preprocessor) {
final PointHandler pointHandler,
@Nullable final Supplier<ReportableEntityPreprocessor> preprocessor) {
this.decoder = decoder;
this.pointHandler = pointHandler;
this.preprocessor = preprocessor;
this.preprocessorSupplier = preprocessor;
this.recoder = new GraphiteDecoder(Collections.emptyList());
}

Expand All @@ -58,18 +59,20 @@ protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Except
return;
}

ReportableEntityPreprocessor preprocessor = preprocessorSupplier == null ? null : preprocessorSupplier.get();

List<ReportPoint> points = Lists.newArrayListWithExpectedSize(1);
try {
decoder.decodeReportPoints(msg, points, "dummy");
for (ReportPoint point: points) {
if (preprocessor != null && preprocessor.forPointLine().hasTransformers()) {
if (preprocessor != null && !preprocessor.forPointLine().getTransformers().isEmpty()) {
String pointLine = PointHandlerImpl.pointToString(point);
pointLine = preprocessor.forPointLine().transform(pointLine);
List<ReportPoint> parsedPoints = Lists.newArrayListWithExpectedSize(1);
recoder.decodeReportPoints(pointLine, parsedPoints, "dummy");
parsedPoints.forEach(this::preprocessAndReportPoint);
parsedPoints.forEach(x -> preprocessAndReportPoint(x, preprocessor));
} else {
preprocessAndReportPoint(point);
preprocessAndReportPoint(point, preprocessor);
}
}
} catch (final Exception e) {
Expand All @@ -88,29 +91,30 @@ protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Except
}
}

private void preprocessAndReportPoint(ReportPoint point) {
private void preprocessAndReportPoint(ReportPoint point, ReportableEntityPreprocessor preprocessor) {
String[] messageHolder = new String[1];
if (preprocessor == null) {
pointHandler.reportPoint(point, point.getMetric());
return;
}
// backwards compatibility: apply "pointLine" rules to metric name
if (!preprocessor.forPointLine().filter(point.getMetric())) {
if (preprocessor.forPointLine().getLastFilterResult() != null) {
if (!preprocessor.forPointLine().filter(point.getMetric(), messageHolder)) {
if (messageHolder[0] != null) {
blockedPointsLogger.warning(PointHandlerImpl.pointToString(point));
} else {
blockedPointsLogger.info(PointHandlerImpl.pointToString(point));
}
pointHandler.handleBlockedPoint(preprocessor.forPointLine().getLastFilterResult());
pointHandler.handleBlockedPoint(messageHolder[0]);
return;
}
preprocessor.forReportPoint().transform(point);
if (!preprocessor.forReportPoint().filter(point)) {
if (preprocessor.forReportPoint().getLastFilterResult() != null) {
if (!preprocessor.forReportPoint().filter(point, messageHolder)) {
if (messageHolder[0] != null) {
blockedPointsLogger.warning(PointHandlerImpl.pointToString(point));
} else {
blockedPointsLogger.info(PointHandlerImpl.pointToString(point));
}
pointHandler.handleBlockedPoint(preprocessor.forReportPoint().getLastFilterResult());
pointHandler.handleBlockedPoint(messageHolder[0]);
return;
}
pointHandler.reportPoint(point, point.getMetric());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,18 @@ public static void processPointLine(final String message,
String pointLine = message.trim();
if (pointLine.isEmpty()) return;

String[] messageHolder = new String[1];
// transform the line if needed
if (preprocessor != null) {
pointLine = preprocessor.forPointLine().transform(pointLine);
// apply white/black lists after formatting
if (!preprocessor.forPointLine().filter(pointLine)) {
if (preprocessor.forPointLine().getLastFilterResult() != null) {
if (!preprocessor.forPointLine().filter(pointLine, messageHolder)) {
if (messageHolder[0] != null) {
blockedPointsLogger.warning(pointLine);
} else {
blockedPointsLogger.info(pointLine);
}
pointHandler.handleBlockedPoint(preprocessor.forPointLine().getLastFilterResult());
pointHandler.handleBlockedPoint(messageHolder[0]);
return;
}
}
Expand Down Expand Up @@ -162,13 +163,13 @@ public static void processPointLine(final String message,
if (preprocessor != null) {
for (ReportPoint point : points) {
preprocessor.forReportPoint().transform(point);
if (!preprocessor.forReportPoint().filter(point)) {
if (preprocessor.forReportPoint().getLastFilterResult() != null) {
if (!preprocessor.forReportPoint().filter(point, messageHolder)) {
if (messageHolder[0] != null) {
blockedPointsLogger.warning(PointHandlerImpl.pointToString(point));
} else {
blockedPointsLogger.info(PointHandlerImpl.pointToString(point));
}
pointHandler.handleBlockedPoint(preprocessor.forReportPoint().getLastFilterResult());
pointHandler.handleBlockedPoint(messageHolder[0]);
return;
}
}
Expand Down
Loading

0 comments on commit bfdc755

Please sign in to comment.