diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index adf601da1ea40a..744442955c8ab9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -30,19 +30,16 @@
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
 import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TParquetCompressionType;
 import org.apache.doris.thrift.TParquetDataType;
 import org.apache.doris.thrift.TParquetRepetitionType;
 import org.apache.doris.thrift.TParquetSchema;
-import org.apache.doris.thrift.TParquetVersion;
 import org.apache.doris.thrift.TResultFileSinkOptions;
 
 import com.google.common.base.Preconditions;
@@ -61,7 +58,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 // For syntax select * from tbl INTO OUTFILE xxxx
 public class OutFileClause {
@@ -71,9 +67,6 @@ public class OutFileClause {
     public static final List RESULT_COL_TYPES = Lists.newArrayList();
     public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
-    public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
-    public static final Map<String, TFileCompressType> ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
-    public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
     public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
     public static final String FILE_NUMBER = "FileNumber";
     public static final String TOTAL_ROWS = "TotalRows";
@@ -110,24 +103,6 @@ public class OutFileClause {
         PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
         PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);
 
-        PARQUET_COMPRESSION_TYPE_MAP.put("snappy", TParquetCompressionType.SNAPPY);
-        PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
-        PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI);
-        PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
-        PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
-        // arrow do not support lzo and bz2 compression type.
-        // PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
-        // PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
-        PARQUET_COMPRESSION_TYPE_MAP.put("plain", TParquetCompressionType.UNCOMPRESSED);
-
-        ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
-        ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
-        ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
-        ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
-
-        PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
-        PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
-
         ORC_DATA_TYPE.add("bigint");
         ORC_DATA_TYPE.add("boolean");
         ORC_DATA_TYPE.add("double");
@@ -152,9 +127,7 @@ public class OutFileClause {
     public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
     public static final String PROP_FILE_SUFFIX = "file_suffix";
     public static final String PROP_WITH_BOM = "with_bom";
-    public static final String COMPRESS_TYPE = "compress_type";
-    private static final String PARQUET_PROP_PREFIX = "parquet.";
     private static final String SCHEMA = "schema";
 
     private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
@@ -162,13 +135,8 @@ public class OutFileClause {
     private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
 
     private String filePath;
-    private String format;
     private Map<String, String> properties;
 
-    // set following members after analyzing
-    private String columnSeparator = "\t";
-    private String lineDelimiter = "\n";
-    private TFileFormatType fileFormatType;
     private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
     private boolean deleteExistingFiles = false;
     private String fileSuffix = "";
@@ -184,43 +152,41 @@ public class OutFileClause {
     private List<Pair<String, String>> orcSchemas = new ArrayList<>();
 
     private boolean isAnalyzed = false;
-    private String headerType = "";
-
-    private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
-    private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
-    private static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
-    private boolean parquetDisableDictionary = false;
-    private static final String PARQUET_VERSION = "version";
-    private static TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+    private FileFormatProperties fileFormatProperties;
 
     public OutFileClause(String filePath, String format, Map<String, String> properties) {
         this.filePath = filePath;
-        this.format = Strings.isNullOrEmpty(format) ? "csv" : format.toLowerCase();
         this.properties = properties;
         this.isAnalyzed = false;
+        if (Strings.isNullOrEmpty(format)) {
+            fileFormatProperties = FileFormatProperties.createFileFormatProperties("csv");
+        } else {
+            fileFormatProperties = FileFormatProperties.createFileFormatProperties(format.toLowerCase());
+        }
     }
 
     public OutFileClause(OutFileClause other) {
         this.filePath = other.filePath;
-        this.format = other.format;
+        this.fileFormatProperties = other.fileFormatProperties;
         this.properties = other.properties == null ? null : Maps.newHashMap(other.properties);
         this.isAnalyzed = other.isAnalyzed;
     }
 
     public String getColumnSeparator() {
-        return columnSeparator;
+        return ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator();
     }
 
     public String getLineDelimiter() {
-        return lineDelimiter;
+        return ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter();
     }
 
     public String getHeaderType() {
-        return headerType;
+        return ((CsvFileFormatProperties) fileFormatProperties).getHeaderType();
    }
 
     public TFileFormatType getFileFormatType() {
-        return fileFormatType;
+        return fileFormatProperties.getFileFormatType();
     }
 
     public long getMaxFileSizeBytes() {
@@ -245,28 +211,6 @@ public void analyze(Analyzer analyzer, List<Expr> resultExprs, List<String> colL
         }
 
         analyzeFilePath();
-        switch (this.format) {
-            case "csv":
-                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "parquet":
-                fileFormatType = TFileFormatType.FORMAT_PARQUET;
-                break;
-            case "orc":
-                fileFormatType = TFileFormatType.FORMAT_ORC;
-                break;
-            case "csv_with_names":
-                headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
-                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names_and_types":
-                headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
-                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            default:
-                throw new AnalysisException("format:" + this.format + " is not supported.");
-        }
-
         analyzeProperties();
 
         if (brokerDesc != null && isLocalOutput) {
@@ -559,76 +503,60 @@ private void analyzeProperties() throws UserException {
         if (properties == null || properties.isEmpty()) {
             return;
         }
+        // Copy the properties, because we will remove the key from properties.
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
 
-        Set<String> processedPropKeys = Sets.newHashSet();
-        analyzeBrokerDesc(processedPropKeys);
+        analyzeBrokerDesc(copiedProps);
 
-        if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
-            if (!Util.isCsvFormat(fileFormatType)) {
-                throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format");
-            }
-            columnSeparator = Separator.convertSeparator(properties.get(PROP_COLUMN_SEPARATOR));
-            processedPropKeys.add(PROP_COLUMN_SEPARATOR);
-        }
-
-        if (properties.containsKey(PROP_LINE_DELIMITER)) {
-            if (!Util.isCsvFormat(fileFormatType)) {
-                throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
-            }
-            lineDelimiter = Separator.convertSeparator(properties.get(PROP_LINE_DELIMITER));
-            processedPropKeys.add(PROP_LINE_DELIMITER);
-        }
+        fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
 
-        if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
-            maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE));
+        if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
+            maxFileSizeBytes = ParseUtil.analyzeDataVolume(copiedProps.get(PROP_MAX_FILE_SIZE));
             if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
                 throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
             }
-            processedPropKeys.add(PROP_MAX_FILE_SIZE);
+            copiedProps.remove(PROP_MAX_FILE_SIZE);
        }
 
-        if (properties.containsKey(PROP_DELETE_EXISTING_FILES)) {
-            deleteExistingFiles = Boolean.parseBoolean(properties.get(PROP_DELETE_EXISTING_FILES))
+        if (copiedProps.containsKey(PROP_DELETE_EXISTING_FILES)) {
+            deleteExistingFiles = Boolean.parseBoolean(copiedProps.get(PROP_DELETE_EXISTING_FILES))
                     & Config.enable_delete_existing_files;
-            processedPropKeys.add(PROP_DELETE_EXISTING_FILES);
+            copiedProps.remove(PROP_DELETE_EXISTING_FILES);
         }
 
-        if (properties.containsKey(PROP_FILE_SUFFIX)) {
-            fileSuffix = properties.get(PROP_FILE_SUFFIX);
-            processedPropKeys.add(PROP_FILE_SUFFIX);
+        if (copiedProps.containsKey(PROP_FILE_SUFFIX)) {
+            fileSuffix = copiedProps.get(PROP_FILE_SUFFIX);
+            copiedProps.remove(PROP_FILE_SUFFIX);
         }
 
-        if (properties.containsKey(PROP_WITH_BOM)) {
-            withBom = Boolean.valueOf(properties.get(PROP_WITH_BOM)).booleanValue();
-            processedPropKeys.add(PROP_WITH_BOM);
+        if (copiedProps.containsKey(PROP_WITH_BOM)) {
+            withBom = Boolean.valueOf(copiedProps.get(PROP_WITH_BOM)).booleanValue();
+            copiedProps.remove(PROP_WITH_BOM);
         }
 
-        if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
-            successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
+        if (copiedProps.containsKey(PROP_SUCCESS_FILE_NAME)) {
+            successFileName = copiedProps.get(PROP_SUCCESS_FILE_NAME);
             FeNameFormat.checkOutfileSuccessFileName("file name", successFileName);
-            processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
+            copiedProps.remove(PROP_SUCCESS_FILE_NAME);
         }
 
         // For Azure compatibility, this is temporarily added to the map without further processing.
         // The validity of each provider's value will be checked later in S3Properties' check.
-        if (properties.containsKey(S3Properties.PROVIDER)) {
-            processedPropKeys.add(S3Properties.PROVIDER);
+        if (copiedProps.containsKey(S3Properties.PROVIDER)) {
+            copiedProps.remove(S3Properties.PROVIDER);
         }
 
-        if (this.fileFormatType == TFileFormatType.FORMAT_PARQUET) {
-            getParquetProperties(processedPropKeys);
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET) {
+            getParquetProperties(copiedProps);
        }
 
-        if (this.fileFormatType == TFileFormatType.FORMAT_ORC) {
-            getOrcProperties(processedPropKeys);
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC) {
+            getOrcProperties(copiedProps);
         }
 
-        if (processedPropKeys.size() != properties.size()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{} vs {}", processedPropKeys, properties);
-            }
-            throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
-                    .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
+        if (!copiedProps.isEmpty()) {
+            throw new AnalysisException("Unknown properties: " + copiedProps.keySet());
         }
     }
 
@@ -637,11 +565,11 @@ private void analyzeProperties() throws UserException {
      * 1. broker: with broker name
      * 2. s3: with s3 pattern path, without broker name
      */
-    private void analyzeBrokerDesc(Set<String> processedPropKeys) throws UserException {
-        String brokerName = properties.get(PROP_BROKER_NAME);
+    private void analyzeBrokerDesc(Map<String, String> copiedProps) throws UserException {
+        String brokerName = copiedProps.get(PROP_BROKER_NAME);
         StorageBackend.StorageType storageType;
-        if (properties.containsKey(PROP_BROKER_NAME)) {
-            processedPropKeys.add(PROP_BROKER_NAME);
+        if (copiedProps.containsKey(PROP_BROKER_NAME)) {
+            copiedProps.remove(PROP_BROKER_NAME);
             storageType = StorageBackend.StorageType.BROKER;
         } else if (filePath.toUpperCase().startsWith(S3_FILE_PREFIX)) {
             brokerName = StorageBackend.StorageType.S3.name();
@@ -654,29 +582,32 @@ private void analyzeBrokerDesc(Set<String> processedPropKeys) throws UserExcepti
         }
 
         Map<String, String> brokerProps = Maps.newHashMap();
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
+        Iterator<Map.Entry<String, String>> iterator = copiedProps.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
             if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
                 brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             } else if (entry.getKey().toLowerCase().startsWith(S3Properties.S3_PREFIX)
                     || entry.getKey().toUpperCase().startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
                 brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
            } else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
                     && storageType == StorageBackend.StorageType.HDFS) {
                 brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX) || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
                     && storageType == StorageBackend.StorageType.HDFS) {
                 brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             }
         }
+
         if (storageType == StorageBackend.StorageType.S3) {
-            if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
-                brokerProps.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
-                processedPropKeys.add(PropertyConverter.USE_PATH_STYLE);
+            if (copiedProps.containsKey(PropertyConverter.USE_PATH_STYLE)) {
+                brokerProps.put(PropertyConverter.USE_PATH_STYLE, copiedProps.get(PropertyConverter.USE_PATH_STYLE));
+                copiedProps.remove(PropertyConverter.USE_PATH_STYLE);
             }
             S3Properties.requiredS3Properties(brokerProps);
         } else if (storageType == StorageBackend.StorageType.HDFS) {
@@ -694,14 +625,6 @@ public static String getFsName(String path) {
         return fullPath.replace(filePath, "");
     }
 
-    void setParquetVersion(String propertyValue) {
-        if (PARQUET_VERSION_MAP.containsKey(propertyValue)) {
-            this.parquetVersion = PARQUET_VERSION_MAP.get(propertyValue);
-        } else {
-            LOG.debug("not set parquet version type or is invalid, set default to PARQUET_1.0 version.");
-        }
-    }
-
     /**
      * example:
     * SELECT citycode FROM table1 INTO OUTFILE "file:///root/doris/"
@@ -713,36 +636,10 @@ void setParquetVersion(String propertyValue) {
      * prefix with 'parquet.' defines the properties of parquet file,
      * currently only supports: compression, disable_dictionary, version
      */
-    private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
-        // save compress type
-        if (properties.containsKey(COMPRESS_TYPE)) {
-            if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
-                this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
-                        properties.get(COMPRESS_TYPE).toLowerCase());
-                processedPropKeys.add(COMPRESS_TYPE);
-            } else {
-                throw new AnalysisException("parquet compression type [" + properties.get(COMPRESS_TYPE)
-                        + "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
-            }
-        }
-
-        // save all parquet prefix property
-        Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry<String, String> entry = iter.next();
-            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
-                processedPropKeys.add(entry.getKey());
-                if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) {
-                    this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
-                } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_VERSION)) {
-                    setParquetVersion(entry.getValue());
-                }
-            }
-        }
-
+    private void getParquetProperties(Map<String, String> copiedProps) throws AnalysisException {
         // check schema. if schema is not set, Doris will gen schema by select items
         // Note: These codes are useless and outdated.
-        String schema = properties.get(SCHEMA);
+        String schema = copiedProps.get(SCHEMA);
         if (schema == null) {
             return;
         }
@@ -753,43 +650,31 @@ private void getParquetProperties(Set<String> processedPropKeys) throws Analysis
         schema = schema.toLowerCase();
         String[] schemas = schema.split(";");
         for (String item : schemas) {
-            String[] properties = item.split(",");
-            if (properties.length != 3) {
+            String[] fields = item.split(",");
+            if (fields.length != 3) {
                 throw new AnalysisException("must only contains repetition type/column type/column name");
             }
-            if (!PARQUET_REPETITION_TYPE_MAP.containsKey(properties[0])) {
+            if (!PARQUET_REPETITION_TYPE_MAP.containsKey(fields[0])) {
                 throw new AnalysisException("unknown repetition type");
             }
-            if (!properties[0].equalsIgnoreCase("required")) {
+            if (!fields[0].equalsIgnoreCase("required")) {
                 throw new AnalysisException("currently only support required type");
             }
-            if (!PARQUET_DATA_TYPE_MAP.containsKey(properties[1])) {
-                throw new AnalysisException("data type is not supported:" + properties[1]);
+            if (!PARQUET_DATA_TYPE_MAP.containsKey(fields[1])) {
+                throw new AnalysisException("data type is not supported:" + fields[1]);
             }
             TParquetSchema parquetSchema = new TParquetSchema();
-            parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get(properties[0]);
-            parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get(properties[1]);
-            parquetSchema.schema_column_name = properties[2];
+            parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get(fields[0]);
+            parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get(fields[1]);
+            parquetSchema.schema_column_name = fields[2];
             parquetSchemas.add(parquetSchema);
         }
-        processedPropKeys.add(SCHEMA);
+        copiedProps.remove(SCHEMA);
     }
 
-    private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisException {
-        // get compression type
-        // save compress type
-        if (properties.containsKey(COMPRESS_TYPE)) {
-            if (ORC_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
-                this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(properties.get(COMPRESS_TYPE).toLowerCase());
-                processedPropKeys.add(COMPRESS_TYPE);
-            } else {
-                throw new AnalysisException("orc compression type [" + properties.get(COMPRESS_TYPE) + "] is invalid,"
-                        + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
-            }
-        }
-
+    private void getOrcProperties(Map<String, String> copiedProps) throws AnalysisException {
         // check schema. if schema is not set, Doris will gen schema by select items
-        String schema = properties.get(SCHEMA);
+        String schema = copiedProps.get(SCHEMA);
         if (schema == null) {
             return;
         }
@@ -800,15 +685,15 @@ private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisExce
         schema = schema.toLowerCase();
         String[] schemas = schema.split(";");
         for (String item : schemas) {
-            String[] properties = item.split(",");
-            if (properties.length != 2) {
+            String[] fields = item.split(",");
+            if (fields.length != 2) {
                 throw new AnalysisException("must only contains type and column name");
             }
-            if (!ORC_DATA_TYPE.contains(properties[1]) && !properties[1].startsWith("decimal")) {
-                throw new AnalysisException("data type is not supported:" + properties[1]);
-            } else if (!ORC_DATA_TYPE.contains(properties[1]) && properties[1].startsWith("decimal")) {
+            if (!ORC_DATA_TYPE.contains(fields[1]) && !fields[1].startsWith("decimal")) {
+                throw new AnalysisException("data type is not supported:" + fields[1]);
+            } else if (!ORC_DATA_TYPE.contains(fields[1]) && fields[1].startsWith("decimal")) {
                 String errorMsg = "Format of decimal type must be decimal(%d,%d)";
-                String precisionAndScale = properties[1].substring(0, "decimal".length()).trim();
+                String precisionAndScale = fields[1].substring(0, "decimal".length()).trim();
                 if (!precisionAndScale.startsWith("(") || !precisionAndScale.endsWith(")")) {
                     throw new AnalysisException(errorMsg);
                 }
@@ -817,17 +702,17 @@ private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisExce
                     throw new AnalysisException(errorMsg);
                 }
             }
-            orcSchemas.add(Pair.of(properties[0], properties[1]));
+            orcSchemas.add(Pair.of(fields[0], fields[1]));
         }
-        processedPropKeys.add(SCHEMA);
+        copiedProps.remove(SCHEMA);
     }
 
     private boolean isParquetFormat() {
-        return fileFormatType == TFileFormatType.FORMAT_PARQUET;
+        return fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET;
     }
 
     private boolean isOrcFormat() {
-        return fileFormatType == TFileFormatType.FORMAT_ORC;
+        return fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC;
     }
 
     public String getFilePath() {
@@ -845,7 +730,8 @@ public OutFileClause clone() {
 
     public String toSql() {
         StringBuilder sb = new StringBuilder();
-        sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ").append(format);
+        sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ")
+                .append(fileFormatProperties.getFileFormatType());
         if (properties != null && !properties.isEmpty()) {
             sb.append(" PROPERTIES(");
             sb.append(new PrintableMap<>(properties, " = ", true, false));
@@ -864,11 +750,10 @@ public String toDigest() {
     }
 
     public TResultFileSinkOptions toSinkOptions() {
-        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType);
-        if (Util.isCsvFormat(fileFormatType)) {
-            sinkOptions.setColumnSeparator(columnSeparator);
-            sinkOptions.setLineDelimiter(lineDelimiter);
-        }
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath,
+                fileFormatProperties.getFileFormatType());
+        fileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+
         sinkOptions.setMaxFileSizeBytes(maxFileSizeBytes);
         sinkOptions.setDeleteExistingFiles(deleteExistingFiles);
         sinkOptions.setFileSuffix(fileSuffix);
@@ -883,15 +768,10 @@ public TResultFileSinkOptions toSinkOptions() {
             sinkOptions.setSuccessFileName(successFileName);
         }
         if (isParquetFormat()) {
-            sinkOptions.setParquetCompressionType(parquetCompressionType);
-            sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
-            sinkOptions.setParquetVersion(parquetVersion);
             sinkOptions.setParquetSchemas(parquetSchemas);
         }
         if (isOrcFormat()) {
             sinkOptions.setOrcSchema(serializeOrcSchema());
-            sinkOptions.setOrcCompressionType(orcCompressionType);
-            sinkOptions.setOrcWriterVersion(1);
         }
         return sinkOptions;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index c394d9abc28709..99e226f87ab6ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -614,7 +614,11 @@ public static TFileCompressType getFileCompressType(String compressType) {
             return TFileCompressType.UNKNOWN;
         }
         final String upperCaseType = compressType.toUpperCase();
-        return TFileCompressType.valueOf(upperCaseType);
+        try {
+            return TFileCompressType.valueOf(upperCaseType);
+        } catch (IllegalArgumentException e) {
+            return TFileCompressType.UNKNOWN;
+        }
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
new file mode 100644
index 00000000000000..6d2b799ea00c4c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class AvroFileFormatProperties extends FileFormatProperties {
+    public AvroFileFormatProperties() {
+        super(TFileFormatType.FORMAT_AVRO);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
new file mode 100644
index 00000000000000..0efea98a5c3c11
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+import org.apache.doris.thrift.TTextSerdeType;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class CsvFileFormatProperties extends FileFormatProperties {
+    public static final Logger LOG = LogManager.getLogger(
+            org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);
+
+    public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
+    public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String PROP_COLUMN_SEPARATOR = "column_separator";
+    public static final String PROP_LINE_DELIMITER = "line_delimiter";
+
+    public static final String PROP_SKIP_LINES = "skip_lines";
+    public static final String PROP_CSV_SCHEMA = "csv_schema";
+    public static final String PROP_COMPRESS_TYPE = "compress_type";
+    public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+
+    public static final String PROP_ENCLOSE = "enclose";
+
+    private String headerType = "";
+    private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
+    private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
+    private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+    private boolean trimDoubleQuotes;
+    private int skipLines;
+    private byte enclose;
+
+    // used by tvf
+    // User specified csv columns, it will override columns got from file
+    private final List<Column> csvSchema = Lists.newArrayList();
+
+    String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
+
+    public CsvFileFormatProperties() {
+        super(TFileFormatType.FORMAT_CSV_PLAIN);
+    }
+
+    public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN);
+        this.defaultColumnSeparator = defaultColumnSeparator;
+        this.textSerdeType = textSerdeType;
+    }
+
+    public CsvFileFormatProperties(String headerType) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN);
+        this.headerType = headerType;
+    }
+
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        try {
+            // analyze properties specified by user
+            columnSeparator = getOrDefault(formatProperties, PROP_COLUMN_SEPARATOR,
+                    defaultColumnSeparator, isRemoveOriginProperty);
+            if (Strings.isNullOrEmpty(columnSeparator)) {
+                throw new AnalysisException("column_separator can not be empty.");
+            }
+            columnSeparator = Separator.convertSeparator(columnSeparator);
+
+            lineDelimiter = getOrDefault(formatProperties, PROP_LINE_DELIMITER,
+                    DEFAULT_LINE_DELIMITER, isRemoveOriginProperty);
+            if (Strings.isNullOrEmpty(lineDelimiter)) {
+                throw new AnalysisException("line_delimiter can not be empty.");
+            }
+            lineDelimiter = Separator.convertSeparator(lineDelimiter);
+
+            String enclosedString = getOrDefault(formatProperties, PROP_ENCLOSE,
+                    "", isRemoveOriginProperty);
+            if (!Strings.isNullOrEmpty(enclosedString)) {
+                if (enclosedString.length() > 1) {
+                    throw new AnalysisException("enclose should not be longer than one byte.");
+                }
+                enclose = (byte) enclosedString.charAt(0);
+                if (enclose == 0) {
+                    throw new AnalysisException("enclose should not be byte [0].");
+                }
+            }
+
+            trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
+                    PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
+                    .booleanValue();
+            skipLines = Integer.valueOf(getOrDefault(formatProperties,
+                    PROP_SKIP_LINES, "0", isRemoveOriginProperty)).intValue();
+            if (skipLines < 0) {
+                throw new AnalysisException("skipLines should not be less than 0.");
+            }
+
+            String compressTypeStr = getOrDefault(formatProperties,
+                    PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
+            compressionType = Util.getFileCompressType(compressTypeStr);
+
+        } catch (org.apache.doris.common.AnalysisException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        sinkOptions.setColumnSeparator(columnSeparator);
+        sinkOptions.setLineDelimiter(lineDelimiter);
+    }
+
+    // The method `analyzeFileFormatProperties` must have been called once before this method
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
+        fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
+        if (this.enclose != 0) {
+            fileTextScanRangeParams.setEnclose(this.enclose);
+        }
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        fileAttributes.setHeaderType(headerType);
+        fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
+        fileAttributes.setSkipLines(skipLines);
+        fileAttributes.setEnableTextValidateUtf8(
+                ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
+        return fileAttributes;
+    }
+
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    public TTextSerdeType getTextSerdeType() {
+        return textSerdeType;
+    }
+
+    public String getColumnSeparator() {
+        return columnSeparator;
+    }
+
+    public String getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    public boolean isTrimDoubleQuotes() {
+        return trimDoubleQuotes;
+    }
+
+    public int getSkipLines() {
+        return skipLines;
+    }
+
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public List<Column> getCsvSchema() {
+        return csvSchema;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
new file mode 100644
index 00000000000000..bd0ecc214c6f61
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+import org.apache.doris.thrift.TTextSerdeType;
+
+import java.util.Map;
+
+public abstract class FileFormatProperties {
+    public static final String PROP_FORMAT = "format";
+    public static final String FORMAT_PARQUET = "parquet";
+    public static final String FORMAT_CSV = "csv";
+    public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
+    public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
+    public static final String FORMAT_HIVE_TEXT = "hive_text";
+    public static final String FORMAT_ORC = "orc";
+    public static final String FORMAT_JSON = "json";
+    public static final String FORMAT_AVRO = "avro";
+    public static final String FORMAT_WAL = "wal";
+    public static final String FORMAT_ARROW = "arrow";
+    public static final String PROP_COMPRESS_TYPE = "compress_type";
+
+    protected TFileFormatType fileFormatType;
+
+    protected TFileCompressType compressionType;
+
+    public FileFormatProperties(TFileFormatType fileFormatType) {
+        this.fileFormatType = fileFormatType;
+    }
+
+    /**
+     * Analyze user properties
+     * @param formatProperties properties specified by user
+     * @param isRemoveOriginProperty if this param is set to true, then this method would remove the origin property
+     * @throws AnalysisException
+     */
+    public abstract void analyzeFileFormatProperties(
+            Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException;
+
+    /**
+     * generate TResultFileSinkOptions according to the properties of specified file format
+     * You must call method `analyzeFileFormatProperties` once before calling method `toTResultFileSinkOptions`
+     */
+    public abstract void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions);
+
+    /**
+     * generate TFileAttributes according to the properties of specified file format
+     * You must call method `analyzeFileFormatProperties` once before calling method `toTFileAttributes`
+     */
+    public abstract TFileAttributes toTFileAttributes();
+
+    public static FileFormatProperties createFileFormatProperties(String formatString) {
+        switch (formatString) {
+            case FORMAT_CSV:
+                return new CsvFileFormatProperties();
+            case FORMAT_HIVE_TEXT:
+                return new CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
+                        TTextSerdeType.HIVE_TEXT_SERDE);
+            case FORMAT_CSV_WITH_NAMES:
+                return new CsvFileFormatProperties(
+                        FORMAT_CSV_WITH_NAMES);
+            case FORMAT_CSV_WITH_NAMES_AND_TYPES:
+                return new CsvFileFormatProperties(
+                        FORMAT_CSV_WITH_NAMES_AND_TYPES);
+            case FORMAT_PARQUET:
+                return new ParquetFileFormatProperties();
+            case FORMAT_ORC:
+                return new OrcFileFormatProperties();
+            case FORMAT_JSON:
+                return new JsonFileFormatProperties();
+            case FORMAT_AVRO:
+                return new AvroFileFormatProperties();
+            case FORMAT_WAL:
+                return new WalFileFormatProperties();
+            default:
+                throw new AnalysisException("format:" + formatString + " is not supported.");
+        }
+    }
+
+    public static FileFormatProperties createFileFormatProperties(Map<String, String> formatProperties)
+            throws AnalysisException {
+        String formatString = formatProperties.getOrDefault(PROP_FORMAT, "")
+                .toLowerCase();
+        return createFileFormatProperties(formatString);
+    }
+
+    protected String getOrDefault(Map<String, String> props, String key, String defaultValue,
+            boolean isRemove) {
+        String value = props.getOrDefault(key, defaultValue);
+        if (isRemove) {
+            props.remove(key);
+        }
+        return value;
+    }
+
+    public TFileFormatType getFileFormatType() {
+        return fileFormatType;
+    }
+
+    public TFileCompressType getCompressionType() {
+        return compressionType;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
new file mode 100644
index 00000000000000..3431d366f8b547
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class JsonFileFormatProperties extends FileFormatProperties {
+    public static final String PROP_JSON_ROOT = "json_root";
+    public static final String PROP_JSON_PATHS = "jsonpaths";
+    public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array";
+    public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
+    public static final String PROP_NUM_AS_STRING = "num_as_string";
+    public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
+
+    // from ExternalFileTableValuedFunction:
+    private String jsonRoot = "";
+    private String jsonPaths = "";
+    private boolean stripOuterArray;
+    private boolean readJsonByLine;
+    private boolean numAsString;
+    private boolean fuzzyParse;
+
+
+    public JsonFileFormatProperties() {
+        super(TFileFormatType.FORMAT_JSON);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        jsonRoot = getOrDefault(formatProperties, PROP_JSON_ROOT,
+                "", isRemoveOriginProperty);
+        jsonPaths = getOrDefault(formatProperties, PROP_JSON_PATHS,
+                "", isRemoveOriginProperty);
+        readJsonByLine = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE,
+                        "", isRemoveOriginProperty)).booleanValue();
+        stripOuterArray = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY,
+                        "", isRemoveOriginProperty)).booleanValue();
+        numAsString = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_NUM_AS_STRING,
+                        "", isRemoveOriginProperty)).booleanValue();
+        fuzzyParse = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_FUZZY_PARSE,
+                        "", isRemoveOriginProperty)).booleanValue();
+
+        String compressTypeStr = getOrDefault(formatProperties, PROP_COMPRESS_TYPE,
+                "UNKNOWN", isRemoveOriginProperty);
+        compressionType = Util.getFileCompressType(compressTypeStr);
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileTextScanRangeParams.setLineDelimiter(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER);
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        fileAttributes.setJsonRoot(jsonRoot);
+        fileAttributes.setJsonpaths(jsonPaths);
+        fileAttributes.setReadJsonByLine(readJsonByLine);
+        fileAttributes.setStripOuterArray(stripOuterArray);
+        fileAttributes.setNumAsString(numAsString);
+        fileAttributes.setFuzzyParse(fuzzyParse);
+        return fileAttributes;
+    }
+
+    public String getJsonRoot() {
+        return jsonRoot;
+    }
+
+    public String getJsonPaths() {
+        return jsonPaths;
+    }
+
+    public boolean isStripOuterArray() {
+        return stripOuterArray;
+    }
+
+    public boolean isReadJsonByLine() {
+        return readJsonByLine;
+    }
+
+    public boolean isNumAsString() {
+        return numAsString;
+    }
+
+    public boolean isFuzzyParse() {
+        return fuzzyParse;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
new file mode 100644
index 00000000000000..412c3d237e8cde
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class OrcFileFormatProperties extends FileFormatProperties {
+    public static final Map<String, TFileCompressType> ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+
+    static {
+        ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
+        ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
+        ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
+        ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
+    }
+
+    private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
+
+    public OrcFileFormatProperties() {
+        super(TFileFormatType.FORMAT_ORC);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        // get compression type
+        // save compress type
+        if (formatProperties.containsKey(PROP_COMPRESS_TYPE)) {
+            if (ORC_COMPRESSION_TYPE_MAP.containsKey(
+                    formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase())) {
+                this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(
+                        formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase());
+                formatProperties.remove(PROP_COMPRESS_TYPE);
+            } else {
+                throw new AnalysisException("orc compression type ["
+                        + formatProperties.get(PROP_COMPRESS_TYPE) + "] is invalid,"
+                        + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
+            }
+        }
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        sinkOptions.setOrcCompressionType(orcCompressionType);
+        sinkOptions.setOrcWriterVersion(1);
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+
+    public TFileCompressType getOrcCompressionType() {
+        return orcCompressionType;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
new file mode 100644
index 00000000000000..8063b25964a230
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetVersion;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class ParquetFileFormatProperties extends FileFormatProperties {
+    public static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
+    public static final String PARQUET_VERSION = "version";
+    public static final String PARQUET_PROP_PREFIX = "parquet.";
+
+    public static final Logger LOG = LogManager.getLogger(ParquetFileFormatProperties.class);
+    public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+    public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
+
+    static {
+        PARQUET_COMPRESSION_TYPE_MAP.put("snappy", TParquetCompressionType.SNAPPY);
+        PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
+        PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI);
+        PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
+        PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
+        // arrow do not support lzo and bz2 compression type.
+        // PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
+        // PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
+        PARQUET_COMPRESSION_TYPE_MAP.put("plain", TParquetCompressionType.UNCOMPRESSED);
+
+        PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
+        PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
+    }
+
+    private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
+    private boolean parquetDisableDictionary = false;
+    private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+
+    public ParquetFileFormatProperties() {
+        super(TFileFormatType.FORMAT_PARQUET);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        // save compress type
+        if (formatProperties.containsKey(PROP_COMPRESS_TYPE)) {
+            if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(formatProperties.get(PROP_COMPRESS_TYPE)
+                    .toLowerCase())) {
+                this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
+                        formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase());
+                formatProperties.remove(PROP_COMPRESS_TYPE);
+            } else {
+                throw new AnalysisException("parquet compression type ["
+                        + formatProperties.get(PROP_COMPRESS_TYPE)
+                        + "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
+            }
+        }
+
+        // save all parquet prefix property
+        Iterator<Entry<String, String>> iter = formatProperties.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
+                iter.remove();
+                if (entry.getKey().substring(PARQUET_PROP_PREFIX.length())
+                        .equals(PARQUET_DISABLE_DICTIONARY)) {
+                    this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
+                } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length())
+                        .equals(PARQUET_VERSION)) {
+                    if (PARQUET_VERSION_MAP.containsKey(entry.getValue())) {
+                        this.parquetVersion = PARQUET_VERSION_MAP.get(entry.getValue());
+                    } else {
+                        LOG.debug("not set parquet version type or is invalid, set default to PARQUET_1.0 version.");
+                    }
+                }
+            }
+        }
+    }
+
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        sinkOptions.setParquetCompressionType(parquetCompressionType);
+        sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
+        sinkOptions.setParquetVersion(parquetVersion);
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+
+    public TParquetCompressionType getParquetCompressionType() {
+        return parquetCompressionType;
+    }
+
+    public boolean isParquetDisableDictionary() {
+        return parquetDisableDictionary;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
new file mode 100644
index 00000000000000..0c6b1777cf62ad
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.fileformat; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TResultFileSinkOptions; + +import java.util.Map; + +public class WalFileFormatProperties extends FileFormatProperties { + public WalFileFormatProperties() { + super(TFileFormatType.FORMAT_WAL); + } + + @Override + public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) { + } + + @Override + public TFileAttributes toTFileAttributes() { + TFileAttributes fileAttributes = new TFileAttributes(); + TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams(); + fileAttributes.setTextParams(fileTextScanRangeParams); + return fileAttributes; + } + + @Override + public void analyzeFileFormatProperties(Map formatProperties, boolean isRemoveOriginProperty) + throws AnalysisException { + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java index 3dbd5bc2115e25..2f8f9ae8ab1752 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.common.util.Util; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; @@ -69,11 +70,13 @@ private String genNames(ArrayList headerNames, String columnSeparator, S public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList labels) { this(exchNodeId, outFileClause); - if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES) - || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { - header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter()); + if (Util.isCsvFormat(outFileClause.getFileFormatType())) { + if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { + header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter()); + } + headerType = outFileClause.getHeaderType(); } - headerType = outFileClause.getHeaderType(); } public String getBrokerName() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 5a15ccd21f50d5..9ab6d302c3e13c 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -18,7 +18,6 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; @@ -41,6 +40,8 @@ import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties; +import org.apache.doris.datasource.property.fileformat.FileFormatProperties; import org.apache.doris.datasource.tvf.source.TVFScanNode; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.PlanNodeId; @@ -63,15 +64,12 @@ import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; -import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPrimitiveType; import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TTextSerdeType; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -110,25 +108,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio protected Map locationProperties = Maps.newHashMap(); protected String filePath; - protected TFileFormatType fileFormatType; - protected Optional resourceName = Optional.empty(); - private TFileCompressType compressionType; - private String headerType = ""; - - private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE; - private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; - private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER; - private byte enclose = 0; - private String jsonRoot = ""; - private String jsonPaths = ""; - private boolean stripOuterArray; - private boolean readJsonByLine; - private boolean numAsString; - private boolean fuzzyParse; - private boolean trimDoubleQuotes; - private int skipLines; + public FileFormatProperties fileFormatProperties; private long tableId; public abstract TFileType getTFileType(); @@ -138,11 +120,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio public abstract BrokerDesc getBrokerDesc(); public TFileFormatType getTFileFormatType() { - return fileFormatType; + return fileFormatProperties.getFileFormatType(); } public TFileCompressType getTFileCompressType() { - return compressionType; + return fileFormatProperties.getCompressionType(); } public Map getLocationProperties() { @@ -179,94 +161,15 @@ protected Map parseCommonProperties(Map properti Map copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); copiedProps.putAll(mergedProperties); - String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase(); - String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; - switch (formatString) { - case "csv": - this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; - break; - case "hive_text": - 
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
-                this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE;
-                break;
-            case "csv_with_names":
-                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names_and_types":
-                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "parquet":
-                this.fileFormatType = TFileFormatType.FORMAT_PARQUET;
-                break;
-            case "orc":
-                this.fileFormatType = TFileFormatType.FORMAT_ORC;
-                break;
-            case "json":
-                this.fileFormatType = TFileFormatType.FORMAT_JSON;
-                break;
-            case "avro":
-                this.fileFormatType = TFileFormatType.FORMAT_AVRO;
-                break;
-            case "wal":
-                this.fileFormatType = TFileFormatType.FORMAT_WAL;
-                break;
-            default:
-                throw new AnalysisException("format:" + formatString + " is not supported.");
-        }
-
         tableId = Long.valueOf(getOrDefaultAndRemove(copiedProps, PROP_TABLE_ID, "-1"));
-        columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR,
-                defaultColumnSeparator);
-        if (Strings.isNullOrEmpty(columnSeparator)) {
-            throw new AnalysisException("column_separator can not be empty.");
-        }
-        columnSeparator = Separator.convertSeparator(columnSeparator);
-
-        lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER,
-                FileFormatConstants.DEFAULT_LINE_DELIMITER);
-        if (Strings.isNullOrEmpty(lineDelimiter)) {
-            throw new AnalysisException("line_delimiter can not be empty.");
-        }
-        lineDelimiter = Separator.convertSeparator(lineDelimiter);
-        String enclosedString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_ENCLOSE, "");
-        if (!Strings.isNullOrEmpty(enclosedString)) {
-            if (enclosedString.length() > 1) {
-                throw new AnalysisException("enclose should not be longer than one byte.");
-            }
-            enclose = (byte) enclosedString.charAt(0);
-            if (enclose == 0) {
-                throw new AnalysisException("enclose should not be byte [0].");
-            }
-        }
+        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
+        fileFormatProperties = FileFormatProperties.createFileFormatProperties(formatString);
+        fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
 
-        jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, "");
-        jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, "");
-        readJsonByLine = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue();
-        stripOuterArray = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue();
-        numAsString = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue();
-        fuzzyParse = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue();
-        trimDoubleQuotes = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue();
-        skipLines = Integer.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue();
-
-        String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN");
-        try {
-            compressionType = Util.getFileCompressType(compressTypeStr);
-        } catch (IllegalArgumentException e) {
-            throw new AnalysisException("Compress type : " + compressTypeStr + " is not supported.");
-        }
-        if (FileFormatUtils.isCsv(formatString)) {
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
             FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
-                    FileFormatConstants.PROP_CSV_SCHEMA, ""));
+                    CsvFileFormatProperties.PROP_CSV_SCHEMA, ""));
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get csv schema: {}", csvSchema);
             }
@@ -293,29 +196,7 @@ public List<TBrokerFileStatus> getFileStatuses() {
     }
 
     public TFileAttributes getFileAttributes() {
-        TFileAttributes fileAttributes = new TFileAttributes();
-        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
-        fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
-        fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
-        if (enclose != 0) {
-            fileTextScanRangeParams.setEnclose(enclose);
-        }
-        fileAttributes.setTextParams(fileTextScanRangeParams);
-        if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
-            fileAttributes.setHeaderType(this.headerType);
-            fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
-            fileAttributes.setSkipLines(skipLines);
-            fileAttributes.setEnableTextValidateUtf8(
-                    ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
-        } else if (this.fileFormatType == TFileFormatType.FORMAT_JSON) {
-            fileAttributes.setJsonRoot(jsonRoot);
-            fileAttributes.setJsonpaths(jsonPaths);
-            fileAttributes.setReadJsonByLine(readJsonByLine);
-            fileAttributes.setStripOuterArray(stripOuterArray);
-            fileAttributes.setNumAsString(numAsString);
-            fileAttributes.setFuzzyParse(fuzzyParse);
-        }
-        return fileAttributes;
+        return fileFormatProperties.toTFileAttributes();
     }
 
     @Override
@@ -345,7 +226,7 @@ public List<Column> getTableColumns() throws AnalysisException {
             throw new AnalysisException("No Alive backends");
         }
 
-        if (this.fileFormatType == TFileFormatType.FORMAT_WAL) {
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_WAL) {
            List<Column> fileColumns = new ArrayList<>();
            Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
            List<Column> tableColumns = table.getBaseSchema(true);
@@ -452,7 +333,7 @@ private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int start)
                 Pair<Type, Integer> fieldType = getColumnType(typeNodes, start + parsedNodes);
                 PStructField structField = typeNodes.get(start).getStructFields(i);
                 fields.add(new StructField(structField.getName(), fieldType.key(), structField.getComment(),
-                    structField.getContainsNull()));
+                        structField.getContainsNull()));
                 parsedNodes += fieldType.value();
             }
             type = new StructType(fields);
@@ -488,9 +369,11 @@ private void fillColumns(InternalService.PFetchTableSchemaResult result) {
     private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TException {
         // set TFileScanRangeParams
         TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
-        fileScanRangeParams.setFormatType(fileFormatType);
+        fileScanRangeParams.setFormatType(fileFormatProperties.getFileFormatType());
         fileScanRangeParams.setProperties(locationProperties);
-        fileScanRangeParams.setTextSerdeType(textSerdeType);
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
+            fileScanRangeParams.setTextSerdeType(((CsvFileFormatProperties) fileFormatProperties).getTextSerdeType());
+        }
         fileScanRangeParams.setFileAttributes(getFileAttributes());
         ConnectContext ctx = ConnectContext.get();
         fileScanRangeParams.setLoadId(ctx.queryId());
@@ -529,7 +412,8 @@ private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TExcepti
             TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
             fileRangeDesc.setLoadId(ctx.queryId());
             fileRangeDesc.setFileType(getTFileType());
-            fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
+            fileRangeDesc.setCompressType(Util.getOrInferCompressType(
+                    fileFormatProperties.getCompressionType(), firstFile.getPath()));
             fileRangeDesc.setPath(firstFile.getPath());
             fileRangeDesc.setStartOffset(0);
             fileRangeDesc.setSize(firstFile.getSize());
@@ -547,9 +431,10 @@ private boolean isFileContentEmpty(TBrokerFileStatus fileStatus) {
         if (fileStatus.isIsDir() || fileStatus.size == 0) {
             return true;
         }
-        if (Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON) {
+        if (Util.isCsvFormat(fileFormatProperties.getFileFormatType())
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_JSON) {
             int magicNumberBytes = 0;
-            switch (compressionType) {
+            switch (fileFormatProperties.getCompressionType()) {
                 case GZ:
                     magicNumberBytes = 20;
                     break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
index 5044f045c3130e..72573a2355f451 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
@@ -40,9 +40,9 @@ public HttpStreamTableValuedFunction(Map<String, String> properties) throws Anal
         // 1. analyze common properties
         super.parseCommonProperties(properties);
 
-        if (fileFormatType == TFileFormatType.FORMAT_PARQUET
-                || fileFormatType == TFileFormatType.FORMAT_AVRO
-                || fileFormatType == TFileFormatType.FORMAT_ORC) {
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_AVRO
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC) {
             throw new AnalysisException("http_stream does not yet support parquet, avro and orc");
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java
new file mode 100644
index 00000000000000..a7fc534e0de5cc
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AvroFileFormatPropertiesTest {
+
+    private AvroFileFormatProperties avroFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        avroFileFormatProperties = new AvroFileFormatProperties();
+    }
+
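+    // Avro appears to expose no per-format properties yet, so analyzing an empty map
+    // should be a silent no-op; this smoke test pins down that behavior.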
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        avroFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
new file mode 100644
index 00000000000000..a496378b5e57ea
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileCompressType;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CsvFileFormatPropertiesTest {
+
+    private CsvFileFormatProperties csvFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        csvFileFormatProperties = new CsvFileFormatProperties();
+    }
+
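+    // Note: the boolean argument to analyzeFileFormatProperties is named
+    // isRemoveOriginProperty elsewhere in this patch; passing true presumably
+    // consumes recognized keys from the property map.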
+    @Test
+    public void testAnalyzeFileFormatPropertiesValid() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, ",");
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "\n");
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals(",", csvFileFormatProperties.getColumnSeparator());
+        Assert.assertEquals("\n", csvFileFormatProperties.getLineDelimiter());
+        Assert.assertEquals(1, csvFileFormatProperties.getSkipLines());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidSeparator() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, "");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidLineDelimiter() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidEnclose() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "invalid");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEnclose() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "\"");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals((byte) '"', csvFileFormatProperties.getEnclose());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesNegative() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "-1");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesLargeValue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1000");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(1000, csvFileFormatProperties.getSkipLines());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "true");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
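+    // The boolean-ish flags below appear to follow Boolean.valueOf semantics: only the
+    // literal "true" (case-insensitive) enables a flag, anything else quietly means false.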
+    @Test
+    public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "false");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.UNKNOWN, csvFileFormatProperties.getCompressionType());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidCompressType() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "gz");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.GZ, csvFileFormatProperties.getCompressionType());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmptyCsvSchema() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_CSV_SCHEMA, "");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEncloseMultipleCharacters() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "\"\"");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEncloseEmpty() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(0, csvFileFormatProperties.getEnclose());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesAsString() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "abc");
+
+        Assert.assertThrows(NumberFormatException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidColumnSeparator() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, ";");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(";", csvFileFormatProperties.getColumnSeparator());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesLineDelimiterAsString() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "abc");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidLineDelimiter() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "\r\n");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("\r\n", csvFileFormatProperties.getLineDelimiter());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidTrimDoubleQuotes() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "true");
+
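+        // Same flag as testAnalyzeFileFormatPropertiesTrimDoubleQuotesTrue above; this
+        // Valid* variant keeps the positive case next to the invalid-input case below.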
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidTrimDoubleQuotes() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "invalid");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java
new file mode 100644
index 00000000000000..74d8d0db2ad19b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class FileFormatPropertiesTest {
+
+    @Test
+    public void testCreateFileFormatPropertiesInvalidFormat() {
+        Assert.assertThrows(AnalysisException.class, () -> {
+            FileFormatProperties.createFileFormatProperties("invalid_format");
+        });
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
new file mode 100644
index 00000000000000..f614d3223866f1
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonFileFormatPropertiesTest {
+
+    private JsonFileFormatProperties jsonFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        jsonFileFormatProperties = new JsonFileFormatProperties();
+    }
+
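+    // All JSON options default to off/empty; the first case below pins those defaults,
+    // and the rest toggle one property at a time.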
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmpty() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonRoot());
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths());
+        Assert.assertEquals(false, jsonFileFormatProperties.isStripOuterArray());
+        Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine());
+        Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidJsonRoot() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.items");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("data.items", jsonFileFormatProperties.getJsonRoot());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
+                "[\"$.name\", \"$.age\", \"$.city\"]");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("[\"$.name\", \"$.age\", \"$.city\"]", jsonFileFormatProperties.getJsonPaths());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesStripOuterArrayTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isStripOuterArray());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesStripOuterArrayFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isStripOuterArray());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesReadJsonByLineTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesReadJsonByLineFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesNumAsStringTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
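+        // num_as_string keeps numeric values as raw strings, the usual way to avoid
+        // precision loss on large integers and long decimals.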
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesNumAsStringFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesFuzzyParseTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesFuzzyParseFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidBooleanValue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "invalid");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.records");
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS, "[\"$.id\", \"$.name\"]");
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "true");
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals("data.records", jsonFileFormatProperties.getJsonRoot());
+        Assert.assertEquals("[\"$.id\", \"$.name\"]", jsonFileFormatProperties.getJsonPaths());
+        Assert.assertEquals(true, jsonFileFormatProperties.isStripOuterArray());
+        Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
+        Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
+        Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSpecialCharactersInJsonRoot() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.special@#$%^&*()");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("data.special@#$%^&*()", jsonFileFormatProperties.getJsonRoot());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesComplexJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
+                "[\"$.deeply.nested[0].array[*].field\", \"$.complex.path[?(@.type=='value')]\"]");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
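+        // The FE stores jsonpaths verbatim, filters and wildcards included; syntax is
+        // apparently left to the BE readers to validate.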
Assert.assertEquals("[\"$.deeply.nested[0].array[*].field\", \"$.complex.path[?(@.type=='value')]\"]", + jsonFileFormatProperties.getJsonPaths()); + } + + @Test + public void testAnalyzeFileFormatPropertiesEmptyJsonPaths() throws AnalysisException { + Map properties = new HashMap<>(); + properties.put(JsonFileFormatProperties.PROP_JSON_PATHS, ""); + + jsonFileFormatProperties.analyzeFileFormatProperties(properties, true); + Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java new file mode 100644 index 00000000000000..0a63d0cec69b6f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.fileformat; + +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TResultFileSinkOptions; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class OrcFileFormatPropertiesTest { + + private OrcFileFormatProperties orcFileFormatProperties; + + @Before + public void setUp() { + orcFileFormatProperties = new OrcFileFormatProperties(); + } + + @Test + public void testAnalyzeFileFormatProperties() { + Map properties = new HashMap<>(); + // Add properties if needed + orcFileFormatProperties.analyzeFileFormatProperties(properties, true); + Assert.assertEquals(TFileCompressType.ZLIB, orcFileFormatProperties.getOrcCompressionType()); + } + + @Test + public void testSupportedCompressionTypes() { + Map properties = new HashMap<>(); + properties.put("compress_type", "plain"); + orcFileFormatProperties.analyzeFileFormatProperties(properties, true); + Assert.assertEquals(TFileCompressType.PLAIN, orcFileFormatProperties.getOrcCompressionType()); + + properties.put("compress_type", "snappy"); + orcFileFormatProperties.analyzeFileFormatProperties(properties, true); + Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, orcFileFormatProperties.getOrcCompressionType()); + + properties.put("compress_type", "zlib"); + orcFileFormatProperties.analyzeFileFormatProperties(properties, true); + Assert.assertEquals(TFileCompressType.ZLIB, orcFileFormatProperties.getOrcCompressionType()); + + properties.put("compress_type", "zstd"); + orcFileFormatProperties.analyzeFileFormatProperties(properties, true); + Assert.assertEquals(TFileCompressType.ZSTD, orcFileFormatProperties.getOrcCompressionType()); + 
+    }
+
+    @Test
+    public void testCompressionTypeCaseInsensitive() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "SNAPPY");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test(expected = org.apache.doris.nereids.exceptions.AnalysisException.class)
+    public void testInvalidCompressionType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "invalid_type");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testFullTResultFileSinkOptions() {
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        orcFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(orcFileFormatProperties.getOrcCompressionType(), sinkOptions.getOrcCompressionType());
+        Assert.assertEquals(1, sinkOptions.getOrcWriterVersion());
+    }
+
+    @Test
+    public void testToTFileAttributes() {
+        TFileAttributes attrs = orcFileFormatProperties.toTFileAttributes();
+        Assert.assertNotNull(attrs);
+        Assert.assertNotNull(attrs.getTextParams());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
new file mode 100644
index 00000000000000..754d857613f61a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetVersion;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParquetFileFormatPropertiesTest {
+
+    private ParquetFileFormatProperties parquetFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        parquetFileFormatProperties = new ParquetFileFormatProperties();
+    }
+
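+    // Snappy compression with dictionary encoding enabled is the parquet default pair
+    // asserted by the first case below.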
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals(TParquetCompressionType.SNAPPY, parquetFileFormatProperties.getParquetCompressionType());
+        Assert.assertEquals(false, parquetFileFormatProperties.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testSupportedCompressionTypes() {
+        Map<String, String> properties = new HashMap<>();
+        String[] types = {"snappy", "gzip", "brotli", "zstd", "lz4", "plain"};
+        TParquetCompressionType[] expected = {
+                TParquetCompressionType.SNAPPY,
+                TParquetCompressionType.GZIP,
+                TParquetCompressionType.BROTLI,
+                TParquetCompressionType.ZSTD,
+                TParquetCompressionType.LZ4,
+                TParquetCompressionType.UNCOMPRESSED
+        };
+        for (int i = 0; i < types.length; i++) {
+            properties.put("compress_type", types[i]);
+            parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+            Assert.assertEquals(expected[i], parquetFileFormatProperties.getParquetCompressionType());
+        }
+    }
+
+    @Test
+    public void testCompressionTypeCaseInsensitive() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "SNAPPY");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TParquetCompressionType.SNAPPY, parquetFileFormatProperties.getParquetCompressionType());
+    }
+
+    @Test(expected = AnalysisException.class)
+    public void testInvalidCompressionType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "invalid_type");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testParquetDisableDictionary() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.disable_dictionary", "true");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertTrue(parquetFileFormatProperties.isParquetDisableDictionary());
+
+        properties.put("parquet.disable_dictionary", "false");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertFalse(parquetFileFormatProperties.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testParquetVersion() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.version", "v1");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_1_0, sinkOptions.getParquetVersion());
+
+        properties.put("parquet.version", "latest");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        sinkOptions = new TResultFileSinkOptions();
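+        // Fresh sink options on purpose: the PARQUET_2_LATEST assertion must reflect
+        // the re-analyzed properties rather than state left over from the "v1" round.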
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_2_LATEST, sinkOptions.getParquetVersion());
+    }
+
+    @Test
+    public void testParquetVersionInvalid() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.version", "invalid");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_1_0, sinkOptions.getParquetVersion());
+    }
+
+    @Test
+    public void testFullTResultFileSinkOptions() {
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(parquetFileFormatProperties.getParquetCompressionType(),
+                sinkOptions.getParquetCompressionType());
+        Assert.assertEquals(parquetFileFormatProperties.isParquetDisableDictionary(),
+                sinkOptions.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testToTFileAttributes() {
+        TFileAttributes attrs = parquetFileFormatProperties.toTFileAttributes();
+        Assert.assertNotNull(attrs);
+        Assert.assertNotNull(attrs.getTextParams());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java
new file mode 100644
index 00000000000000..d94b49aca978f4
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WalFileFormatPropertiesTest {
+
+    private WalFileFormatProperties walFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        walFileFormatProperties = new WalFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        walFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+}