Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add xz compression support #6421

Merged
merged 3 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/io/src/format_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub enum Compression {
RawDeflate,
Lzo,
Snappy,
Xz,
}

impl Default for Compression {
Expand All @@ -99,6 +100,7 @@ impl FromStr for Compression {
"rawdeflate" | "raw_deflate" => Ok(Compression::RawDeflate),
"lzo" => Ok(Compression::Lzo),
"snappy" => Ok(Compression::Snappy),
"xz" => Ok(Compression::Xz),
"none" => Ok(Compression::None),
_ => Err(ErrorCode::UnknownCompressionType(format!(
"Unknown compression: {s}"
Expand Down
4 changes: 3 additions & 1 deletion common/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub enum StageFileCompression {
RawDeflate,
Lzo,
Snappy,
Xz,
None,
}

Expand All @@ -105,8 +106,9 @@ impl FromStr for StageFileCompression {
"rawdeflate" | "raw_deflate" => Ok(StageFileCompression::RawDeflate),
"lzo" => Ok(StageFileCompression::Lzo),
"snappy" => Ok(StageFileCompression::Snappy),
"xz" => Ok(StageFileCompression::Xz),
"none" => Ok(StageFileCompression::None),
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | raw_deflate | lzo | snappy | none }"
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | raw_deflate | lzo | snappy | xz | none }"
.to_string()),
}
}
Expand Down
2 changes: 2 additions & 0 deletions common/proto-conv/src/user_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl FromToProto<pb::user_stage_info::StageFileCompression> for mt::StageFileCom
Ok(mt::StageFileCompression::Snappy)
}
pb::user_stage_info::StageFileCompression::None => Ok(mt::StageFileCompression::None),
pb::user_stage_info::StageFileCompression::Xz => Ok(mt::StageFileCompression::Xz),
}
}

Expand All @@ -370,6 +371,7 @@ impl FromToProto<pb::user_stage_info::StageFileCompression> for mt::StageFileCom
Ok(pb::user_stage_info::StageFileCompression::Snappy)
}
mt::StageFileCompression::None => Ok(pb::user_stage_info::StageFileCompression::None),
mt::StageFileCompression::Xz => Ok(pb::user_stage_info::StageFileCompression::Xz),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions common/protos/proto/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ message UserStageInfo {
RawDeflate = 6;
Lzo = 7;
Snappy = 8;
// We used to assign `None` as 9, which was a mistake.
// Please change this value to 0 in the next version bump.
None = 9;
Xz = 10;
}

message FileFormatOptions {
Expand Down
3 changes: 2 additions & 1 deletion docs/doc/30-reference/30-sql/10-dml/dml-copy-into-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Description: Number of lines at the start of the file to skip.

Default: `0`

#### `COMPRESSION = AUTO | GZIP | BZ2 | BROTLI | ZSTD | DEFLATE | RAW_DEFLATE | NONE`
#### `COMPRESSION = AUTO | GZIP | BZ2 | BROTLI | ZSTD | DEFLATE | RAW_DEFLATE | XZ | NONE`

Description: String that represents the compression algorithm.

Expand All @@ -105,6 +105,7 @@ Values:
| `ZSTD` | Zstandard v0.8 (and higher) is supported. |
| `DEFLATE` | Deflate-compressed files (with zlib header, RFC1950). |
| `RAW_DEFLATE` | Deflate-compressed files (without any header, RFC1951). |
| `XZ` | XZ-compressed files (`.xz` format). |
| `NONE` | Indicates that the files have not been compressed. |

### copyOptions
Expand Down
1 change: 1 addition & 0 deletions query/src/servers/http/v1/multipart_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl MultipartFormat {
Compression::Zstd => Some(CompressAlgorithm::Zstd),
Compression::Deflate => Some(CompressAlgorithm::Zlib),
Compression::RawDeflate => Some(CompressAlgorithm::Deflate),
Compression::Xz => Some(CompressAlgorithm::Xz),
Compression::Lzo => {
return Err(ErrorCode::UnImplement("compress type lzo is unimplemented"));
}
Expand Down
1 change: 1 addition & 0 deletions query/src/storages/stage/stage_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl StageSource {
StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd),
StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib),
StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate),
StageFileCompression::Xz => Some(CompressAlgorithm::Xz),
StageFileCompression::Lzo => {
return Err(ErrorCode::UnImplement("compress type lzo is unimplemented"))
}
Expand Down
Binary file added tests/data/ontime_200.csv.xz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Test copy from zstd file
199 2020.0 769
Test copy from bzip2 file
199 2020.0 769
Test copy from xz file
199 2020.0 769
398 2020.0 1538
398 2020.0 1538
398 2020.0 1538
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ echo "select count(1), avg(Year), sum(DayOfWeek) from ontime200" | $MYSQL_CLIEN
# Truncate the ontime table.
echo "truncate table ontime200" | $MYSQL_CLIENT_CONNECT

## Copy from s3 with compression xz.
echo "Test copy from xz file"
echo "set enable_planner_v2 = 1; copy into ontime200 from 's3://testbucket/admin/data/ontime_200.csv.xz' credentials=(aws_key_id='minioadmin' aws_secret_key='minioadmin') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'xz' record_delimiter = '\n' skip_header = 1)" | $MYSQL_CLIENT_CONNECT

## Result.
echo "select count(1), avg(Year), sum(DayOfWeek) from ontime200" | $MYSQL_CLIENT_CONNECT

# Truncate the ontime table.
echo "truncate table ontime200" | $MYSQL_CLIENT_CONNECT

## Copy from s3 with files.
echo "set enable_planner_v2 = 1; copy into ontime200 from 's3://testbucket/admin/data/' credentials=(aws_key_id='minioadmin' aws_secret_key='minioadmin') FILES = ('ontime_200.csv', 'ontime_200_v1.csv') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' record_delimiter = '\n' skip_header = 1)" | $MYSQL_CLIENT_CONNECT
## Result.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
ontime_200.csv
ontime_200.csv.bz2
ontime_200.csv.gz
ontime_200.csv.xz
ontime_200.csv.zst
ontime_200.parquet
199 2020.0 769
199 2020.0 769
199 2020.0 769
199 2020.0 769
597 2020.0 2307
199 2020.0 769
796 2020.0 3076
ontime_200.parquet
ontime_200_v1.parquet
398 2020.0 1538
Expand All @@ -19,3 +21,4 @@ ontime_200.csv
ontime_200.csv
ontime_200.csv
ontime_200.csv
ontime_200.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontim
aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.gz s3://testbucket/admin/stage/s1/ontime_200.csv.gz >/dev/null 2>&1
aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.zst s3://testbucket/admin/stage/s1/ontime_200.csv.zst >/dev/null 2>&1
aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.bz2 s3://testbucket/admin/stage/s1/ontime_200.csv.bz2 >/dev/null 2>&1
aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.xz s3://testbucket/admin/stage/s1/ontime_200.csv.xz >/dev/null 2>&1
aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.parquet s3://testbucket/admin/stage/s1/ontime_200.parquet >/dev/null 2>&1

## Copy from internal stage
Expand All @@ -30,8 +31,10 @@ copy_from_stage_cases=(
"copy into ontime200 from '@s1' FILES = ('ontime_200.csv.zst') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'zstd' record_delimiter = '\n' skip_header = 1);"
# copy bz2 csv
"copy into ontime200 from '@s1' FILES = ('ontime_200.csv.bz2') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'bz2' record_delimiter = '\n' skip_header = 1);"
# copy xz csv
"copy into ontime200 from '@s1' FILES = ('ontime_200.csv.xz') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'xz' record_delimiter = '\n' skip_header = 1);"
# copy auto csv
"copy into ontime200 from '@s1' FILES = ('ontime_200.csv.gz', 'ontime_200.csv.zst', 'ontime_200.csv.bz2') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = AUTO record_delimiter = '\n' skip_header = 1);"
"copy into ontime200 from '@s1' FILES = ('ontime_200.csv.gz', 'ontime_200.csv.zst', 'ontime_200.csv.bz2', 'ontime_200.csv.xz') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = AUTO record_delimiter = '\n' skip_header = 1);"
)

for i in "${copy_from_stage_cases[@]}"; do
Expand Down