From 3da8cf03484e9993df4aeb94b2d8cfaf1a8b02cb Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Wed, 11 Dec 2019 14:52:04 +0100 Subject: [PATCH] CSV ingest processor (#49509) * CSV Processor for Ingest This change adds new ingest processor that breaks line from CSV file into separate fields. By default it conforms to RFC 4180 but can be tweaked. Closes #49113 --- docs/reference/ingest/ingest-node.asciidoc | 1 + docs/reference/ingest/processors/csv.asciidoc | 33 +++ .../ingest/common/CsvParser.java | 206 ++++++++++++++++ .../ingest/common/CsvProcessor.java | 108 +++++++++ .../ingest/common/IngestCommonPlugin.java | 3 +- .../ingest/common/CsvProcessorTests.java | 221 ++++++++++++++++++ .../rest-api-spec/test/ingest/250_csv.yml | 164 +++++++++++++ 7 files changed, 735 insertions(+), 1 deletion(-) create mode 100644 docs/reference/ingest/processors/csv.asciidoc create mode 100644 modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java create mode 100644 modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java create mode 100644 modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/250_csv.yml diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 0da0fd19e16ef..596bda67d3ed8 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -825,6 +825,7 @@ include::processors/append.asciidoc[] include::processors/bytes.asciidoc[] include::processors/circle.asciidoc[] include::processors/convert.asciidoc[] +include::processors/csv.asciidoc[] include::processors/date.asciidoc[] include::processors/date-index-name.asciidoc[] include::processors/dissect.asciidoc[] diff --git a/docs/reference/ingest/processors/csv.asciidoc b/docs/reference/ingest/processors/csv.asciidoc new file mode 100644 index 0000000000000..c589c9eb4361c --- /dev/null +++ b/docs/reference/ingest/processors/csv.asciidoc @@ -0,0 +1,33 @@ +[[csv-processor]] +=== CSV Processor +Extracts fields from CSV line out of a single text field within a document. Any empty field in CSV will be skipped. + +[[csv-options]] +.CSV Options +[options="header"] +|====== +| Name | Required | Default | Description +| `field` | yes | - | The field to extract data from +| `target_fields` | yes | - | The array of fields to assign extracted values to +| `separator` | no | , | Separator used in CSV, has to be single character string +| `quote` | no | " | Quote used in CSV, has to be single character string +| `ignore_missing` | no | `true` | If `true` and `field` does not exist, the processor quietly exits without modifying the document +| `trim` | no | `false` | Trim whitespaces in unquoted fields +include::common-options.asciidoc[] +|====== + +[source,js] +-------------------------------------------------- +{ + "csv": { + "field": "my_field", + "target_fields": ["field1, field2"], + } +} +-------------------------------------------------- +// NOTCONSOLE + +If the `trim` option is enabled then any whitespace in the beginning and in the end of each unquoted field will be trimmed. +For example with configuration above, a value of `A, B` will result in field `field2` +having value `{nbsp}B` (with space at the beginning). If `trim` is enabled `A, B` will result in field `field2` +having value `B` (no whitespace). Quoted fields will be left untouched. diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java new file mode 100644 index 0000000000000..077d12684e9a1 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import org.elasticsearch.ingest.IngestDocument; + +final class CsvParser { + + private static final char LF = '\n'; + private static final char CR = '\r'; + private static final char SPACE = ' '; + private static final char TAB = '\t'; + + private enum State { + START, UNQUOTED, QUOTED, QUOTED_END + } + + private final char quote; + private final char separator; + private final boolean trim; + private final String[] headers; + private final IngestDocument ingestDocument; + private final StringBuilder builder = new StringBuilder(); + private State state = State.START; + private String line; + private int currentHeader = 0; + private int startIndex = 0; + private int length; + private int currentIndex; + + CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) { + this.ingestDocument = ingestDocument; + this.quote = quote; + this.separator = separator; + this.trim = trim; + this.headers = headers; + } + + void process(String line) { + this.line = line; + length = line.length(); + for (currentIndex = 0; currentIndex < length; currentIndex++) { + switch (state) { + case START: + if (processStart()) { + return; + } + break; + case UNQUOTED: + if (processUnquoted()) { + return; + } + break; + case QUOTED: + processQuoted(); + break; + case QUOTED_END: + if (processQuotedEnd()) { + return; + } + break; + } + } + + //we've reached end of string, we need to handle last field + switch (state) { + case UNQUOTED: + setField(length); + break; + case QUOTED_END: + setField(length - 1); + break; + case QUOTED: + throw new IllegalArgumentException("Unmatched quote"); + } + } + + private boolean processStart() { + for (; currentIndex < length; currentIndex++) { + char c = currentChar(); + if (c == quote) { + state = State.QUOTED; + builder.setLength(0); + startIndex = currentIndex + 1; + return false; + } else if (c == separator) { + startIndex++; + if (nextHeader()) { + return true; + } + } else if (isWhitespace(c)) { + if (trim) { + startIndex++; + } + } else { + state = State.UNQUOTED; + builder.setLength(0); + return false; + } + } + return true; + } + + private boolean processUnquoted() { + int spaceCount = 0; + for (; currentIndex < length; currentIndex++) { + char c = currentChar(); + if (c == LF || c == CR || c == quote) { + throw new IllegalArgumentException("Illegal character inside unquoted field at " + currentIndex); + } else if (trim && isWhitespace(c)) { + spaceCount++; + } else if (c == separator) { + state = State.START; + if (setField(currentIndex - spaceCount)) { + return true; + } + startIndex = currentIndex + 1; + return false; + } else { + spaceCount = 0; + } + } + return false; + } + + private void processQuoted() { + for (; currentIndex < length; currentIndex++) { + if (currentChar() == quote) { + state = State.QUOTED_END; + break; + } + } + } + + private boolean processQuotedEnd() { + char c = currentChar(); + if (c == quote) { + builder.append(line, startIndex, currentIndex - 1).append(quote); + startIndex = currentIndex + 1; + state = State.QUOTED; + return false; + } + boolean shouldSetField = true; + for (; currentIndex < length; currentIndex++) { + c = currentChar(); + if (isWhitespace(c)) { + if (shouldSetField) { + if (setField(currentIndex - 1)) { + return true; + } + shouldSetField = false; + } + } else if (c == separator) { + if (shouldSetField && setField(currentIndex - 1)) { + return true; + } + startIndex = currentIndex + 1; + state = State.START; + return false; + } else { + throw new IllegalArgumentException("character '" + c + "' after quoted field at " + currentIndex); + } + } + return true; + } + + private char currentChar() { + return line.charAt(currentIndex); + } + + private boolean isWhitespace(char c) { + return c == SPACE || c == TAB; + } + + private boolean setField(int endIndex) { + if (builder.length() == 0) { + ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex)); + } else { + builder.append(line, startIndex, endIndex); + ingestDocument.setFieldValue(headers[currentHeader], builder.toString()); + } + return nextHeader(); + } + + private boolean nextHeader() { + currentHeader++; + return currentHeader == headers.length; + } +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java new file mode 100644 index 0000000000000..66d10cc239e46 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; + +/** + * A processor that breaks line from CSV file into separate fields. + * If there's more fields requested than there is in the CSV, extra field will not be present in the document after processing. + * In the same way this processor will skip any field that is empty in CSV. + * + * By default it uses rules according to RCF 4180 with one exception: whitespaces are + * allowed before or after quoted field. Processor can be tweaked with following parameters: + * + * quote: set custom quote character (defaults to ") + * separator: set custom separator (defaults to ,) + * trim: trim leading and trailing whitespaces in unquoted fields + */ +public final class CsvProcessor extends AbstractProcessor { + + public static final String TYPE = "csv"; + + private final String field; + private final String[] headers; + private final boolean trim; + private final char quote; + private final char separator; + private final boolean ignoreMissing; + + CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) { + super(tag); + this.field = field; + this.headers = headers; + this.trim = trim; + this.quote = quote; + this.separator = separator; + this.ignoreMissing = ignoreMissing; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) { + if (headers.length == 0) { + return ingestDocument; + } + + String line = ingestDocument.getFieldValue(field, String.class, ignoreMissing); + if (line == null && ignoreMissing == false) { + return ingestDocument; + } else if (line == null) { + throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); + } + new CsvParser(ingestDocument, quote, separator, trim, headers).process(line); + return ingestDocument; + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements org.elasticsearch.ingest.Processor.Factory { + @Override + public CsvProcessor create(Map registry, String processorTag, + Map config) { + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + String quote = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "quote", "\""); + if (quote.length() != 1) { + throw newConfigurationException(TYPE, processorTag, "quote", "quote has to be single character like \" or '"); + } + String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator", ","); + if (separator.length() != 1) { + throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be single character like , or ;"); + } + boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + List targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields"); + if (targetFields.isEmpty()) { + throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty"); + } + return new CsvProcessor(processorTag, field, targetFields.toArray(String[]::new), trim, separator.charAt(0), quote.charAt(0), + ignoreMissing); + } + } +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 4f99c850e5bd3..b37e5d13e4602 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -88,7 +88,8 @@ public Map getProcessors(Processor.Parameters paramet entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)), entry(DissectProcessor.TYPE, new DissectProcessor.Factory()), entry(DropProcessor.TYPE, new DropProcessor.Factory()), - entry(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory())); + entry(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory()), + entry(CsvProcessor.TYPE, new CsvProcessor.Factory())); } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java new file mode 100644 index 0000000000000..87da73cce129d --- /dev/null +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java @@ -0,0 +1,221 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class CsvProcessorTests extends ESTestCase { + + private static final Character[] SEPARATORS = new Character[]{',', ';', '|', '.'}; + private final String quote; + private char separator; + + + public CsvProcessorTests(@Name("quote") String quote) { + this.quote = quote; + } + + @ParametersFactory + public static Iterable parameters() { + return Arrays.asList(new Object[]{"'"}, new Object[]{"\""}, new Object[]{""}); + } + + @Before + public void setup() { + separator = randomFrom(SEPARATORS); + } + + public void testExactNumberOfFields() throws Exception { + int numItems = randomIntBetween(2, 10); + Map items = new LinkedHashMap<>(); + for (int i = 0; i < numItems; i++) { + items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + String[] headers = items.keySet().toArray(new String[numItems]); + String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + "")); + + IngestDocument ingestDocument = processDocument(headers, csv); + + items.forEach((key, value) -> assertEquals(value, ingestDocument.getFieldValue(key, String.class))); + } + + public void testLessFieldsThanHeaders() throws Exception { + int numItems = randomIntBetween(4, 10); + Map items = new LinkedHashMap<>(); + for (int i = 0; i < numItems; i++) { + items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + String[] headers = items.keySet().toArray(new String[numItems]); + String csv = items.values().stream().map(v -> quote + v + quote).limit(3).collect(Collectors.joining(separator + "")); + + IngestDocument ingestDocument = processDocument(headers, csv); + + items.keySet().stream().skip(3).forEach(key -> assertFalse(ingestDocument.hasField(key))); + items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class))); + } + + public void testLessHeadersThanFields() throws Exception { + int numItems = randomIntBetween(5, 10); + Map items = new LinkedHashMap<>(); + for (int i = 0; i < numItems; i++) { + items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + String[] headers = items.keySet().stream().limit(3).toArray(String[]::new); + String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + "")); + + IngestDocument ingestDocument = processDocument(headers, csv); + + items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class))); + } + + public void testSingleField() throws Exception { + String[] headers = new String[]{randomAlphaOfLengthBetween(5, 10)}; + String value = randomAlphaOfLengthBetween(5, 10); + String csv = quote + value + quote; + + IngestDocument ingestDocument = processDocument(headers, csv); + + assertEquals(value, ingestDocument.getFieldValue(headers[0], String.class)); + } + + public void testEscapedQuote() throws Exception { + int numItems = randomIntBetween(2, 10); + Map items = new LinkedHashMap<>(); + for (int i = 0; i < numItems; i++) { + items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10) + quote + quote + randomAlphaOfLengthBetween(5 + , 10) + quote + quote); + } + String[] headers = items.keySet().toArray(new String[numItems]); + String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + "")); + + IngestDocument ingestDocument = processDocument(headers, csv); + + items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote), ingestDocument.getFieldValue(key, String.class))); + } + + public void testQuotedStrings() throws Exception { + assumeFalse("quote needed", quote.isEmpty()); + int numItems = randomIntBetween(2, 10); + Map items = new LinkedHashMap<>(); + for (int i = 0; i < numItems; i++) { + items.put(randomAlphaOfLengthBetween(5, 10), + separator + randomAlphaOfLengthBetween(5, 10) + separator + "\n\r" + randomAlphaOfLengthBetween(5, 10)); + } + String[] headers = items.keySet().toArray(new String[numItems]); + String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + "")); + + IngestDocument ingestDocument = processDocument(headers, csv); + + items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote), ingestDocument.getFieldValue(key, + String.class))); + } + + public void testEmptyFields() throws Exception { + int numItems = randomIntBetween(5, 10); + Map items = new LinkedHashMap<>(); + for (int i = 0; i < numItems; i++) { + items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + String[] headers = items.keySet().toArray(new String[numItems]); + String csv = + items.values().stream().map(v -> quote + v + quote).limit(numItems - 1).skip(3).collect(Collectors.joining(separator + "")); + + IngestDocument ingestDocument = processDocument(headers, + "" + separator + "" + separator + "" + separator + csv + separator + separator + + "abc"); + + items.keySet().stream().limit(3).forEach(key -> assertFalse(ingestDocument.hasField(key))); + items.entrySet().stream().limit(numItems - 1).skip(3).forEach(e -> assertEquals(e.getValue(), + ingestDocument.getFieldValue(e.getKey(), String.class))); + items.keySet().stream().skip(numItems - 1).forEach(key -> assertFalse(ingestDocument.hasField(key))); + } + + public void testWrongStings() throws Exception { + assumeTrue("single run only", quote.isEmpty()); + expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\"abc")); + expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "\"abc\"asd")); + expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "\"abcasd")); + expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\nabc")); + expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\rabc")); + } + + public void testQuotedWhitespaces() throws Exception { + assumeFalse("quote needed", quote.isEmpty()); + IngestDocument document = processDocument(new String[]{"a", "b", "c", "d"}, + " abc " + separator + " def" + separator + "ghi " + separator + " " + quote + " ooo " + quote); + assertEquals("abc", document.getFieldValue("a", String.class)); + assertEquals("def", document.getFieldValue("b", String.class)); + assertEquals("ghi", document.getFieldValue("c", String.class)); + assertEquals(" ooo ", document.getFieldValue("d", String.class)); + } + + public void testUntrimmed() throws Exception { + assumeFalse("quote needed", quote.isEmpty()); + IngestDocument document = processDocument(new String[]{"a", "b", "c", "d", "e", "f"}, + " abc " + separator + " def" + separator + "ghi " + separator + " " + + quote + "ooo" + quote + " " + separator + " " + quote + "jjj" + quote + " ", false); + assertEquals(" abc ", document.getFieldValue("a", String.class)); + assertEquals(" def", document.getFieldValue("b", String.class)); + assertEquals("ghi ", document.getFieldValue("c", String.class)); + assertEquals("ooo", document.getFieldValue("d", String.class)); + assertEquals("jjj", document.getFieldValue("e", String.class)); + assertFalse(document.hasField("f")); + } + + public void testEmptyHeaders() throws Exception { + assumeTrue("single run only", quote.isEmpty()); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "abc,abc"); + HashMap metadata = new HashMap<>(ingestDocument.getSourceAndMetadata()); + + CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false); + + processor.execute(ingestDocument); + + assertEquals(metadata, ingestDocument.getSourceAndMetadata()); + } + + private IngestDocument processDocument(String[] headers, String csv) throws Exception { + return processDocument(headers, csv, true); + } + + private IngestDocument processDocument(String[] headers, String csv, boolean trim) throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, csv); + char quoteChar = quote.isEmpty() ? '"' : quote.charAt(0); + CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false); + + processor.execute(ingestDocument); + + return ingestDocument; + } +} diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/250_csv.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/250_csv.yml new file mode 100644 index 0000000000000..a38805fb1fec3 --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/250_csv.yml @@ -0,0 +1,164 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test CSV Processor defaults": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "csv": { + "field": "value", + "target_fields":["a","b","c"] + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "value": "aa,bb,cc" + } + + - do: + get: + index: test + id: 1 + - match: { _source.a: "aa" } + - match: { _source.b: "bb" } + - match: { _source.c: "cc" } + +--- +"Test CSV Processor quote and separator": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "csv": { + "field": "value", + "target_fields":["a","b","c","d","e"], + "quote": "'", + "separator": ";" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "value": "'aa';'b;b';'cc';d,d;'ee''ee'" + } + + - do: + get: + index: test + id: 1 + - match: { _source.a: "aa" } + - match: { _source.b: "b;b" } + - match: { _source.c: "cc" } + - match: { _source.d: "d,d" } + - match: { _source.e: "ee'ee" } + +--- +"Test CSV Processor trim": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "csv": { + "field": "value", + "target_fields":["a","b","c"], + "trim": true, + "quote": "'" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "value": " aa, bb , 'cc'" + } + + - do: + get: + index: test + id: 1 + - match: { _source.a: "aa" } + - match: { _source.b: "bb" } + - match: { _source.c: "cc" } + +--- +"Test CSV Processor trim log": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "csv": { + "field": "value", + "target_fields":["date","level","server","id","msg"], + "trim": true, + "separator": "|" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "value": "2018-01-06 16:56:14.295748|INFO |VirtualServer |1 |listening on 0.0.0.0:9987, :::9987" + } + + - do: + get: + index: test + id: 1 + - match: { _source.date: "2018-01-06 16:56:14.295748" } + - match: { _source.level: "INFO" } + - match: { _source.server: "VirtualServer" } + - match: { _source.id: "1" } + - match: { _source.msg: "listening on 0.0.0.0:9987, :::9987" }