Skip to content

Commit

Permalink
Revert "ingest: processor stats (elastic#34202)"
Browse files Browse the repository at this point in the history
This reverts commit 6567729.
jasontedor committed Oct 21, 2018
1 parent bf5f0af commit 0577703
Showing 12 changed files with 146 additions and 652 deletions.
Original file line number Diff line number Diff line change
@@ -20,15 +20,12 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
@@ -43,33 +40,16 @@ public class CompoundProcessor implements Processor {
private final boolean ignoreFailure;
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final LongSupplier relativeTimeProvider;

/**
 * Package-private convenience constructor: wraps the given processors with {@code ignoreFailure}
 * set to {@code false} and an empty on-failure list, using the supplied time source.
 *
 * @param relativeTimeProvider supplier of the relative time readings handed to the main constructor
 * @param processor the processors to wrap, in the order given
 */
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
}

/**
 * Wraps the given processors with {@code ignoreFailure} set to {@code false} and an empty
 * on-failure list.
 *
 * @param processor the processors to wrap, in the order given
 */
public CompoundProcessor(Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList());
}

/**
 * Creates a compound processor that uses {@link System#nanoTime()} as its relative time source.
 *
 * @param ignoreFailure whether a failing processor is skipped instead of triggering failure handling
 * @param processors the processors to run
 * @param onFailureProcessors the processors handed to the main constructor as failure handlers
 */
public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
}
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
LongSupplier relativeTimeProvider) {
super();
this.ignoreFailure = ignoreFailure;
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = new ArrayList<>(processors.size());
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
}

/**
 * Returns the processor/metric pairs held by this compound processor.
 * The internal list is returned directly, not a copy.
 *
 * @return the list of {@link Processor} and {@link IngestMetric} tuples
 */
List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
return processorsWithMetrics;
}

public boolean isIgnoreFailure() {
@@ -114,17 +94,12 @@ public String getTag() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInNanos = relativeTimeProvider.getAsLong();
for (Processor processor : processors) {
try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) {
return null;
}
} catch (Exception e) {
metric.ingestFailed();
if (ignoreFailure) {
continue;
}
@@ -137,15 +112,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
executeOnFailure(ingestDocument, compoundProcessorException);
break;
}
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
}
return ingestDocument;
}


void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception);
Original file line number Diff line number Diff line change
@@ -28,8 +28,6 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
@@ -44,51 +42,24 @@ public class ConditionalProcessor extends AbstractProcessor {
private final ScriptService scriptService;

private final Processor processor;
private final IngestMetric metric;
private final LongSupplier relativeTimeProvider;

/**
 * Creates a conditional processor that uses {@link System#nanoTime()} as its relative time source.
 *
 * @param tag the processor tag
 * @param script the condition script
 * @param scriptService the service used to compile the condition script
 * @param processor the processor to run when the condition evaluates to true
 */
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
this(tag, script, scriptService, processor, System::nanoTime);
}

/**
 * Main constructor: stores the condition, the wrapped processor and the collaborators, and
 * creates a new {@link IngestMetric} for this instance.
 *
 * @param tag the processor tag, passed to the superclass
 * @param script the condition script
 * @param scriptService the service used to compile the condition script
 * @param processor the processor to run when the condition evaluates to true
 * @param relativeTimeProvider supplier of relative time readings
 */
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
    super(tag);

    // Collaborators.
    this.scriptService = scriptService;
    this.relativeTimeProvider = relativeTimeProvider;

    // The condition and the processor it guards.
    this.condition = script;
    this.processor = processor;

    // Each conditional processor owns its own metric.
    this.metric = new IngestMetric();
}

/**
 * Evaluates the condition script against a read-only view of the document and, only when it
 * returns {@code true}, runs the wrapped processor while recording its metric.
 *
 * @param ingestDocument the document to process
 * @return the result of the wrapped processor when the condition holds; otherwise the given
 *         document, which this method does not modify
 * @throws Exception any exception thrown by the wrapped processor is rethrown after the
 *                   failure has been recorded on the metric
 */
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
    IngestConditionalScript script =
        scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
    if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
        // Only record metric if the script evaluates to true
        long startTimeInNanos = relativeTimeProvider.getAsLong();
        try {
            metric.preIngest();
            return processor.execute(ingestDocument);
        } catch (Exception e) {
            metric.ingestFailed();
            throw e;
        } finally {
            // Runs on both the success and the failure path, so the elapsed time is always recorded.
            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
            metric.postIngest(ingestTimeInMillis);
        }
        // No statement may follow here: the try block always returns and the catch always rethrows.
    }
    return ingestDocument;
}

/**
 * Returns the processor wrapped by this conditional processor.
 *
 * @return the wrapped {@link Processor}
 */
Processor getProcessor() {
return processor;
}

/**
 * Returns the metric owned by this conditional processor.
 *
 * @return this instance's {@link IngestMetric}
 */
IngestMetric getMetric() {
return metric;
}

@Override
public String getType() {
return TYPE;
112 changes: 16 additions & 96 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
@@ -19,6 +19,19 @@

package org.elasticsearch.ingest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@@ -36,7 +49,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -49,19 +61,6 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Holder class for several ingest related services.
*/
@@ -263,59 +262,11 @@ public void applyClusterState(final ClusterChangedEvent event) {
Pipeline originalPipeline = originalPipelines.get(id);
if (originalPipeline != null) {
pipeline.getMetrics().add(originalPipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
String type = compositeMetric.v1().getType();
IngestMetric metric = compositeMetric.v2();
if (oldMetricsIterator.hasNext()) {
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
String oldType = oldCompositeMetric.v1().getType();
IngestMetric oldMetric = oldCompositeMetric.v2();
if (type.equals(oldType)) {
metric.add(oldMetric);
}
}
}
}
}
});
}
}

/**
 * Recursively collects the non-failure processors of the given compound processor together with
 * their metrics. Nested compound processors are flattened into the same list. For a
 * {@link ConditionalProcessor} the conditional's own metric is used instead of the metric held by
 * the enclosing compound processor, because the conditional's metric is only updated when its
 * condition evaluated to true.
 *
 * @param compoundProcessor The compound processor to start walking the non-failure processors
 * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples to append to.
 * @return the same {@code processorMetrics} list, after appending the collected tuples
 */
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
                                                                        List<Tuple<Processor, IngestMetric>> processorMetrics) {
    // Only the processors list is walked; on-failure processors are not visited here.
    for (Tuple<Processor, IngestMetric> entry : compoundProcessor.getProcessorsWithMetrics()) {
        final Processor current = entry.v1();
        if (current instanceof CompoundProcessor) {
            // Flatten nested compound processors into the same result list.
            getProcessorMetrics((CompoundProcessor) current, processorMetrics);
            continue;
        }
        // A conditional's own metric only counts executions where the condition was true.
        final IngestMetric effectiveMetric = current instanceof ConditionalProcessor
            ? ((ConditionalProcessor) current).getMetric()
            : entry.v2();
        processorMetrics.add(new Tuple<>(current, effectiveMetric));
    }
    return processorMetrics;
}

private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
@@ -420,42 +371,11 @@ protected void doRun() {
}

/**
 * Assembles the ingest statistics: the total metrics, the metrics of every pipeline, and the
 * metrics of every non-failure processor of each pipeline.
 *
 * @return the built {@link IngestStats}
 */
public IngestStats stats() {
    final IngestStats.Builder builder = new IngestStats.Builder();
    builder.addTotalMetrics(totalMetrics);
    for (Map.Entry<String, Pipeline> entry : pipelines.entrySet()) {
        final String pipelineId = entry.getKey();
        final Pipeline current = entry.getValue();
        final CompoundProcessor root = current.getCompoundProcessor();
        builder.addPipelineMetrics(pipelineId, current.getMetrics());
        // Flatten the pipeline's processor tree and report each processor under the pipeline id.
        for (Tuple<Processor, IngestMetric> pair : getProcessorMetrics(root, new ArrayList<>())) {
            builder.addProcessorMetrics(pipelineId, getProcessorName(pair.v1()), pair.v2());
        }
    }
    return builder.build();
}

/**
 * Builds the display name of a processor in the form {@code type[:pipelineName][:tag]}.
 * The pipeline name is included only for a {@link PipelineProcessor}; the tag is included only
 * when it is neither {@code null} nor empty. Package private for testing.
 *
 * @param processor the processor to name; a {@link ConditionalProcessor} is unwrapped first
 * @return the composed name
 */
static String getProcessorName(Processor processor) {
    // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
    if (processor instanceof ConditionalProcessor) {
        processor = ((ConditionalProcessor) processor).getProcessor();
    }
    StringBuilder sb = new StringBuilder();
    sb.append(processor.getType());

    if (processor instanceof PipelineProcessor) {
        String pipelineName = ((PipelineProcessor) processor).getPipelineName();
        sb.append(":");
        sb.append(pipelineName);
    }
    String tag = processor.getTag();
    if (tag != null && !tag.isEmpty()) {
        sb.append(":");
        sb.append(tag);
    }
    return sb.toString();
}

private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
Loading

0 comments on commit 0577703

Please sign in to comment.