Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [ML] Allow a certain number of ill-formatted rows when delimited format is specified (#55735) #55944

Merged
merged 1 commit into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.filestructurefinder;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;
Expand Down Expand Up @@ -34,7 +35,6 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
private static final String REGEX_NEEDS_ESCAPE_PATTERN = "([\\\\|()\\[\\]{}^$.+*?])";
private static final int MAX_LEVENSHTEIN_COMPARISONS = 100;
private static final int LONG_FIELD_THRESHOLD = 100;

private final List<String> sampleMessages;
private final FileStructure structure;

Expand Down Expand Up @@ -80,6 +80,11 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
for (int index = isHeaderInFile ? 1 : 0; index < rows.size(); ++index) {
List<String> row = rows.get(index);
int lineNumber = lineNumbers.get(index);
// Indicates an illformatted row. We allow a certain number of these
if (row.size() != columnNames.length) {
prevMessageEndLineNumber = lineNumber;
continue;
}
Map<String, String> sampleRecord = new LinkedHashMap<>();
Util.filterListToMap(sampleRecord, columnNames,
trimFields ? row.stream().map(field -> (field == null) ? null : field.trim()).collect(Collectors.toList()) : row);
Expand Down Expand Up @@ -488,7 +493,7 @@ static boolean lineHasUnescapedQuote(String line, CsvPreference csvPreference) {
}

static boolean canCreateFromSample(List<String> explanation, String sample, int minFieldsPerRow, CsvPreference csvPreference,
String formatName) {
String formatName, double allowedFractionOfBadLines) {

// Logstash's CSV parser won't tolerate fields where just part of the
// value is quoted, whereas SuperCSV will, hence this extra check
Expand All @@ -501,11 +506,13 @@ static boolean canCreateFromSample(List<String> explanation, String sample, int
}
}

int numberOfLinesInSample = sampleLines.length;
try (CsvListReader csvReader = new CsvListReader(new StringReader(sample), csvPreference)) {

int fieldsInFirstRow = -1;
int fieldsInLastRow = -1;

List<Integer> illFormattedRows = new ArrayList<>();
int numberOfRows = 0;
try {
List<String> row;
Expand All @@ -529,11 +536,27 @@ static boolean canCreateFromSample(List<String> explanation, String sample, int
--fieldsInThisRow;
}

if (fieldsInLastRow != fieldsInFirstRow) {
explanation.add("Not " + formatName + " because row [" + (numberOfRows - 1) +
"] has a different number of fields to the first row: [" + fieldsInFirstRow + "] and [" +
fieldsInLastRow + "]");
return false;
// TODO: might be good one day to gather a distribution of the most common field counts
// But, this would require iterating (or at least sampling) all the lines.
if (fieldsInThisRow != fieldsInFirstRow) {
illFormattedRows.add(numberOfRows - 1);
// This calculation is complicated by the possibility of multi-lined CSV columns
// `getLineNumber` is a current count of lines, regardless of row count, so
// this formula is just an approximation, but gets more accurate the further
// through the sample you are.
double totalNumberOfRows = (numberOfRows + numberOfLinesInSample - csvReader.getLineNumber());
// We should only allow a certain percentage of ill formatted rows
// as it may have and down stream effects
if (illFormattedRows.size() > Math.ceil(allowedFractionOfBadLines * totalNumberOfRows)) {
explanation.add(new ParameterizedMessage(
"Not {} because {} or more rows did not have the same number of fields as the first row ({}). Bad rows {}",
formatName,
illFormattedRows.size(),
fieldsInFirstRow,
illFormattedRows).getFormattedMessage());
return false;
}
continue;
}

fieldsInLastRow = fieldsInThisRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

public class DelimitedFileStructureFinderFactory implements FileStructureFinderFactory {

static final double DELIMITER_OVERRIDDEN_ALLOWED_FRACTION_OF_BAD_LINES = 0.10d;
static final double FORMAT_OVERRIDDEN_ALLOWED_FRACTION_OF_BAD_LINES = 0.05d;
private final CsvPreference csvPreference;
private final int minFieldsPerRow;
private final boolean trimFields;
Expand Down Expand Up @@ -44,7 +46,7 @@ public boolean canFindFormat(FileStructure.Format format) {
* it could have been truncated when the file was sampled.
*/
@Override
public boolean canCreateFromSample(List<String> explanation, String sample) {
public boolean canCreateFromSample(List<String> explanation, String sample, double allowedFractionOfBadLines) {
String formatName;
switch ((char) csvPreference.getDelimiterChar()) {
case ',':
Expand All @@ -57,7 +59,12 @@ public boolean canCreateFromSample(List<String> explanation, String sample) {
formatName = Character.getName(csvPreference.getDelimiterChar()).toLowerCase(Locale.ROOT) + " delimited values";
break;
}
return DelimitedFileStructureFinder.canCreateFromSample(explanation, sample, minFieldsPerRow, csvPreference, formatName);
return DelimitedFileStructureFinder.canCreateFromSample(explanation,
sample,
minFieldsPerRow,
csvPreference,
formatName,
allowedFractionOfBadLines);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ public interface FileStructureFinderFactory {
* @param explanation List of reasons for making decisions. May contain items when passed and new reasons
* can be appended by this method.
* @param sample A sample from the file to be ingested.
* @param allowedFractionOfBadLines How many lines of the passed sample are allowed to be considered "bad".
* Provided as a fraction from interval [0, 1]
* @return <code>true</code> if this factory can create an appropriate
* file structure given the sample; otherwise <code>false</code>.
*/
boolean canCreateFromSample(List<String> explanation, String sample);
boolean canCreateFromSample(List<String> explanation, String sample, double allowedFractionOfBadLines);

/**
* Create an object representing the structure of a file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
Expand Down Expand Up @@ -276,14 +277,17 @@ FileStructureFinder makeBestStructureFinder(List<String> explanation, String sam
Character quote = overrides.getQuote();
Boolean shouldTrimFields = overrides.getShouldTrimFields();
List<FileStructureFinderFactory> factories;
double allowedFractionOfBadLines = 0.0;
if (delimiter != null) {
allowedFractionOfBadLines = DelimitedFileStructureFinderFactory.DELIMITER_OVERRIDDEN_ALLOWED_FRACTION_OF_BAD_LINES;

// If a precise delimiter is specified, we only need one structure finder
// factory, and we'll tolerate as little as one column in the input
factories = Collections.singletonList(new DelimitedFileStructureFinderFactory(delimiter, (quote == null) ? '"' : quote, 1,
(shouldTrimFields == null) ? (delimiter == '|') : shouldTrimFields));

} else if (quote != null || shouldTrimFields != null) {
} else if (quote != null || shouldTrimFields != null || FileStructure.Format.DELIMITED.equals(overrides.getFormat())) {
allowedFractionOfBadLines = DelimitedFileStructureFinderFactory.FORMAT_OVERRIDDEN_ALLOWED_FRACTION_OF_BAD_LINES;

// The delimiter is not specified, but some other aspect of delimited files is,
// so clone our default delimited factories altering the overridden values
Expand All @@ -301,7 +305,7 @@ FileStructureFinder makeBestStructureFinder(List<String> explanation, String sam

for (FileStructureFinderFactory factory : factories) {
timeoutChecker.check("high level format detection");
if (factory.canCreateFromSample(explanation, sample)) {
if (factory.canCreateFromSample(explanation, sample, allowedFractionOfBadLines)) {
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, lineMergeSizeLimit, overrides,
timeoutChecker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public boolean canFindFormat(FileStructure.Format format) {
* documents must be non-empty, to prevent lines containing "{}" from matching.
*/
@Override
public boolean canCreateFromSample(List<String> explanation, String sample) {
public boolean canCreateFromSample(List<String> explanation, String sample, double allowedFractionOfBadLines) {

int completeDocCount = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public boolean canFindFormat(FileStructure.Format format) {
* non-blank lines.
*/
@Override
public boolean canCreateFromSample(List<String> explanation, String sample) {
public boolean canCreateFromSample(List<String> explanation, String sample, double allowedFractionOfBadLines) {
if (sample.indexOf('\n') < 0) {
explanation.add("Not text because sample contains no newlines");
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public boolean canFindFormat(FileStructure.Format format) {
* necessarily have to be complete (as the sample could have truncated it).
*/
@Override
public boolean canCreateFromSample(List<String> explanation, String sample) {
public boolean canCreateFromSample(List<String> explanation, String sample, double allowedFractionOfBadLines) {

int completeDocCount = 0;
String commonRootElementName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,78 +16,78 @@ public class DelimitedFileStructureFinderFactoryTests extends FileStructureTestC

public void testCanCreateCsvFromSampleGivenCsv() {

assertTrue(csvFactory.canCreateFromSample(explanation, CSV_SAMPLE));
assertTrue(csvFactory.canCreateFromSample(explanation, CSV_SAMPLE, 0.0));
}

public void testCanCreateCsvFromSampleGivenTsv() {

assertFalse(csvFactory.canCreateFromSample(explanation, TSV_SAMPLE));
assertFalse(csvFactory.canCreateFromSample(explanation, TSV_SAMPLE, 0.0));
}

public void testCanCreateCsvFromSampleGivenSemiColonDelimited() {

assertFalse(csvFactory.canCreateFromSample(explanation, SEMI_COLON_DELIMITED_SAMPLE));
assertFalse(csvFactory.canCreateFromSample(explanation, SEMI_COLON_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreateCsvFromSampleGivenPipeDelimited() {

assertFalse(csvFactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE));
assertFalse(csvFactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreateCsvFromSampleGivenText() {

assertFalse(csvFactory.canCreateFromSample(explanation, TEXT_SAMPLE));
assertFalse(csvFactory.canCreateFromSample(explanation, TEXT_SAMPLE, 0.0));
}

// TSV - no need to check NDJSON, XML or CSV because they come earlier in the order we check formats

public void testCanCreateTsvFromSampleGivenTsv() {

assertTrue(tsvFactory.canCreateFromSample(explanation, TSV_SAMPLE));
assertTrue(tsvFactory.canCreateFromSample(explanation, TSV_SAMPLE, 0.0));
}

public void testCanCreateTsvFromSampleGivenSemiColonDelimited() {

assertFalse(tsvFactory.canCreateFromSample(explanation, SEMI_COLON_DELIMITED_SAMPLE));
assertFalse(tsvFactory.canCreateFromSample(explanation, SEMI_COLON_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreateTsvFromSampleGivenPipeDelimited() {

assertFalse(tsvFactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE));
assertFalse(tsvFactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreateTsvFromSampleGivenText() {

assertFalse(tsvFactory.canCreateFromSample(explanation, TEXT_SAMPLE));
assertFalse(tsvFactory.canCreateFromSample(explanation, TEXT_SAMPLE, 0.0));
}

// Semi-colon delimited - no need to check NDJSON, XML, CSV or TSV because they come earlier in the order we check formats

public void testCanCreateSemiColonDelimitedFromSampleGivenSemiColonDelimited() {

assertTrue(semiColonDelimitedfactory.canCreateFromSample(explanation, SEMI_COLON_DELIMITED_SAMPLE));
assertTrue(semiColonDelimitedfactory.canCreateFromSample(explanation, SEMI_COLON_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreateSemiColonDelimitedFromSampleGivenPipeDelimited() {

assertFalse(semiColonDelimitedfactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE));
assertFalse(semiColonDelimitedfactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreateSemiColonDelimitedFromSampleGivenText() {

assertFalse(semiColonDelimitedfactory.canCreateFromSample(explanation, TEXT_SAMPLE));
assertFalse(semiColonDelimitedfactory.canCreateFromSample(explanation, TEXT_SAMPLE, 0.0));
}

// Pipe delimited - no need to check NDJSON, XML, CSV, TSV or semi-colon delimited
// values because they come earlier in the order we check formats

public void testCanCreatePipeDelimitedFromSampleGivenPipeDelimited() {

assertTrue(pipeDelimitedFactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE));
assertTrue(pipeDelimitedFactory.canCreateFromSample(explanation, PIPE_DELIMITED_SAMPLE, 0.0));
}

public void testCanCreatePipeDelimitedFromSampleGivenText() {

assertFalse(pipeDelimitedFactory.canCreateFromSample(explanation, TEXT_SAMPLE));
assertFalse(pipeDelimitedFactory.canCreateFromSample(explanation, TEXT_SAMPLE, 0.0));
}
}
Loading