Backport: Introduce on_failure_pipeline ingest metadata inside on_failure block #49596

Merged: 4 commits, Nov 27, 2019
7 changes: 4 additions & 3 deletions docs/reference/ingest/ingest-node.asciidoc
@@ -378,7 +378,7 @@ The `if` condition can be more than a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of ctx is read-only in `if` conditions.

A more complex `if` condition that drops the document (i.e. does not index it)
unless it has a multi-valued tag field with at least one value that contains the characters
@@ -722,8 +722,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.

You may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access metadata fields called
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
from within the context of an `on_failure` block.
`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
These fields are only accessible from within the context of an `on_failure` block.
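
For illustration only, here is a minimal, self-contained Java sketch of the ingest metadata an `on_failure` handler could observe; the processor type, tag, message, and pipeline id are hypothetical, and `on_failure_pipeline` appears only when the failing processor ran inside a pipeline processor:

```java
import java.util.HashMap;
import java.util.Map;

public class OnFailureMetadataExample {
    public static void main(String[] args) {
        // Fields the ingest node exposes only while an on_failure block is running.
        Map<String, Object> ingestMetadata = new HashMap<>();
        ingestMetadata.put("on_failure_message", "field [foo] not present as part of path [foo]");
        ingestMetadata.put("on_failure_processor_type", "rename");
        ingestMetadata.put("on_failure_processor_tag", "my-rename-tag");
        // Only set when the error occurred inside a pipeline processor.
        ingestMetadata.put("on_failure_pipeline", "inner-pipeline");

        ingestMetadata.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```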

Here is an updated version of the example that you
saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`
@@ -107,4 +107,4 @@ teardown:
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
@@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
});
} else {
pipeline.execute(ingestDocument, (result, e) -> {
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e == null) {
handler.accept(new SimulateDocumentBaseResult(result), null);
} else {
@@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";

private final boolean ignoreFailure;
private final List<Processor> processors;
@@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
IngestProcessorException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
newCompoundProcessorException(e, processor, ingestDocument);
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
} else {
@@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
onFailureProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) {
removeFailureMetadata(ingestDocument);
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
return;
}
if (result == null) {
@@ -192,34 +193,46 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
List<String> processorTypeHeader = cause.getHeader("processor_type");
List<String> processorTagHeader = cause.getHeader("processor_tag");
List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
if (failedPipelineId != null) {
ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
}
}

private void removeFailureMetadata(IngestDocument ingestDocument) {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
}

private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
return (IngestProcessorException) e;
}

IngestProcessorException exception = new IngestProcessorException(e);

String processorType = processor.getType();
if (processorType != null) {
exception.addHeader("processor_type", processorType);
}
String processorTag = processor.getTag();
if (processorTag != null) {
exception.addHeader("processor_tag", processorTag);
}
List<String> pipelineStack = document.getPipelineStack();
if (pipelineStack.size() > 1) {
exception.addHeader("pipeline_origin", pipelineStack);
}

return exception;
}
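
As a hedged illustration of the header bookkeeping above (plain Java with hypothetical pipeline ids, not Elasticsearch classes): the `pipeline_origin` header is attached only when the document is more than one pipeline deep, and its first entry is the innermost pipeline, which `putFailureMetadata` later surfaces as `on_failure_pipeline`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PipelineOriginSketch {
    // Attach the pipeline_origin "header" only for failures inside nested pipelines,
    // mirroring the size() > 1 check in newCompoundProcessorException above.
    static Map<String, List<String>> headersFor(List<String> pipelineStack) {
        Map<String, List<String>> headers = new HashMap<>();
        if (pipelineStack.size() > 1) {
            headers.put("pipeline_origin", pipelineStack);
        }
        return headers;
    }

    public static void main(String[] args) {
        // Stack is innermost-first: "inner" is executing inside "outer".
        System.out.println(headersFor(List.of("inner", "outer"))); // {pipeline_origin=[inner, outer]}
        // A failure in the top-level pipeline carries no pipeline_origin header.
        System.out.println(headersFor(List.of("outer")));          // {}
    }
}
```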
17 changes: 13 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -38,7 +38,7 @@
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -60,7 +60,7 @@ public final class IngestDocument {
private final Map<String, Object> ingestMetadata;

// Contains all pipelines that have been executed for this document
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
private final Set<String> executedPipelines = new LinkedHashSet<>();

public IngestDocument(String index, String type, String id, String routing,
Long version, VersionType versionType, Map<String, Object> source) {
@@ -647,16 +647,25 @@ private static Object deepCopy(Object value) {
* @param handler handles the result or failure
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline)) {
if (executedPipelines.add(pipeline.getId())) {
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline);
executedPipelines.remove(pipeline.getId());
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
}
}

/**
* @return the pipeline stack: all pipelines this document is currently executing, most recently entered first
*/
List<String> getPipelineStack() {
List<String> pipelineStack = new ArrayList<>(executedPipelines);
Collections.reverse(pipelineStack);
return pipelineStack;
}
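
A self-contained sketch of the cycle-detection bookkeeping above, using plain Java and the hypothetical pipeline ids "outer" and "inner". Because the bulk and simulate paths in this change also enter the top-level pipeline through `executePipeline`, the outer pipeline is registered before any nested one, so a pipeline processor that loops back to it fails with a cycle error naming "outer", which is what the updated REST test earlier in this diff expects:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PipelineStackSketch {
    // Pipelines this document is currently executing, by id, in entry order.
    private final Set<String> executedPipelines = new LinkedHashSet<>();

    // Returns false when the id is already being executed, i.e. a cycle.
    boolean enter(String pipelineId) {
        return executedPipelines.add(pipelineId);
    }

    void exit(String pipelineId) {
        executedPipelines.remove(pipelineId);
    }

    // Most recently entered pipeline first, mirroring getPipelineStack() above.
    List<String> stack() {
        List<String> stack = new ArrayList<>(executedPipelines);
        Collections.reverse(stack);
        return stack;
    }

    public static void main(String[] args) {
        PipelineStackSketch doc = new PipelineStackSketch();
        doc.enter("outer");                      // top-level pipeline
        doc.enter("inner");                      // nested pipeline processor
        System.out.println(doc.stack());         // [inner, outer]
        System.out.println(doc.enter("outer"));  // false: cycle detected for "outer"
        doc.exit("inner");
        doc.exit("outer");
    }
}
```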

@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
@@ -501,7 +501,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
pipeline.execute(ingestDocument, (result, e) -> {
ingestDocument.executePipeline(pipeline, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
if (e != null) {