Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MONIT-16056 Proxy support for events MVP #453

Merged
merged 5 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<jackson.version>2.9.10</jackson.version>
<jackson-databind.version>2.9.10.1</jackson-databind.version>
<netty.version>4.1.41.Final</netty.version>
<java-lib.version>2019-11.1</java-lib.version>
<java-lib.version>2019-12.1</java-lib.version>

<doclint>none</doclint>
</properties>
Expand Down
5 changes: 5 additions & 0 deletions proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@
<artifactId>jafama</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
<version>1.3</version>
</dependency>
</dependencies>

<build>
Expand Down
3 changes: 2 additions & 1 deletion proxy/src/main/java/com/wavefront/agent/AbstractAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,8 @@ protected boolean handleAsIdempotent(HttpRequest request) {
register(AcceptEncodingGZIPFilter.class).
register((ClientRequestFilter) context -> {
if (context.getUri().getPath().contains("/pushdata/") ||
context.getUri().getPath().contains("/report")) {
context.getUri().getPath().contains("/report") ||
context.getUri().getPath().contains("/event")) {
context.getHeaders().add("Authorization", "Bearer " + token);
}
}).
Expand Down
18 changes: 10 additions & 8 deletions proxy/src/main/java/com/wavefront/agent/PushAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.wavefront.common.NamedThreadFactory;
import com.wavefront.common.TaggedMetricName;
import com.wavefront.data.ReportableEntityType;
import com.wavefront.ingester.EventDecoder;
import com.wavefront.ingester.GraphiteDecoder;
import com.wavefront.ingester.HistogramDecoder;
import com.wavefront.ingester.OpenTSDBDecoder;
Expand Down Expand Up @@ -145,14 +146,15 @@ public class PushAgent extends AbstractAgent {
protected ReportableEntityHandlerFactory handlerFactory;
protected HealthCheckManager healthCheckManager;
protected Supplier<Map<ReportableEntityType, ReportableEntityDecoder>> decoderSupplier =
lazySupplier(() -> ImmutableMap.of(
ReportableEntityType.POINT, new ReportPointDecoderWrapper(new GraphiteDecoder("unknown",
customSourceTags)),
ReportableEntityType.SOURCE_TAG, new ReportSourceTagDecoder(),
ReportableEntityType.HISTOGRAM, new ReportPointDecoderWrapper(
new HistogramDecoder("unknown")),
ReportableEntityType.TRACE, new SpanDecoder("unknown"),
ReportableEntityType.TRACE_SPAN_LOGS, new SpanLogsDecoder()));
lazySupplier(() -> ImmutableMap.<ReportableEntityType, ReportableEntityDecoder>builder().
put(ReportableEntityType.POINT, new ReportPointDecoderWrapper(
new GraphiteDecoder("unknown", customSourceTags))).
put(ReportableEntityType.SOURCE_TAG, new ReportSourceTagDecoder()).
put(ReportableEntityType.HISTOGRAM, new ReportPointDecoderWrapper(
new HistogramDecoder("unknown"))).
put(ReportableEntityType.TRACE, new SpanDecoder("unknown")).
put(ReportableEntityType.TRACE_SPAN_LOGS, new SpanLogsDecoder()).
put(ReportableEntityType.EVENT, new EventDecoder()).build());
private Logger blockedPointsLogger;
private Logger blockedHistogramsLogger;
private Logger blockedSpansLogger;
Expand Down
166 changes: 121 additions & 45 deletions proxy/src/main/java/com/wavefront/agent/QueuedAgentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
Expand All @@ -23,6 +24,7 @@
import com.wavefront.api.agent.AgentConfiguration;
import com.wavefront.common.Clock;
import com.wavefront.common.NamedThreadFactory;
import com.wavefront.dto.Event;
import com.wavefront.metrics.ExpectedAgentMetric;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
Expand Down Expand Up @@ -92,22 +94,18 @@ public class QueuedAgentService implements ForceQueueEnabledProxyAPI {
private WavefrontV2API wrapped;
private final List<ResubmissionTaskQueue> taskQueues;
private final List<ResubmissionTaskQueue> sourceTagTaskQueues;
private final List<ResubmissionTaskQueue> eventTaskQueues;
private final List<Runnable> taskRunnables;
private final List<Runnable> sourceTagTaskRunnables;
private final List<Runnable> eventTaskRunnables;
private static AtomicInteger minSplitBatchSize = new AtomicInteger(100);
private static AtomicDouble retryBackoffBaseSeconds = new AtomicDouble(2.0);
private boolean lastKnownQueueSizeIsPositive = true;
private boolean lastKnownSourceTagQueueSizeIsPositive = true;
private boolean lastKnownSourceTagQueueSizeIsPositive = false;
private boolean lastKnownEventQueueSizeIsPositive = false;
private AtomicBoolean isRunning = new AtomicBoolean(false);
private final ScheduledExecutorService executorService;
private final String token;

/**
* A loading cache for tracking queue sizes (refreshed once a minute). Calculating the number of objects across
* all queues can be a non-trivial operation, hence the once-a-minute refresh.
*/
private final LoadingCache<ResubmissionTaskQueue, AtomicInteger> queueSizes;

private MetricsRegistry metricsRegistry = new MetricsRegistry();
private Meter resultPostingMeter = metricsRegistry.newMeter(QueuedAgentService.class, "post-result", "results",
TimeUnit.MINUTES);
Expand Down Expand Up @@ -163,46 +161,34 @@ public QueuedAgentService(WavefrontV2API service, String bufferFile, final int r
token);
this.sourceTagTaskQueues = QueuedAgentService.createResubmissionTasks(service, retryThreads,
bufferFile + "SourceTag", purge, agentId, token);
this.eventTaskQueues = QueuedAgentService.createResubmissionTasks(service, retryThreads,
bufferFile + ".events", purge, agentId, token);
this.taskRunnables = Lists.newArrayListWithExpectedSize(taskQueues.size());
this.sourceTagTaskRunnables = Lists.newArrayListWithExpectedSize(sourceTagTaskQueues.size());
this.eventTaskRunnables = Lists.newArrayListWithExpectedSize(eventTaskQueues.size());
this.executorService = executorService;
this.token = token;

queueSizes = Caffeine.newBuilder()
.refreshAfterWrite(15, TimeUnit.SECONDS)
.build(new CacheLoader<ResubmissionTaskQueue, AtomicInteger>() {
@Override
public AtomicInteger load(@Nonnull ResubmissionTaskQueue key) throws Exception {
return new AtomicInteger(key.size());
}

// reuse old object if possible
@Override
public AtomicInteger reload(@Nonnull ResubmissionTaskQueue key,
@Nonnull AtomicInteger oldValue) {
oldValue.set(key.size());
return oldValue;
}
});

int threadId = 0;
for (ResubmissionTaskQueue taskQueue : taskQueues) {
taskRunnables.add(createRunnable(executorService, splitPushWhenRateLimited, threadId, taskQueue,
pushRateLimiter));
threadId++;
taskRunnables.add(createRunnable(executorService, splitPushWhenRateLimited, threadId++,
taskQueue, pushRateLimiter));
}
threadId = 0;
for (ResubmissionTaskQueue taskQueue : sourceTagTaskQueues) {
sourceTagTaskRunnables.add(createRunnable(executorService, splitPushWhenRateLimited, threadId, taskQueue,
pushRateLimiter));
threadId++;
sourceTagTaskRunnables.add(createRunnable(executorService, splitPushWhenRateLimited,
threadId++, taskQueue, pushRateLimiter));
}
threadId = 0;
for (ResubmissionTaskQueue taskQueue : eventTaskQueues) {
eventTaskRunnables.add(createRunnable(executorService, splitPushWhenRateLimited,
threadId++, taskQueue, pushRateLimiter));
}

if (taskQueues.size() > 0) {
executorService.scheduleAtFixedRate(() -> {
try {
Supplier<Stream<Integer>> sizes = () -> taskQueues.stream()
.map(k -> Math.max(0, queueSizes.get(k).intValue()));
Supplier<Stream<Integer>> sizes = () -> taskQueues.stream().map(TaskQueue::size);
if (sizes.get().anyMatch(i -> i > 0)) {
lastKnownQueueSizeIsPositive = true;
logger.info("current retry queue sizes: [" +
Expand Down Expand Up @@ -237,6 +223,19 @@ public Long value() {
lastKnownSourceTagQueueSizeIsPositive = false;
logger.warning("source tag retry queue has been cleared");
}

// do the same thing for event queues
Supplier<Stream<Integer>> eventQueueSizes = () -> eventTaskQueues.stream().
map(TaskQueue::size);
if (eventQueueSizes.get().anyMatch(i -> i > 0)) {
lastKnownEventQueueSizeIsPositive = true;
logger.warning("current event retry queue sizes: [" +
eventQueueSizes.get().map(Object::toString).collect(Collectors.joining("/")) + "]");
} else if (lastKnownEventQueueSizeIsPositive) {
lastKnownEventQueueSizeIsPositive = false;
logger.warning("event retry queue has been cleared");
}

} catch (Exception ex) {
logger.warning("Exception " + ex);
}
Expand Down Expand Up @@ -264,6 +263,8 @@ public void start() {
(long) (Math.random() * taskRunnables.size()), TimeUnit.SECONDS));
sourceTagTaskRunnables.forEach(taskRunnable -> executorService.schedule(taskRunnable,
(long) (Math.random() * taskRunnables.size()), TimeUnit.SECONDS));
eventTaskRunnables.forEach(taskRunnable -> executorService.schedule(taskRunnable,
(long) (Math.random() * taskRunnables.size()), TimeUnit.SECONDS));
}
}

Expand Down Expand Up @@ -319,7 +320,6 @@ public void run() {
List<? extends ResubmissionTask> splitTasks = task.splitTask();
for (ResubmissionTask smallerTask : splitTasks) {
taskQueue.add(smallerTask);
queueSizes.get(taskQueue).incrementAndGet();
queuePointsCount.addAndGet(smallerTask.size());
}
break;
Expand All @@ -333,7 +333,6 @@ public void run() {
List<? extends ResubmissionTask> splitTasks = task.splitTask();
for (ResubmissionTask smallerTask : splitTasks) {
taskQueue.add(smallerTask);
queueSizes.get(taskQueue).incrementAndGet();
queuePointsCount.addAndGet(smallerTask.size());
}
} else {
Expand All @@ -349,7 +348,6 @@ public void run() {
task.service = null;
task.currentAgentId = null;
taskQueue.add(task);
queueSizes.get(taskQueue).incrementAndGet();
queuePointsCount.addAndGet(taskSize);
if (failures > 10) {
logger.warning("[RETRY THREAD " + threadId + "] saw too many submission errors. Will " +
Expand All @@ -359,7 +357,6 @@ public void run() {
} finally {
if (removeTask) {
taskQueue.remove();
queueSizes.get(taskQueue).decrementAndGet();
queuePointsCount.addAndGet(-taskSize);
}
}
Expand Down Expand Up @@ -497,12 +494,6 @@ public long getQueuedSourceTagTasksCount() {
return toReturn;
}

private ResubmissionTaskQueue getSmallestQueue() {
Optional<ResubmissionTaskQueue> smallestQueue = taskQueues.stream()
.min(Comparator.comparingInt(q -> queueSizes.get(q).intValue()));
return smallestQueue.orElse(null);
}

private Runnable getPostingSizerTask(final ResubmissionTask task) {
return () -> {
try {
Expand Down Expand Up @@ -603,6 +594,15 @@ private void handleSourceTagTaskRetry(RuntimeException failureException,
addSourceTagTaskToSmallestQueue(taskToRetry);
}

private void handleEventTaskRetry(RuntimeException failureException,
PostEventResultTask taskToRetry) {
if (failureException instanceof QueuedPushTooLargeException) {
taskToRetry.splitTask().forEach(this::addEventTaskToSmallestQueue);
} else {
addTaskToSmallestQueue(taskToRetry);
}
}

private void addSourceTagTaskToSmallestQueue(PostSourceTagResultTask taskToRetry) {
// we need to make sure the we preserve the order of operations for each source
ResubmissionTaskQueue queue = sourceTagTaskQueues.get(Math.abs(taskToRetry.id.hashCode()) % sourceTagTaskQueues.size());
Expand All @@ -619,11 +619,11 @@ private void addSourceTagTaskToSmallestQueue(PostSourceTagResultTask taskToRetry
}

private void addTaskToSmallestQueue(ResubmissionTask taskToRetry) {
ResubmissionTaskQueue queue = getSmallestQueue();
ResubmissionTaskQueue queue = taskQueues.stream().min(Comparator.comparingInt(TaskQueue::size)).
orElse(null);
if (queue != null) {
try {
queue.add(taskToRetry);
queueSizes.get(queue).incrementAndGet();
queuePointsCount.addAndGet(taskToRetry.size());
} catch (FileException e) {
logger.log(Level.SEVERE, "CRITICAL (Losing points!): WF-1: Submission queue is full.", e);
Expand All @@ -633,6 +633,20 @@ private void addTaskToSmallestQueue(ResubmissionTask taskToRetry) {
}
}

private void addEventTaskToSmallestQueue(ResubmissionTask taskToRetry) {
ResubmissionTaskQueue queue = eventTaskQueues.stream().
min(Comparator.comparingInt(TaskQueue::size)).orElse(null);
if (queue != null) {
try {
queue.add(taskToRetry);
} catch (FileException e) {
logger.log(Level.SEVERE, "CRITICAL (Losing events!): WF-1: Submission queue is full.", e);
}
} else {
logger.severe("CRITICAL (Losing events!): WF-2: No retry queues found.");
}
}

private static void parsePostingResponse(Response response) {
if (response == null) throw new RuntimeException("No response from server");
try {
Expand Down Expand Up @@ -748,6 +762,25 @@ public Response removeDescription(String id, boolean forceToQueue) {
}
}

@Override
public Response proxyEvents(UUID proxyId, List<Event> eventBatch, boolean forceToQueue) {
PostEventResultTask task = new PostEventResultTask(proxyId, eventBatch);

if (forceToQueue) {
addEventTaskToSmallestQueue(task);
return Response.status(Response.Status.NOT_ACCEPTABLE).build();
} else {
try {
parsePostingResponse(wrapped.proxyEvents(proxyId, eventBatch));
} catch (RuntimeException ex) {
logger.warning("Unable to create events: " + ExceptionUtils.getFullStackTrace(ex));
addEventTaskToSmallestQueue(task);
return Response.status(Response.Status.NOT_ACCEPTABLE).build();
}
return Response.ok().build();
}
}

@Override
public Response setDescription(String id, String desc, boolean forceToQueue) {
PostSourceTagResultTask task = new PostSourceTagResultTask(id, desc,
Expand Down Expand Up @@ -823,6 +856,49 @@ public Response removeTag(String id, String tagValue, boolean forceToQueue) {
}
}

@Override
public Response proxyEvents(UUID proxyId, List<Event> events) {
return proxyEvents(proxyId, events, false);
}

public static class PostEventResultTask extends ResubmissionTask<PostEventResultTask> {
private final UUID proxyId;
private final List<Event> events;

public PostEventResultTask(UUID proxyId, List<Event> events) {
this.proxyId = proxyId;
this.events = events;
}

@Override
public List<PostEventResultTask> splitTask() {
if (events.size() > 1) {
// in this case, split the payload in 2 batches approximately in the middle.
int splitPoint = events.size() / 2;
return ImmutableList.of(
new PostEventResultTask(proxyId, events.subList(0, splitPoint)),
new PostEventResultTask(proxyId, events.subList(splitPoint, events.size())));
}
return ImmutableList.of(this);
basilisk487 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public int size() {
return events.size();
}

@Override
public void execute(Object callback) {
Response response;
try {
response = service.proxyEvents(proxyId, events);
} catch (Exception ex) {
throw new RuntimeException(SERVER_ERROR + ": " + Throwables.getRootCause(ex));
}
parsePostingResponse(response);
}
}

public static class PostSourceTagResultTask extends ResubmissionTask<PostSourceTagResultTask> {
private final String id;
private final String[] tagValues;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.wavefront.agent.api;

import com.wavefront.dto.Event;

import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -71,4 +73,13 @@ Response proxyReport(@HeaderParam("X-WF-PROXY-ID") final UUID proxyId,
* @param forceToQueue Whether to bypass posting data to the API and write to queue instead.
*/
Response removeDescription(String id, boolean forceToQueue);

/**
* Create an event.
*
* @param proxyId id of the proxy submitting events.
* @param eventBatch events to create.
* @param forceToQueue Whether to bypass posting data to the API and write to queue instead.
*/
Response proxyEvents(UUID proxyId, List<Event> eventBatch, boolean forceToQueue);
}
Loading