Skip to content

Commit

Permalink
ingest: Add ignore_missing property to foreach filter (#22147) (#31578)
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear authored Jun 26, 2018
1 parent 26a927a commit 13e1cf6
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 23 deletions.
7 changes: 4 additions & 3 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1075,9 +1075,10 @@ then it aborts the execution and leaves the array unmodified.
.Foreach Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The array field
| `processor` | yes | - | The processor to execute against each field
| Name | Required | Default | Description
| `field` | yes | - | The array field
| `processor` | yes | - | The processor to execute against each field
| `ignore_missing` | no | false | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|======

Assume the following document:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;

Expand All @@ -47,16 +48,28 @@ public final class ForEachProcessor extends AbstractProcessor {

private final String field;
private final Processor processor;
private final boolean ignoreMissing;

ForEachProcessor(String tag, String field, Processor processor) {
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
super(tag);
this.field = field;
this.processor = processor;
this.ignoreMissing = ignoreMissing;
}

boolean isIgnoreMissing() {
return ignoreMissing;
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
List values = ingestDocument.getFieldValue(field, List.class);
List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
}
List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
Expand Down Expand Up @@ -87,14 +100,15 @@ public static final class Factory implements Processor.Factory {
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
if (entries.size() != 1) {
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor);
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ public void testCreate() throws Exception {
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
assertFalse(forEachProcessor.isIgnoreMissing());
}

public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
config.put("ignore_missing", true);
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
assertTrue(forEachProcessor.isIgnoreMissing());
}

public void testCreateWithTooManyProcessorTypes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;

import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;

public class ForEachProcessorTests extends ESTestCase {
Expand All @@ -49,7 +49,8 @@ public void testExecute() throws Exception {
);

ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value")
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
false
);
processor.execute(ingestDocument);

Expand All @@ -69,7 +70,7 @@ public void testExecuteWithFailure() throws Exception {
throw new RuntimeException("failure");
}
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
try {
processor.execute(ingestDocument);
fail("exception expected");
Expand All @@ -89,7 +90,8 @@ public void testExecuteWithFailure() throws Exception {
});
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
processor = new ForEachProcessor(
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
false
);
processor.execute(ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
Expand All @@ -109,7 +111,7 @@ public void testMetaDataAvailable() throws Exception {
id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument);

assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
Expand Down Expand Up @@ -137,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new SetProcessor("_tag",
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")));
(model) -> model.get("other")), false);
processor.execute(ingestDocument);

assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
Expand Down Expand Up @@ -174,7 +176,7 @@ public String getTag() {
"_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values)
);

ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument);
@SuppressWarnings("unchecked")
List<String> result = ingestDocument.getFieldValue("values", List.class);
Expand All @@ -199,7 +201,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
"_tag", "values", new CompoundProcessor(false,
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
));
), false);
processor.execute(ingestDocument);

List result = ingestDocument.getFieldValue("values", List.class);
Expand All @@ -225,7 +227,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws

TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
doc.getFieldValue("_source._value", String.class)));
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor);
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
forEachProcessor.execute(ingestDocument);

List result = ingestDocument.getFieldValue("values", List.class);
Expand Down Expand Up @@ -258,7 +260,7 @@ public void testNestedForEach() throws Exception {
doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor));
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
processor.execute(ingestDocument);

List result = ingestDocument.getFieldValue("values1.0.values2", List.class);
Expand All @@ -270,4 +272,16 @@ public void testNestedForEach() throws Exception {
assertThat(result.get(1), equalTo("JKL"));
}

public void testIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, Collections.emptyMap()
);
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
TestProcessor testProcessor = new TestProcessor(doc -> {});
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
}

}

0 comments on commit 13e1cf6

Please sign in to comment.