From 3f3b70e92981b14825655cf51a124d150e6cf861 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 Jun 2022 11:19:40 +0200 Subject: [PATCH 1/3] Allow pipeline processor to ignore missing pipelines Add `ignore_missing_pipeline` option to `pipeline` processor. This controls whether the `pipeline` processor should fail with an error if no pipeline with a name specified in the `name` option exists. This enhancement is useful to setup a pipeline infrastructure that lazily adds extension points for overwrites. So that for specific cluster setups custom pre-processing can be added at a later point in time. Relates to #87323 --- .../ingest/processors/pipeline.asciidoc | 5 +- .../test/ingest/210_pipeline_processor.yml | 91 +++++++++++++++++++ .../ingest/PipelineProcessor.java | 25 +++-- .../ingest/PipelineProcessorTests.java | 21 ++++- 4 files changed, 133 insertions(+), 9 deletions(-) diff --git a/docs/reference/ingest/processors/pipeline.asciidoc b/docs/reference/ingest/processors/pipeline.asciidoc index 958085a4a965a..28d516ae3a844 100644 --- a/docs/reference/ingest/processors/pipeline.asciidoc +++ b/docs/reference/ingest/processors/pipeline.asciidoc @@ -10,8 +10,9 @@ Executes another pipeline. .Pipeline Options [options="header"] |====== -| Name | Required | Default | Description -| `name` | yes | - | The name of the pipeline to execute. Supports <>. +| Name | Required | Default | Description +| `name` | yes | - | The name of the pipeline to execute. Supports <>. +| `ignore_missing_pipeline` | no | false | Whether to ignore missing pipelines instead of failing. include::common-options.asciidoc[] |====== diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 71e2ea3b9ea79..da174831ff930 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -280,3 +280,94 @@ teardown: - match: { _source.pipelines.0: "pipeline1" } - match: { _source.pipelines.1: "another_pipeline" } - match: { _source.pipelines.2: "another_pipeline2" } + +--- +"Test ignore_missing_pipeline parameter": + - do: + ingest.put_pipeline: + id: "my-pipeline" + body: > + { + "processors" : [ + { + "pipeline" : { + "name": "non_existing_pipeline", + "ignore_missing_pipeline": false + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /illegal_state_exception/ + index: + index: test + id: "1" + pipeline: "my-pipeline" + body: {} + - match: { error.root_cause.0.type: "illegal_state_exception" } + - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [non_existing_pipeline]" } + + - do: + ingest.simulate: + id: "my-pipeline" + body: > + { + "docs": [ + { + "_index": "index", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - match: { docs.0.error.root_cause.0.type: "illegal_state_exception" } + - match: { docs.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [non_existing_pipeline]" } + + - do: + ingest.put_pipeline: + id: "my-pipeline" + body: > + { + "processors" : [ + { + "pipeline" : { + "name": "non_existing_pipeline", + "ignore_missing_pipeline": true + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: "1" + pipeline: "my-pipeline" + body: {} + - match: { _index: "test" } + - match: { _id: "1" } + - match: { _version: 1 } + - match: { result: "created" } + + - do: + ingest.simulate: + id: "my-pipeline" + body: > + { + "docs": [ + { + "_index": "index", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - match: { docs.0.doc._index: "index" } + - match: { docs.0.doc._id: "id" } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index c98a723e7b480..b8d3466355a24 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -18,11 +18,13 @@ public class PipelineProcessor extends AbstractProcessor { public static final String TYPE = "pipeline"; private final TemplateScript.Factory pipelineTemplate; + private final boolean ignoreMissingPipeline; private final IngestService ingestService; - PipelineProcessor(String tag, String description, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { + PipelineProcessor(String tag, String description, TemplateScript.Factory pipelineTemplate, boolean ignoreMissingPipeline, IngestService ingestService) { super(tag, description); this.pipelineTemplate = pipelineTemplate; + this.ignoreMissingPipeline = ignoreMissingPipeline; this.ingestService = ingestService; } @@ -33,10 +35,14 @@ public void execute(IngestDocument ingestDocument, BiConsumer()); + var factory = new PipelineProcessor.Factory(ingestService); + var config = new HashMap(); + config.put("name", "missingPipelineId"); + config.put("ignore_missing_pipeline", true); + + var r = new IngestDocument[1]; + var e = new Exception[1]; + var processor = factory.create(Collections.emptyMap(), null, null, config); + processor.execute(testIngestDocument, (result, e1) -> { + r[0] = result; + e[0] = e1; + }); + assertNull(e[0]); + assertSame(testIngestDocument, r[0]); + } + public void testThrowsOnRecursivePipelineInvocations() throws Exception { String innerPipelineId = "inner"; String outerPipelineId = "outer"; @@ -229,7 +248,7 @@ public String getType() { }); if (i < (numPipelines - 1)) { TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1)); - processors.add(new PipelineProcessor(null, null, pipelineName, ingestService)); + processors.add(new PipelineProcessor(null, null, pipelineName, false, ingestService)); } Pipeline pipeline = new Pipeline(pipelineId, null, null, null, new CompoundProcessor(false, processors, List.of())); From 72568239204b77aefe0cece18f5f31e837b8fd16 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 Jun 2022 11:20:49 +0200 Subject: [PATCH 2/3] Update docs/changelog/87354.yaml --- docs/changelog/87354.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/87354.yaml diff --git a/docs/changelog/87354.yaml b/docs/changelog/87354.yaml new file mode 100644 index 0000000000000..8bcef731203ad --- /dev/null +++ b/docs/changelog/87354.yaml @@ -0,0 +1,5 @@ +pr: 87354 +summary: Allow pipeline processor to ignore missing pipelines +area: Ingest +type: enhancement +issues: [] From 0268b509eaeb7b2728c6cd6e4659fa34c0ec9aac Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 Jun 2022 11:31:22 +0200 Subject: [PATCH 3/3] fixed checkstyle violation --- .../java/org/elasticsearch/ingest/PipelineProcessor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index b8d3466355a24..85c57e2b0b891 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -21,7 +21,13 @@ public class PipelineProcessor extends AbstractProcessor { private final boolean ignoreMissingPipeline; private final IngestService ingestService; - PipelineProcessor(String tag, String description, TemplateScript.Factory pipelineTemplate, boolean ignoreMissingPipeline, IngestService ingestService) { + PipelineProcessor( + String tag, + String description, + TemplateScript.Factory pipelineTemplate, + boolean ignoreMissingPipeline, + IngestService ingestService + ) { super(tag, description); this.pipelineTemplate = pipelineTemplate; this.ignoreMissingPipeline = ignoreMissingPipeline;