Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV processor #49509

Merged
merged 18 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,7 @@ include::processors/append.asciidoc[]
include::processors/bytes.asciidoc[]
include::processors/circle.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/csv.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]
Expand Down
33 changes: 33 additions & 0 deletions docs/reference/ingest/processors/csv.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[[csv-processor]]
=== CSV Processor
Extracts fields from CSV line out of a single text field within a document. Any empty field in CSV will be skipped.

[[csv-options]]
.CSV Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to extract data from
| `target_fields` | yes | - | The array of fields to assign extracted values to
| `separator` | no | , | Separator used in CSV, has to be single character string
| `quote` | no | " | Quote used in CSV, has to be single character string
| `ignore_missing` | no | `true` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `trim` | no | `false` | Trim whitespaces in unquoted fields
include::common-options.asciidoc[]
|======

[source,js]
--------------------------------------------------
{
"csv": {
"field": "my_field",
"target_fields": ["field1, field2"],
}
}
--------------------------------------------------
// NOTCONSOLE

If the `trim` option is enabled then any whitespace in the beginning and in the end of each unquoted field will be trimmed.
For example with configuration above, a value of `A, B` will result in field `field2`
having value `{nbsp}B` (with space at the beginning). If `trim` is enabled `A, B` will result in field `field2`
having value `B` (no whitespace). Quoted fields will be left untouched.
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.IngestDocument;

final class CsvParser {

private static final char LF = '\n';
private static final char CR = '\r';
private static final char SPACE = ' ';
private static final char TAB = '\t';

private enum State {
START, UNQUOTED, QUOTED, QUOTED_END
}

private final char quote;
private final char separator;
private final boolean trim;
private final String[] headers;
private final IngestDocument ingestDocument;
private final StringBuilder builder = new StringBuilder();
private State state = State.START;
private String line;
private int currentHeader = 0;
private int startIndex = 0;
private int length;
private int currentIndex;

CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) {
this.ingestDocument = ingestDocument;
this.quote = quote;
this.separator = separator;
this.trim = trim;
this.headers = headers;
}

void process(String line) {
this.line = line;
length = line.length();
for (currentIndex = 0; currentIndex < length; currentIndex++) {
switch (state) {
case START:
if (processStart()) {
return;
}
break;
case UNQUOTED:
if (processUnquoted()) {
return;
}
break;
case QUOTED:
processQuoted();
break;
case QUOTED_END:
if (processQuotedEnd()) {
return;
}
break;
}
}

//we've reached end of string, we need to handle last field
switch (state) {
case UNQUOTED:
setField(length);
break;
case QUOTED_END:
setField(length - 1);
break;
case QUOTED:
throw new IllegalArgumentException("Unmatched quote");
}
}

private boolean processStart() {
for (; currentIndex < length; currentIndex++) {
char c = currentChar();
if (c == quote) {
state = State.QUOTED;
builder.setLength(0);
startIndex = currentIndex + 1;
return false;
} else if (c == separator) {
startIndex++;
if (nextHeader()) {
return true;
}
} else if (isWhitespace(c)) {
if (trim) {
startIndex++;
}
} else {
state = State.UNQUOTED;
builder.setLength(0);
return false;
}
}
return true;
}

private boolean processUnquoted() {
int spaceCount = 0;
for (; currentIndex < length; currentIndex++) {
char c = currentChar();
if (c == LF || c == CR || c == quote) {
throw new IllegalArgumentException("Illegal character inside unquoted field at " + currentIndex);
} else if (trim && isWhitespace(c)) {
spaceCount++;
} else if (c == separator) {
state = State.START;
if (setField(currentIndex - spaceCount)) {
return true;
}
startIndex = currentIndex + 1;
return false;
} else {
spaceCount = 0;
}
}
return false;
}

private void processQuoted() {
for (; currentIndex < length; currentIndex++) {
if (currentChar() == quote) {
state = State.QUOTED_END;
break;
}
}
}

private boolean processQuotedEnd() {
char c = currentChar();
if (c == quote) {
builder.append(line, startIndex, currentIndex - 1).append(quote);
startIndex = currentIndex + 1;
state = State.QUOTED;
return false;
}
boolean shouldSetField = true;
for (; currentIndex < length; currentIndex++) {
c = currentChar();
if (isWhitespace(c)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the value we are parsing is a quoted string, are the spaces around it always trimmed?

I've taken a look at the RFC but I'm still not sure what the right path here is. It states "Spaces are considered part of a field and should not be ignored." In the implementation we have a trim option which is a fine extension, but if reading a quoted string, the spaces are trimmed regardless of if the trim option is set to true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spaces around quotes are always trimmed since it's well defined where start and end of data is. It's deviation from RFC, where no spaces are allowed both before and after quotes at all. But it's common for CSV files in the wild to have them, so this change make it easier for a user with no ambiguity introduced.
For unquoted fields situation is different, there's no way to tell automatically where the data starts so a user must decide here if he wants to trim whitespaces or not. That's why it's parametrized.
Now when I think of it, my gut feeling is most use cases would trim leading/trailing whitespaces so I would even consider default trim to true. What do you think? @martijnvg you opinion would be appreciated as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with trimming leading/trailing whitespaces by default, because it seems more practical to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@droberts195 convinced me otherwise: #49509 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, me too, reverted back to false

if (shouldSetField) {
if (setField(currentIndex - 1)) {
return true;
}
shouldSetField = false;
}
} else if (c == separator) {
if (shouldSetField && setField(currentIndex - 1)) {
return true;
}
startIndex = currentIndex + 1;
state = State.START;
return false;
} else {
throw new IllegalArgumentException("character '" + c + "' after quoted field at " + currentIndex);
}
}
return true;
}

private char currentChar() {
return line.charAt(currentIndex);
}

private boolean isWhitespace(char c) {
return c == SPACE || c == TAB;
}

private boolean setField(int endIndex) {
if (builder.length() == 0) {
ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex));
} else {
builder.append(line, startIndex, endIndex);
ingestDocument.setFieldValue(headers[currentHeader], builder.toString());
}
return nextHeader();
}

private boolean nextHeader() {
currentHeader++;
return currentHeader == headers.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* A processor that breaks line from CSV file into separate fields.
* If there's more fields requested than there is in the CSV, extra field will not be present in the document after processing.
* In the same way this processor will skip any field that is empty in CSV.
*
* By default it uses rules according to <a href="https://tools.ietf.org/html/rfc4180">RCF 4180</a> with one exception: whitespaces are
* allowed before or after quoted field. Processor can be tweaked with following parameters:
*
* quote: set custom quote character (defaults to ")
* separator: set custom separator (defaults to ,)
* trim: trim leading and trailing whitespaces in unquoted fields
*/
public final class CsvProcessor extends AbstractProcessor {

public static final String TYPE = "csv";

private final String field;
private final String[] headers;
private final boolean trim;
private final char quote;
private final char separator;
private final boolean ignoreMissing;

CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) {
super(tag);
this.field = field;
this.headers = headers;
this.trim = trim;
this.quote = quote;
this.separator = separator;
this.ignoreMissing = ignoreMissing;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) {
if (headers.length == 0) {
return ingestDocument;
}

String line = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (line == null && ignoreMissing == false) {
return ingestDocument;
} else if (line == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}
new CsvParser(ingestDocument, quote, separator, trim, headers).process(line);
return ingestDocument;
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements org.elasticsearch.ingest.Processor.Factory {
@Override
public CsvProcessor create(Map<String, org.elasticsearch.ingest.Processor.Factory> registry, String processorTag,
Map<String, Object> config) {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String quote = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "quote", "\"");
if (quote.length() != 1) {
throw newConfigurationException(TYPE, processorTag, "quote", "quote has to be single character like \" or '");
}
String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator", ",");
if (separator.length() != 1) {
throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be single character like , or ;");
}
boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false);
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
List<String> targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields");
if (targetFields.isEmpty()) {
throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty");
}
return new CsvProcessor(processorTag, field, targetFields.toArray(String[]::new), trim, separator.charAt(0), quote.charAt(0),
ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)),
entry(DissectProcessor.TYPE, new DissectProcessor.Factory()),
entry(DropProcessor.TYPE, new DropProcessor.Factory()),
entry(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory()));
entry(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory()),
entry(CsvProcessor.TYPE, new CsvProcessor.Factory()));
}

@Override
Expand Down
Loading