-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat](refactor-param) add file format configuration #50225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package org.apache.doris.datasource.property.fileformat; | ||
|
|
||
| import org.apache.doris.nereids.exceptions.AnalysisException; | ||
| import org.apache.doris.thrift.TFileAttributes; | ||
| import org.apache.doris.thrift.TFileFormatType; | ||
| import org.apache.doris.thrift.TFileTextScanRangeParams; | ||
| import org.apache.doris.thrift.TResultFileSinkOptions; | ||
|
|
||
| import java.util.Map; | ||
|
|
||
| public class AvroFileFormatProperties extends FileFormatProperties { | ||
| public AvroFileFormatProperties() { | ||
| super(TFileFormatType.FORMAT_AVRO); | ||
| } | ||
|
|
||
| @Override | ||
| public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty) | ||
| throws AnalysisException { | ||
| } | ||
|
|
||
| @Override | ||
| public TResultFileSinkOptions toTResultFileSinkOptions() { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public TFileAttributes toTFileAttributes() { | ||
| TFileAttributes fileAttributes = new TFileAttributes(); | ||
| TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); | ||
| fileAttributes.setTextParams(fileTextScanRangeParams); | ||
| return fileAttributes; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package org.apache.doris.datasource.property.fileformat; | ||
|
|
||
| import org.apache.doris.analysis.Separator; | ||
| import org.apache.doris.catalog.Column; | ||
| import org.apache.doris.common.util.Util; | ||
| import org.apache.doris.nereids.exceptions.AnalysisException; | ||
| import org.apache.doris.qe.ConnectContext; | ||
| import org.apache.doris.thrift.TFileAttributes; | ||
| import org.apache.doris.thrift.TFileFormatType; | ||
| import org.apache.doris.thrift.TFileTextScanRangeParams; | ||
| import org.apache.doris.thrift.TResultFileSinkOptions; | ||
| import org.apache.doris.thrift.TTextSerdeType; | ||
|
|
||
| import com.google.common.base.Strings; | ||
| import com.google.common.collect.Lists; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| public class CsvFileFormatProperties extends FileFormatProperties { | ||
| public static final Logger LOG = LogManager.getLogger( | ||
| org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class); | ||
|
|
||
| public static final String DEFAULT_COLUMN_SEPARATOR = "\t"; | ||
| public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001"; | ||
| public static final String DEFAULT_LINE_DELIMITER = "\n"; | ||
|
|
||
| public static final String PROP_COLUMN_SEPARATOR = "column_separator"; | ||
| public static final String PROP_LINE_DELIMITER = "line_delimiter"; | ||
|
|
||
| public static final String PROP_SKIP_LINES = "skip_lines"; | ||
| public static final String PROP_CSV_SCHEMA = "csv_schema"; | ||
| public static final String PROP_COMPRESS_TYPE = "compress_type"; | ||
| public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; | ||
|
|
||
| public static final String PROP_ENCLOSE = "enclose"; | ||
|
|
||
| private String headerType = ""; | ||
| private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE; | ||
| private String columnSeparator = DEFAULT_COLUMN_SEPARATOR; | ||
| private String lineDelimiter = DEFAULT_LINE_DELIMITER; | ||
| private boolean trimDoubleQuotes; | ||
| private int skipLines; | ||
| private byte enclose; | ||
|
|
||
| // used by tvf | ||
| // User specified csv columns, it will override columns got from file | ||
| private final List<Column> csvSchema = Lists.newArrayList(); | ||
|
|
||
| String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR; | ||
|
|
||
| public CsvFileFormatProperties() { | ||
| super(TFileFormatType.FORMAT_CSV_PLAIN); | ||
| } | ||
|
|
||
| public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType) { | ||
| super(TFileFormatType.FORMAT_CSV_PLAIN); | ||
| this.defaultColumnSeparator = defaultColumnSeparator; | ||
| this.textSerdeType = textSerdeType; | ||
| } | ||
|
|
||
| public CsvFileFormatProperties(String headerType) { | ||
| super(TFileFormatType.FORMAT_CSV_PLAIN); | ||
| this.headerType = headerType; | ||
| } | ||
|
|
||
|
|
||
| @Override | ||
| public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty) | ||
| throws AnalysisException { | ||
| try { | ||
| // analyze properties specified by user | ||
| columnSeparator = getOrDefault(formatProperties, PROP_COLUMN_SEPARATOR, | ||
| defaultColumnSeparator, isRemoveOriginProperty); | ||
| if (Strings.isNullOrEmpty(columnSeparator)) { | ||
| throw new AnalysisException("column_separator can not be empty."); | ||
| } | ||
| columnSeparator = Separator.convertSeparator(columnSeparator); | ||
|
|
||
| lineDelimiter = getOrDefault(formatProperties, PROP_LINE_DELIMITER, | ||
| DEFAULT_LINE_DELIMITER, isRemoveOriginProperty); | ||
| if (Strings.isNullOrEmpty(lineDelimiter)) { | ||
| throw new AnalysisException("line_delimiter can not be empty."); | ||
| } | ||
| lineDelimiter = Separator.convertSeparator(lineDelimiter); | ||
|
|
||
| String enclosedString = getOrDefault(formatProperties, PROP_ENCLOSE, | ||
| "", isRemoveOriginProperty); | ||
| if (!Strings.isNullOrEmpty(enclosedString)) { | ||
| if (enclosedString.length() > 1) { | ||
| throw new AnalysisException("enclose should not be longer than one byte."); | ||
| } | ||
| enclose = (byte) enclosedString.charAt(0); | ||
| if (enclose == 0) { | ||
| throw new AnalysisException("enclose should not be byte [0]."); | ||
| } | ||
| } | ||
|
|
||
| trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties, | ||
| PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty)) | ||
| .booleanValue(); | ||
| skipLines = Integer.valueOf(getOrDefault(formatProperties, | ||
| PROP_SKIP_LINES, "0", isRemoveOriginProperty)).intValue(); | ||
| if (skipLines < 0) { | ||
| throw new AnalysisException("skipLines should not be less than 0."); | ||
| } | ||
|
|
||
| String compressTypeStr = getOrDefault(formatProperties, | ||
| PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty); | ||
| compressionType = Util.getFileCompressType(compressTypeStr); | ||
|
|
||
| } catch (org.apache.doris.common.AnalysisException e) { | ||
morningman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| throw new AnalysisException(e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public TResultFileSinkOptions toTResultFileSinkOptions() { | ||
| return null; | ||
| } | ||
|
|
||
| // The method `analyzeFileFormatProperties` must have been called once before this method | ||
| @Override | ||
| public TFileAttributes toTFileAttributes() { | ||
| TFileAttributes fileAttributes = new TFileAttributes(); | ||
| TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); | ||
| fileTextScanRangeParams.setColumnSeparator(this.columnSeparator); | ||
| fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter); | ||
| if (this.enclose != 0) { | ||
| fileTextScanRangeParams.setEnclose(this.enclose); | ||
| } | ||
| fileAttributes.setTextParams(fileTextScanRangeParams); | ||
| fileAttributes.setHeaderType(headerType); | ||
| fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes); | ||
| fileAttributes.setSkipLines(skipLines); | ||
| fileAttributes.setEnableTextValidateUtf8( | ||
| ConnectContext.get().getSessionVariable().enableTextValidateUtf8); | ||
| return fileAttributes; | ||
| } | ||
|
|
||
| public String getHeaderType() { | ||
| return headerType; | ||
| } | ||
|
|
||
| public TTextSerdeType getTextSerdeType() { | ||
| return textSerdeType; | ||
| } | ||
|
|
||
| public String getColumnSeparator() { | ||
| return columnSeparator; | ||
| } | ||
|
|
||
| public String getLineDelimiter() { | ||
| return lineDelimiter; | ||
| } | ||
|
|
||
| public boolean isTrimDoubleQuotes() { | ||
| return trimDoubleQuotes; | ||
| } | ||
|
|
||
| public int getSkipLines() { | ||
| return skipLines; | ||
| } | ||
|
|
||
| public byte getEnclose() { | ||
| return enclose; | ||
| } | ||
|
|
||
| public List<Column> getCsvSchema() { | ||
| return csvSchema; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package org.apache.doris.datasource.property.fileformat; | ||
|
|
||
| import org.apache.doris.nereids.exceptions.AnalysisException; | ||
| import org.apache.doris.thrift.TFileAttributes; | ||
| import org.apache.doris.thrift.TFileCompressType; | ||
| import org.apache.doris.thrift.TFileFormatType; | ||
| import org.apache.doris.thrift.TResultFileSinkOptions; | ||
| import org.apache.doris.thrift.TTextSerdeType; | ||
|
|
||
| import java.util.Map; | ||
|
|
||
| public abstract class FileFormatProperties { | ||
| public static final String PROP_FORMAT = "format"; | ||
| public static final String FORMAT_PARQUET = "parquet"; | ||
| public static final String FORMAT_CSV = "csv"; | ||
| public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names"; | ||
| public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; | ||
| public static final String FORMAT_HIVE_TEXT = "hive_text"; | ||
| public static final String FORMAT_ORC = "orc"; | ||
| public static final String FORMAT_JSON = "json"; | ||
| public static final String FORMAT_AVRO = "avro"; | ||
| public static final String FORMAT_WAL = "wal"; | ||
| public static final String FORMAT_ARROW = "arrow"; | ||
| public static final String PROP_COMPRESS_TYPE = "compress_type"; | ||
|
|
||
| protected TFileFormatType fileFormatType; | ||
|
|
||
| protected TFileCompressType compressionType; | ||
|
|
||
| public FileFormatProperties(TFileFormatType fileFormatType) { | ||
| this.fileFormatType = fileFormatType; | ||
| } | ||
|
|
||
| /** | ||
| * Analyze user properties | ||
| * @param formatProperties properties specified by user | ||
| * @param isRemoveOriginProperty if this param is set to true, then this method would remove the origin property | ||
| * @throws AnalysisException | ||
| */ | ||
| public abstract void analyzeFileFormatProperties( | ||
| Map<String, String> formatProperties, boolean isRemoveOriginProperty) | ||
| throws AnalysisException; | ||
|
|
||
| /** | ||
| * generate TResultFileSinkOptions according to the properties of specified file format | ||
| * You must call method `analyzeFileFormatProperties` once before calling method `toTResultFileSinkOptions` | ||
| */ | ||
| public abstract TResultFileSinkOptions toTResultFileSinkOptions(); | ||
|
|
||
| /** | ||
| * generate TFileAttributes according to the properties of specified file format | ||
| * You must call method `analyzeFileFormatProperties` once before calling method `toTFileAttributes` | ||
| */ | ||
| public abstract TFileAttributes toTFileAttributes(); | ||
|
|
||
| public static FileFormatProperties createFileFormatProperties(String formatString) { | ||
morningman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| switch (formatString) { | ||
| case FORMAT_CSV: | ||
| return new CsvFileFormatProperties(); | ||
| case FORMAT_HIVE_TEXT: | ||
| return new CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR, | ||
| TTextSerdeType.HIVE_TEXT_SERDE); | ||
| case FORMAT_CSV_WITH_NAMES: | ||
| return new CsvFileFormatProperties( | ||
| FORMAT_CSV_WITH_NAMES); | ||
| case FORMAT_CSV_WITH_NAMES_AND_TYPES: | ||
| return new CsvFileFormatProperties( | ||
| FORMAT_CSV_WITH_NAMES_AND_TYPES); | ||
| case FORMAT_PARQUET: | ||
| return new ParquetFileFormatProperties(); | ||
| case FORMAT_ORC: | ||
| return new OrcFileFormatProperties(); | ||
| case FORMAT_JSON: | ||
| return new JsonFileFormatProperties(); | ||
| case FORMAT_AVRO: | ||
| return new AvroFileFormatProperties(); | ||
| case FORMAT_WAL: | ||
| return new WalFileFormatProperties(); | ||
| default: | ||
| throw new AnalysisException("format:" + formatString + " is not supported."); | ||
| } | ||
| } | ||
|
|
||
| public static FileFormatProperties createFileFormatProperties(Map<String, String> formatProperties) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sometimes we may use the suffix of a file to guess its format.
And I think without the suffix and user specified "file_format" property, we should provide a "default" format file, eg, csv. But it depends because I am not sure what the previous logic is |
||
| throws AnalysisException { | ||
| String formatString = formatProperties.getOrDefault(PROP_FORMAT, "") | ||
| .toLowerCase(); | ||
| return createFileFormatProperties(formatString); | ||
| } | ||
|
|
||
| protected String getOrDefault(Map<String, String> props, String key, String defaultValue, | ||
| boolean isRemove) { | ||
| String value = props.getOrDefault(key, defaultValue); | ||
| if (isRemove) { | ||
| props.remove(key); | ||
| } | ||
| return value; | ||
| } | ||
|
|
||
| public TFileFormatType getFileFormatType() { | ||
| return fileFormatType; | ||
| } | ||
|
|
||
| public TFileCompressType getCompressionType() { | ||
| return compressionType; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.