6 changes: 6 additions & 0 deletions be/src/pipeline/exec/result_sink_operator.h
@@ -55,6 +55,9 @@ struct ResultFileOptions {
bool is_refactor_before_flag = false;
std::string orc_schema;
TFileCompressType::type orc_compression_type;
// currently only for csv
// TODO: we should merge parquet_compression_type/orc_compression_type/compression_type
TFileCompressType::type compression_type = TFileCompressType::PLAIN;

bool delete_existing_files = false;
std::string file_suffix;
@@ -117,6 +120,9 @@ struct ResultFileOptions {
if (t_opt.__isset.orc_writer_version) {
orc_writer_version = t_opt.orc_writer_version;
}
if (t_opt.__isset.compression_type) {
compression_type = t_opt.compression_type;
}
}
};

27 changes: 22 additions & 5 deletions be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -127,10 +127,10 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
}));
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
_vec_output_expr_ctxs, _output_object_data,
_header_type, _header, _file_opts->column_separator,
_file_opts->line_delimiter, _file_opts->with_bom));
_vfile_writer.reset(new VCSVTransformer(
_state, _file_writer_impl.get(), _vec_output_expr_ctxs, _output_object_data,
_header_type, _header, _file_opts->column_separator, _file_opts->line_delimiter,
_file_opts->with_bom, _file_opts->compression_type));
break;
case TFileFormatType::FORMAT_PARQUET:
_vfile_writer.reset(new VParquetTransformer(
@@ -196,7 +196,7 @@ void VFileResultWriter::_get_file_url(std::string* file_url) {
std::string VFileResultWriter::_file_format_to_name() {
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
return "csv";
return "csv" + _compression_type_to_name();
case TFileFormatType::FORMAT_PARQUET:
return "parquet";
case TFileFormatType::FORMAT_ORC:
@@ -206,6 +206,23 @@ std::string VFileResultWriter::_file_format_to_name() {
}
}

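// Maps the sink's compression type to a file-name suffix appended after "csv".
// PLAIN and any unhandled type yield an empty suffix, so uncompressed CSV
// file names are unchanged.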
std::string VFileResultWriter::_compression_type_to_name() {
switch (_file_opts->compression_type) {
case TFileCompressType::GZ:
return ".gzip";
case TFileCompressType::BZ2:
return ".bzip2";
case TFileCompressType::SNAPPYBLOCK:
return ".snappy";
case TFileCompressType::LZ4BLOCK:
return ".lz4";
case TFileCompressType::ZSTD:
return ".zstd";
default:
return "";
}
}

Status VFileResultWriter::write(RuntimeState* state, Block& block) {
if (block.rows() == 0) {
return Status::OK();
2 changes: 2 additions & 0 deletions be/src/vec/sink/writer/vfile_result_writer.h
@@ -104,7 +104,9 @@ class VFileResultWriter final : public AsyncResultWriter {
// delete the dir of file_path
Status _delete_dir();
double _get_write_speed(int64_t write_bytes, int64_t write_time);
std::string _compression_type_to_name();

private:
RuntimeState* _state; // not owned, set when init
const pipeline::ResultFileOptions* _file_opts = nullptr;
TStorageBackendType::type _storage_type;
@@ -504,6 +504,11 @@ private void analyzeProperties() throws UserException {
analyzeBrokerDesc(copiedProps);

fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
// check if compression type for csv is supported
if (fileFormatProperties instanceof CsvFileFormatProperties) {
CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties;
csvFileFormatProperties.checkSupportedCompressionType(true);
}

if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
maxFileSizeBytes = ParseUtil.analyzeDataVolume(copiedProps.get(PROP_MAX_FILE_SIZE));
@@ -610,15 +610,15 @@ public static TFileCompressType inferFileCompressTypeByPath(String path) {
}
}

public static TFileCompressType getFileCompressType(String compressType) {
public static TFileCompressType getFileCompressType(String compressType) throws AnalysisException {
if (Strings.isNullOrEmpty(compressType)) {
return TFileCompressType.UNKNOWN;
}
final String upperCaseType = compressType.toUpperCase();
try {
return TFileCompressType.valueOf(upperCaseType);
} catch (IllegalArgumentException e) {
return TFileCompressType.UNKNOWN;
throw new AnalysisException("Unknown compression type: " + compressType);
}
}
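
With this change, an unrecognized compress_type fails analysis instead of being silently mapped to UNKNOWN. A minimal sketch of the resulting behavior (illustrative calls only, following the code above):

```java
// Illustrative behavior of Util.getFileCompressType after this change.
TFileCompressType a = Util.getFileCompressType(null);  // UNKNOWN: caller may infer the type from the file suffix
TFileCompressType b = Util.getFileCompressType("gz");  // GZ (matching is case-insensitive)
Util.getFileCompressType("invalid");                   // throws AnalysisException("Unknown compression type: invalid")
```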

@@ -22,20 +22,35 @@
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TResultFileSinkOptions;

import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Set;

public class CsvFileFormatProperties extends FileFormatProperties {
public static final Logger LOG = LogManager.getLogger(
org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);

// supported compression types for csv writer
public static final Set<TFileCompressType> SUPPORTED_CSV_WRITE_COMPRESSION_TYPES = Sets.newHashSet();

static {
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.PLAIN);
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.GZ);
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.BZ2);
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.SNAPPYBLOCK);
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.LZ4BLOCK);
SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.ZSTD);
}

public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
public static final String DEFAULT_LINE_DELIMITER = "\n";

@@ -117,6 +132,7 @@ public void analyzeFileFormatProperties(Map<String, String> formatProperties, bo
throw new AnalysisException("skipLines should not be less than 0.");
}

// The default value is "UNKNOWN", so that the caller may infer the compression type from the file suffix.
String compressTypeStr = getOrDefault(formatProperties,
PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
compressionType = Util.getFileCompressType(compressTypeStr);
@@ -138,10 +154,26 @@ public void analyzeFileFormatProperties(Map<String, String> formatProperties, bo
}
}

public void checkSupportedCompressionType(boolean isWrite) {
// Currently this is only checked for write operations,
// because only a subset of compression types is supported for writing.
if (isWrite) {
// "UNKNOWN" means user does not specify the compression type
if (this.compressionType == TFileCompressType.UNKNOWN) {
this.compressionType = TFileCompressType.PLAIN;
}
if (!SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.contains(this.compressionType)) {
throw new AnalysisException(
"csv compression type [" + this.compressionType.name() + "] is invalid for writing");
}
}
}

@Override
public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
sinkOptions.setColumnSeparator(columnSeparator);
sinkOptions.setLineDelimiter(lineDelimiter);
sinkOptions.setCompressionType(compressionType);
}
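
Taken together, the intended write-path flow is roughly the sketch below. The helper itself is hypothetical, and construction of CsvFileFormatProperties is omitted because its constructor is not part of this diff:

```java
// Hypothetical wiring of the three steps touched in this file.
static void prepareCsvSink(CsvFileFormatProperties csv, Map<String, String> props) {
    // Parses format properties; an unknown compress_type now throws AnalysisException.
    csv.analyzeFileFormatProperties(props, true);
    // Write path only: UNKNOWN defaults to PLAIN, unsupported types throw.
    csv.checkSupportedCompressionType(true);
    // Forwards separator, line delimiter, and the new compression_type to the BE sink.
    TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
    csv.fullTResultFileSinkOptions(sinkOptions);
}
```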

// The method `analyzeFileFormatProperties` must have been called once before this method
11 changes: 5 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -477,13 +477,12 @@ private Map<String, String> convertOutfileProperties() {
if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) {
outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, columnSeparator);
outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, lineDelimiter);
} else {
// orc / parquet
// compressType == null means outfile will use default compression type
if (compressType != null) {
outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
}
}
// compressType == null means outfile will use default compression type
if (compressType != null) {
outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
}

if (!maxFileSize.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
}
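
Hoisting the compressType handling out of the orc/parquet-only branch means CSV exports now forward it as well; for example (illustrative values, assuming compressType is "gz"):

```java
// Illustrative result for an EXPORT with a csv format and compressType "gz":
// the outfile properties now carry both the CSV options and the compress type.
outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, ",");
outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, "\n");
outfileProperties.put(ExportCommand.COMPRESS_TYPE, "gz"); // previously skipped for csv formats
```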
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.property.fileformat;

import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.thrift.TFileCompressType;

@@ -130,17 +131,18 @@ public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesFalse() throws Analys
public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
Map<String, String> properties = new HashMap<>();
properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
Assert.assertEquals(TFileCompressType.UNKNOWN, csvFileFormatProperties.getCompressionType());
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
"Unknown compression type: invalid",
() -> csvFileFormatProperties.analyzeFileFormatProperties(properties, true));
}

@Test
public void testAnalyzeFileFormatPropertiesValidCompressType() throws AnalysisException {
Map<String, String> properties = new HashMap<>();
properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "gz");

csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
Assert.assertEquals(TFileCompressType.GZ, csvFileFormatProperties.getCompressionType());
ExceptionChecker.expectThrowsNoException(() -> csvFileFormatProperties.checkSupportedCompressionType(true));
}

@Test
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.property.fileformat;

import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.thrift.TFileCompressType;

@@ -93,8 +94,8 @@ public void testAnalyzeFileFormatPropertiesSkipLinesLargeValue() throws Analysis
public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
Map<String, String> properties = new HashMap<>();
properties.put(TextFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
textFileFormatProperties.analyzeFileFormatProperties(properties, true);
Assert.assertEquals(TFileCompressType.UNKNOWN, textFileFormatProperties.getCompressionType());
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Unknown compression type: invalid",
() -> textFileFormatProperties.analyzeFileFormatProperties(properties, true));
}

@Test
3 changes: 3 additions & 0 deletions gensrc/thrift/DataSinks.thrift
@@ -146,6 +146,9 @@ struct TResultFileSinkOptions {
//hive write sink use int96
//export data to file use by user define properties
21: optional bool enable_int96_timestamps
// currently only for csv
// TODO: merge with parquet_compression_type and orc_compression_type
22: optional PlanNodes.TFileCompressType compression_type
}
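
On the FE side, the new optional field is populated through the generated Java binding (see fullTResultFileSinkOptions above); a minimal sketch, assuming the standard Thrift-generated setter:

```java
// Sketch: set the new optional field via the Thrift-generated API.
TResultFileSinkOptions opts = new TResultFileSinkOptions();
opts.setCompressionType(TFileCompressType.GZ); // optional field 22
```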

struct TMemoryScratchSink {