diff --git a/be/src/vec/runtime/vcsv_transformer.cpp b/be/src/vec/runtime/vcsv_transformer.cpp index 0821915e57826e..9fdc02c4523130 100644 --- a/be/src/vec/runtime/vcsv_transformer.cpp +++ b/be/src/vec/runtime/vcsv_transformer.cpp @@ -86,17 +86,26 @@ VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* fil Status VCSVTransformer::open() { RETURN_IF_ERROR(get_block_compression_codec(_compress_type, &_compress_codec)); if (_with_bom) { + Slice bom_slice(reinterpret_cast(bom), sizeof(bom)); if (_compress_codec) { - return Status::InternalError("compressed csv with bom is not supported yet"); + faststring compressed_data; + RETURN_IF_ERROR(_compress_codec->compress(bom_slice, &compressed_data)); + RETURN_IF_ERROR( + _file_writer->append(Slice(compressed_data.data(), compressed_data.size()))); + } else { + RETURN_IF_ERROR(_file_writer->append(bom_slice)); } - RETURN_IF_ERROR( - _file_writer->append(Slice(reinterpret_cast(bom), sizeof(bom)))); } if (!_csv_header.empty()) { + Slice header(_csv_header.data(), _csv_header.size()); if (_compress_codec) { - return Status::InternalError("compressed csv with header is not supported yet"); + faststring compressed_data; + RETURN_IF_ERROR(_compress_codec->compress(header, &compressed_data)); + RETURN_IF_ERROR( + _file_writer->append(Slice(compressed_data.data(), compressed_data.size()))); + } else { + return _file_writer->append(header); } - return _file_writer->append(Slice(_csv_header.data(), _csv_header.size())); } return Status::OK(); } diff --git a/regression-test/data/export_p0/test_outfile_csv_compress.out b/regression-test/data/export_p0/test_outfile_csv_compress.out index 7d3965e897461c..aa3c8fb7c25c83 100644 --- a/regression-test/data/export_p0/test_outfile_csv_compress.out +++ b/regression-test/data/export_p0/test_outfile_csv_compress.out @@ -113,6 +113,234 @@ c2 text Yes false \N NONE c1 text Yes false \N NONE c2 text Yes false \N NONE +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + +-- !select -- +1048576 1048576 + +-- !select -- +id text Yes false \N NONE +name text Yes false \N NONE + -- !select -- 1 2 @@ -176,3 +404,15 @@ c2 text Yes false \N NONE -- !select -- __dummy_col text Yes false \N NONE +-- !select -- +1 zhangsan +1 zhangsan1 +10 zhangsan10 +10 zhangsan110 +10 zhangsan1210 +10 zhangsan12410 +10 zhangsan12510 +10 zhangsan1310 +10 zhangsan13610 +10 zhangsan1410 + diff --git a/regression-test/suites/export_p0/test_outfile_csv_compress.groovy b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy index 01e5f0664407d6..c780fd9b1b58ac 100644 --- a/regression-test/suites/export_p0/test_outfile_csv_compress.groovy +++ b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy @@ -56,11 +56,11 @@ suite("test_outfile_csv_compress", "p0") { create_table(table_name) def outFilePath = """s3://${bucket}/outfile_""" - def csv_outfile_result = { the_table_name, compression_type -> + def csv_outfile_result = { the_table_name, compression_type, format_type -> def result = sql """ select * from ${the_table_name} into outfile "${outFilePath}" - FORMAT AS CSV + FORMAT AS ${format_type} PROPERTIES( "s3.endpoint" = "${s3_endpoint}", "s3.region" = "${region}", @@ -73,43 +73,82 @@ suite("test_outfile_csv_compress", "p0") { } for (String compression_type: ["plain", "gz", "bz2", "snappyblock", "lz4block", "zstd"]) { - def outfile_url = csv_outfile_result(table_name, compression_type); - print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.") - qt_select """ select c1, c2 from s3( - "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", - "ACCESS_KEY"= "${ak}", - "SECRET_KEY" = "${sk}", - "format" = "csv", - "provider" = "${getS3Provider()}", - "region" = "${region}", - "compress_type" = "${compression_type}" - ) order by c1, c2 limit 10; - """ - qt_select """ select count(c1), count(c2) from s3( - "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", - "ACCESS_KEY"= "${ak}", - "SECRET_KEY" = "${sk}", - "format" = "csv", - "provider" = "${getS3Provider()}", - "region" = "${region}", - "compress_type" = "${compression_type}" - ); - """ - qt_select """desc function s3( - "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", - "ACCESS_KEY"= "${ak}", - "SECRET_KEY" = "${sk}", - "format" = "csv", - "provider" = "${getS3Provider()}", - "region" = "${region}", - "compress_type" = "${compression_type}" - ); - """ + for (String format_type : ["csv"]) { + def outfile_url = csv_outfile_result(table_name, compression_type, format_type); + print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.") + qt_select """ select c1, c2 from s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format_type}", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "${compression_type}" + ) order by c1, c2 limit 10; + """ + qt_select """ select count(c1), count(c2) from s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format_type}", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "${compression_type}" + ); + """ + qt_select """desc function s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format_type}", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "${compression_type}" + ); + """ + } + } + + for (String compression_type: ["plain", "gz", "bz2", "snappyblock", "lz4block", "zstd"]) { + for (String format_type : ["csv_with_names", "csv_with_names_and_types"]) { + def outfile_url = csv_outfile_result(table_name, compression_type, format_type); + print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.") + qt_select """ select id, name from s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format_type}", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "${compression_type}" + ) order by id, name limit 10; + """ + qt_select """ select count(id), count(name) from s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format_type}", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "${compression_type}" + ); + """ + qt_select """desc function s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format_type}", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "${compression_type}" + ); + """ + } } for (String compression_type: ["plain", "gz", "bz2", "snappyblock", "lz4block", "zstd"]) { def small = "small_${table_name}" - def outfile_url = csv_outfile_result(small, compression_type); + def outfile_url = csv_outfile_result(small, compression_type, "csv"); print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.") qt_select """ select c1, c2 from s3( "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", @@ -163,7 +202,7 @@ suite("test_outfile_csv_compress", "p0") { // test empty table sql """drop table if exists test_outfile_csv_compress_empty_table""" sql """create table test_outfile_csv_compress_empty_table(k1 int) distributed by hash(k1) buckets 1 properties("replication_num" = "1")""" - def empty_outfile_url = csv_outfile_result("test_outfile_csv_compress_empty_table", "gz"); + def empty_outfile_url = csv_outfile_result("test_outfile_csv_compress_empty_table", "gz", "csv"); qt_select """desc function s3( "uri" = "http://${bucket}.${s3_endpoint}${empty_outfile_url.substring(5 + bucket.length(), empty_outfile_url.length() - 1)}*", "ACCESS_KEY"= "${ak}", @@ -174,5 +213,33 @@ suite("test_outfile_csv_compress", "p0") { "compress_type" = "gz" ); """ + + // test bom compress + def result = sql """ + select * from ${table_name} + into outfile "${outFilePath}" + FORMAT AS CSV + PROPERTIES( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}", + "compress_type" = "gz", + "with_bom" = "true" + ); + """ + def outfile_url = result[0][3] + print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.") + qt_select """ select c1, c2 from s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}*", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "csv", + "with_bom" = "true", + "provider" = "${getS3Provider()}", + "region" = "${region}", + "compress_type" = "gz" + ) order by c1, c2 limit 10; + """ }