Skip to content

Commit

Permalink
Allow pipeline processor to ignore missing pipelines (#87354)
Browse files Browse the repository at this point in the history
Add `ignore_missing_pipeline` option to `pipeline` processor. This
controls whether the `pipeline` processor should fail with an error if
no pipeline with a name specified in the `name` option exists.

This enhancement is useful to setup a pipeline infrastructure that
lazily adds extension points for overwrites. So that for specific
cluster setups custom pre-processing can be added at a later point in
time.

Relates to #87323
  • Loading branch information
martijnvg authored Jun 7, 2022
1 parent a4fdb0b commit 7154608
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/87354.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87354
summary: Allow pipeline processor to ignore missing pipelines
area: Ingest
type: enhancement
issues: []
5 changes: 3 additions & 2 deletions docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ Executes another pipeline.
.Pipeline Options
[options="header"]
|======
| Name | Required | Default | Description
| `name` | yes | - | The name of the pipeline to execute. Supports <<template-snippets,template snippets>>.
| Name | Required | Default | Description
| `name` | yes | - | The name of the pipeline to execute. Supports <<template-snippets,template snippets>>.
| `ignore_missing_pipeline` | no | false | Whether to ignore missing pipelines instead of failing.
include::common-options.asciidoc[]
|======

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,94 @@ teardown:
- match: { _source.pipelines.0: "pipeline1" }
- match: { _source.pipelines.1: "another_pipeline" }
- match: { _source.pipelines.2: "another_pipeline2" }

---
"Test ignore_missing_pipeline parameter":
- do:
ingest.put_pipeline:
id: "my-pipeline"
body: >
{
"processors" : [
{
"pipeline" : {
"name": "non_existing_pipeline",
"ignore_missing_pipeline": false
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
index:
index: test
id: "1"
pipeline: "my-pipeline"
body: {}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [non_existing_pipeline]" }

- do:
ingest.simulate:
id: "my-pipeline"
body: >
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- match: { docs.0.error.root_cause.0.type: "illegal_state_exception" }
- match: { docs.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [non_existing_pipeline]" }

- do:
ingest.put_pipeline:
id: "my-pipeline"
body: >
{
"processors" : [
{
"pipeline" : {
"name": "non_existing_pipeline",
"ignore_missing_pipeline": true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my-pipeline"
body: {}
- match: { _index: "test" }
- match: { _id: "1" }
- match: { _version: 1 }
- match: { result: "created" }

- do:
ingest.simulate:
id: "my-pipeline"
body: >
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- match: { docs.0.doc._index: "index" }
- match: { docs.0.doc._id: "id" }
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ public class PipelineProcessor extends AbstractProcessor {
public static final String TYPE = "pipeline";

private final TemplateScript.Factory pipelineTemplate;
private final boolean ignoreMissingPipeline;
private final IngestService ingestService;

PipelineProcessor(String tag, String description, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
PipelineProcessor(
String tag,
String description,
TemplateScript.Factory pipelineTemplate,
boolean ignoreMissingPipeline,
IngestService ingestService
) {
super(tag, description);
this.pipelineTemplate = pipelineTemplate;
this.ignoreMissingPipeline = ignoreMissingPipeline;
this.ingestService = ingestService;
}

Expand All @@ -33,10 +41,14 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
} else {
handler.accept(
null,
new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']')
);
if (ignoreMissingPipeline) {
handler.accept(ingestDocument, null);
} else {
handler.accept(
null,
new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']')
);
}
}
}

Expand Down Expand Up @@ -85,7 +97,14 @@ public PipelineProcessor create(
"name",
ingestService.getScriptService()
);
return new PipelineProcessor(processorTag, description, pipelineTemplate, ingestService);
boolean ignoreMissingPipeline = ConfigurationUtils.readBooleanProperty(
TYPE,
processorTag,
config,
"ignore_missing_pipeline",
false
);
return new PipelineProcessor(processorTag, description, pipelineTemplate, ignoreMissingPipeline, ingestService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,25 @@ public void testThrowsOnMissingPipeline() throws Exception {
assertEquals("Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage());
}

public void testIgnoreMissingPipeline() throws Exception {
var ingestService = createIngestService();
var testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
var factory = new PipelineProcessor.Factory(ingestService);
var config = new HashMap<String, Object>();
config.put("name", "missingPipelineId");
config.put("ignore_missing_pipeline", true);

var r = new IngestDocument[1];
var e = new Exception[1];
var processor = factory.create(Collections.emptyMap(), null, null, config);
processor.execute(testIngestDocument, (result, e1) -> {
r[0] = result;
e[0] = e1;
});
assertNull(e[0]);
assertSame(testIngestDocument, r[0]);
}

public void testThrowsOnRecursivePipelineInvocations() throws Exception {
String innerPipelineId = "inner";
String outerPipelineId = "outer";
Expand Down Expand Up @@ -229,7 +248,7 @@ public String getType() {
});
if (i < (numPipelines - 1)) {
TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1));
processors.add(new PipelineProcessor(null, null, pipelineName, ingestService));
processors.add(new PipelineProcessor(null, null, pipelineName, false, ingestService));
}

Pipeline pipeline = new Pipeline(pipelineId, null, null, null, new CompoundProcessor(false, processors, List.of()));
Expand Down

0 comments on commit 7154608

Please sign in to comment.