From 2974bf32887b6d3dd97fe49a07b5a9c4f6d58884 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 21 Aug 2023 11:26:25 +0800 Subject: [PATCH 01/12] 1 --- .../datasource/hive/HiveMetaStoreCache.java | 2 ++ .../doris/external/hive/util/HiveUtil.java | 3 ++- .../doris/planner/external/FileScanNode.java | 17 ++++++++++------- .../doris/planner/external/HiveSplit.java | 5 ----- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index e7e621948e24ab..45b84dfb174d41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -1000,6 +1000,7 @@ public static class FileCacheValue { // File Cache for self splitter. private final List files = Lists.newArrayList(); // File split cache for old splitter. This is a temp variable. + @Deprecated private final List splits = Lists.newArrayList(); private boolean isSplittable; // The values of partitions. @@ -1021,6 +1022,7 @@ public void addFile(RemoteFile file) { } } + @Deprecated public void addSplit(FileSplit split) { if (isFileVisible(split.getPath())) { splits.add(split); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java index 704d0fadf84fee..e1baea3652cd89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java @@ -190,7 +190,8 @@ public static boolean isSplittable(InputFormat inputFormat, Path path, Job return true; } - // use reflection to get isSplittable method on FileInputFormat + // use reflection to get isSplitable method on FileInputFormat + // ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable" Method method = null; for (Class clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 662aa939ee4b16..8e2c8ed3a4521e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.Util; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.FileSplit.FileSplitCreator; import org.apache.doris.qe.ConnectContext; @@ -32,6 +33,7 @@ import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileScanNode; import org.apache.doris.thrift.TFileScanRangeParams; @@ -221,19 +223,20 @@ protected List splitFile(Path path, long blockSize, BlockLocation[] block if (blockLocations == null) { blockLocations = new BlockLocation[0]; } - long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); - if (splitSize <= 0) { - splitSize = blockSize; - } - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE); List result = Lists.newArrayList(); - if (!splittable) { + TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.toString()); + if (!splittable || compressType != TFileCompressType.PLAIN) { LOG.debug("Path {} is not splittable.", path); String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues)); return result; } + long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + if (splitSize <= 0) { + splitSize = blockSize; + } + // Min split size is DEFAULT_SPLIT_SIZE(128MB). + splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE); long bytesRemaining; for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; bytesRemaining -= splitSize) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java index 0f230c85f43943..0bc8442760710e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java @@ -32,11 +32,6 @@ public HiveSplit(Path path, long start, long length, long fileLength, this.acidInfo = acidInfo; } - public HiveSplit(Path path, long start, long length, long fileLength, String[] hosts, AcidInfo acidInfo) { - super(path, start, length, fileLength, hosts, null); - this.acidInfo = acidInfo; - } - @Override public Object getInfo() { return acidInfo; From d3d43edfc3dc5913c782545e360cd950c3b766ec Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 21 Aug 2023 14:29:27 +0800 Subject: [PATCH 02/12] 2 --- .../hive/test_compress_type.out | 19 ++++++ .../hive/test_compress_type.groovy | 61 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 regression-test/data/external_table_p2/hive/test_compress_type.out create mode 100644 regression-test/suites/external_table_p2/hive/test_compress_type.groovy diff --git a/regression-test/data/external_table_p2/hive/test_compress_type.out b/regression-test/data/external_table_p2/hive/test_compress_type.out new file mode 100644 index 00000000000000..b7f9645c63e447 --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_compress_type.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q21 -- +600005 + +-- !q22 -- +1310008 + +-- !q23 -- +13 + +-- !q31 -- +600005 + +-- !q32 -- +1310008 + +-- !q33 -- +13 + diff --git a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy new file mode 100644 index 00000000000000..81a4a016cb5888 --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hive") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_compress_type" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + + sql """ use multi_catalog """ + + // table test_compress_partitioned has 6 partitions with different compressed file: plain, gzip, bzip2, deflate + sql """set file_split_size=0""" + explain { + sql("select count(*) from test_compress_partitioned") + contains "inputSplitNum=14, totalFileSize=682053470" + contains "partition=6/6" + } + qt_q21 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" + qt_q22 """select count(*) from test_compress_partitioned""" + qt_q23 """select count(*) from test_compress_partitioned where watchid=4611870011201662970""" + + sql """set file_split_size=8388608""" + explain { + sql("select count(*) from test_compress_partitioned") + contains "inputSplitNum=80, totalFileSize=682053470" + contains "partition=6/6" + } + + qt_q31 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" + qt_q32 """select count(*) from test_compress_partitioned""" + qt_q33 """select count(*) from test_compress_partitioned where watchid=4611870011201662970""" + sql """set file_split_size=0""" + } +} From c283d6aa56a79866415c50db6811496d2849dfeb Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 21 Aug 2023 14:35:34 +0800 Subject: [PATCH 03/12] 2 --- .../hive/test_compress_type.out | 28 +++++++++++++++++-- .../hive/test_compress_type.groovy | 4 +-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/regression-test/data/external_table_p2/hive/test_compress_type.out b/regression-test/data/external_table_p2/hive/test_compress_type.out index b7f9645c63e447..1c3cf7e96e6f6c 100644 --- a/regression-test/data/external_table_p2/hive/test_compress_type.out +++ b/regression-test/data/external_table_p2/hive/test_compress_type.out @@ -6,7 +6,19 @@ 1310008 -- !q23 -- -13 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 2023-08-21 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 bzip2 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 bzip2 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 deflate +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 deflate +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain -- !q31 -- 600005 @@ -15,5 +27,17 @@ 1310008 -- !q33 -- -13 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 2023-08-21 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 bzip2 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 bzip2 +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 deflate +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 deflate +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain diff --git a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy index 81a4a016cb5888..5e572e214cd77d 100644 --- a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy +++ b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy @@ -44,7 +44,7 @@ suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hi } qt_q21 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" qt_q22 """select count(*) from test_compress_partitioned""" - qt_q23 """select count(*) from test_compress_partitioned where watchid=4611870011201662970""" + order_qt_q23 """select * from test_compress_partitioned where watchid=4611870011201662970""" sql """set file_split_size=8388608""" explain { @@ -55,7 +55,7 @@ suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hi qt_q31 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" qt_q32 """select count(*) from test_compress_partitioned""" - qt_q33 """select count(*) from test_compress_partitioned where watchid=4611870011201662970""" + order_qt_q33 """select * from test_compress_partitioned where watchid=4611870011201662970""" sql """set file_split_size=0""" } } From 1148a0ac471c911068481ac908717c7409cbb9f0 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 22 Aug 2023 16:11:45 +0800 Subject: [PATCH 04/12] add lz4 --- be/src/exec/decompressor.cpp | 96 +++++++++++++++++-- be/src/exec/decompressor.h | 19 +++- be/src/vec/exec/format/csv/csv_reader.cpp | 57 ++++++++--- .../new_plain_text_line_reader.cpp | 10 +- .../file_reader/new_plain_text_line_reader.h | 1 - gensrc/thrift/PlanNodes.thrift | 2 + .../hive/test_compress_type.out | 6 +- .../hive/test_compress_type.groovy | 8 +- 8 files changed, 165 insertions(+), 34 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index af69a896a2b8e4..d44853ee0ea089 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -42,6 +42,9 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom case CompressType::LZ4FRAME: *decompressor = new Lz4FrameDecompressor(); break; + case CompressType::LZ4BLOCK: + *decompressor = new Lz4BlockDecompressor(); + break; #ifdef DORIS_WITH_LZO case CompressType::LZOP: *decompressor = new LzopDecompressor(); @@ -239,7 +242,7 @@ Status Lz4FrameDecompressor::decompress(uint8_t* input, size_t input_len, size_t size_t* decompressed_len, bool* stream_end, size_t* more_input_bytes, size_t* more_output_bytes) { uint8_t* src = input; - size_t src_size = input_len; + size_t remaining_input_size = input_len; size_t ret = 1; *input_bytes_read = 0; @@ -257,7 +260,7 @@ Status Lz4FrameDecompressor::decompress(uint8_t* input, size_t input_len, size_t } LZ4F_frameInfo_t info; - ret = LZ4F_getFrameInfo(_dctx, &info, (void*)src, &src_size); + ret = LZ4F_getFrameInfo(_dctx, &info, (void*)src, &remaining_input_size); if (LZ4F_isError(ret)) { return Status::InternalError("LZ4F_getFrameInfo error: {}", std::string(LZ4F_getErrorName(ret))); @@ -270,17 +273,17 @@ Status Lz4FrameDecompressor::decompress(uint8_t* input, size_t input_len, size_t std::string(LZ4F_getErrorName(ret))); } - *input_bytes_read = src_size; + *input_bytes_read = remaining_input_size; - src += src_size; - src_size = input_len - src_size; + src += remaining_input_size; + remaining_input_size = input_len - remaining_input_size; LOG(INFO) << "lz4 block size: " << _expect_dec_buf_size; } // decompress size_t output_len = output_max_len; - ret = LZ4F_decompress(_dctx, (void*)output, &output_len, (void*)src, &src_size, + ret = LZ4F_decompress(_dctx, (void*)output, &output_len, (void*)src, &remaining_input_size, /* LZ4F_decompressOptions_t */ nullptr); if (LZ4F_isError(ret)) { return Status::InternalError("Decompression error: {}", @@ -288,7 +291,7 @@ Status Lz4FrameDecompressor::decompress(uint8_t* input, size_t input_len, size_t } // update - *input_bytes_read += src_size; + *input_bytes_read += remaining_input_size; *decompressed_len = output_len; if (ret == 0) { *stream_end = true; @@ -324,4 +327,83 @@ size_t Lz4FrameDecompressor::get_block_size(const LZ4F_frameInfo_t* info) { } } +/// Lz4BlockDecompressor +Status Lz4BlockDecompressor::init() { + return Status::OK(); +} + +uint32_t Lz4BlockDecompressor::_read_int32(uint8_t* buf) { + return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; +} + +Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, + uint8_t* output, size_t output_max_len, + size_t* decompressed_len, bool* stream_end, + size_t* more_input_bytes, size_t* more_output_bytes) { + uint8_t* src = input; + size_t remaining_input_size = input_len; + int64_t uncompressed_total_len = 0; + *input_bytes_read = 0; + + // The hadoop lz4 codec is as: + // <4 byte big endian uncompressed size> + // <4 byte big endian compressed size> + // + // .... + // <4 byte big endian uncompressed size> + // <4 byte big endian compressed size> + // + // + // See: + // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc + while (remaining_input_size > 0) { + // Read uncompressed size + uint32_t uncompressed_block_len = _read_int32(src); + int64_t remaining_output_size = output_max_len - uncompressed_total_len; + if (remaining_output_size < uncompressed_block_len) { + // Need more output buffer + *more_output_bytes = uncompressed_block_len - remaining_output_size; + break; + } + + // Read compressed size + size_t tmp_src_size = remaining_input_size - sizeof(uint32_t); + size_t compressed_len = _read_int32(src + sizeof(uint32_t)); + if (compressed_len == 0 || compressed_len > tmp_src_size) { + // Need more input data + *more_input_bytes = compressed_len - tmp_src_size; + break; + } + + src += 2 * sizeof(uint32_t); + remaining_input_size -= 2 * sizeof(uint32_t); + + // Decompress + int uncompressed_len = LZ4_decompress_safe(reinterpret_cast(src), + reinterpret_cast(output), compressed_len, remaining_output_size); + if (uncompressed_len < 0) { + return Status::InternalError("lz4 block decompress failed. uncompressed_len < 0: {}", uncompressed_len); + } + + output += uncompressed_len; + src += compressed_len; + remaining_input_size -= compressed_len; + uncompressed_block_len -= uncompressed_len; + uncompressed_total_len += uncompressed_len; + } + + *input_bytes_read += (input_len - remaining_input_size); + *decompressed_len = uncompressed_total_len; + // If no more input and output need, means this is the end of a compressed block + *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0); + + return Status::OK(); +} + +std::string Lz4BlockDecompressor::debug_info() { + std::stringstream ss; + ss << "Lz4BlockDecompressor."; + return ss.str(); +} + } // namespace doris diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h index af37335f1f7145..0bfd141c4dfdc8 100644 --- a/be/src/exec/decompressor.h +++ b/be/src/exec/decompressor.h @@ -34,7 +34,7 @@ namespace doris { -enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP }; +enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK }; class Decompressor { public: @@ -140,6 +140,23 @@ class Lz4FrameDecompressor : public Decompressor { const static unsigned DORIS_LZ4F_VERSION; }; +class Lz4BlockDecompressor : public Decompressor { +public: + ~Lz4BlockDecompressor() override {} + + Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output, + size_t output_max_len, size_t* decompressed_len, bool* stream_end, + size_t* more_input_bytes, size_t* more_output_bytes) override; + + std::string debug_info() override; + +private: + friend class Decompressor; + Lz4BlockDecompressor() : Decompressor(CompressType::LZ4FRAME) {} + Status init() override; + uint32_t _read_int32(uint8_t* buf); +}; + #ifdef DORIS_WITH_LZO class LzopDecompressor : public Decompressor { public: diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index ba5d69cb73708a..ec32f7345188d7 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -397,21 +397,44 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); size_t rows = 0; - auto columns = block->mutate_columns(); - while (rows < batch_size && !_line_reader_eof) { - const uint8_t* ptr = nullptr; - size_t size = 0; - RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); - if (_skip_lines > 0) { - _skip_lines--; - continue; - } - if (size == 0) { - // Read empty row, just continue - continue; - } - RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); + if (_push_down_agg_type == TPushAggOp::type::COUNT) { + while (rows < batch_size && !_line_reader_eof) { + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); + if (_skip_lines > 0) { + _skip_lines--; + continue; + } + if (size == 0) { + // Read empty row, just continue + continue; + } + ++rows; + } + + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + + } else { + auto columns = block->mutate_columns(); + while (rows < batch_size && !_line_reader_eof) { + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); + if (_skip_lines > 0) { + _skip_lines--; + continue; + } + if (size == 0) { + // Read empty row, just continue + continue; + } + + RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); + } } *eof = (rows == 0); @@ -473,6 +496,9 @@ Status CsvReader::_create_decompressor() { case TFileCompressType::LZ4FRAME: compress_type = CompressType::LZ4FRAME; break; + case TFileCompressType::LZ4BLOCK: + compress_type = CompressType::LZ4BLOCK; + break; case TFileCompressType::DEFLATE: compress_type = CompressType::DEFLATE; break; @@ -495,6 +521,9 @@ Status CsvReader::_create_decompressor() { case TFileFormatType::FORMAT_CSV_LZ4FRAME: compress_type = CompressType::LZ4FRAME; break; + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: + compress_type = CompressType::LZ4BLOCK; + break; case TFileFormatType::FORMAT_CSV_LZOP: compress_type = CompressType::LZOP; break; diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index b59bbef1f1c085..c27aba354f6e1f 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -201,7 +201,6 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, _output_buf_limit(0), _file_eof(false), _eof(false), - _stream_end(true), _more_input_bytes(0), _more_output_bytes(0), _current_offset(current_offset), @@ -324,6 +323,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool _line_reader_ctx->refresh(); int found_line_delimiter = 0; size_t offset = 0; + bool stream_end = true; while (!done()) { // find line delimiter in current decompressed data uint8_t* cur_ptr = _output_buf + _output_buf_pos; @@ -379,7 +379,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool COUNTER_UPDATE(_bytes_read_counter, read_len); } if (_file_eof || read_len == 0) { - if (!_stream_end) { + if (!stream_end) { return Status::InternalError( "Compressed file has been truncated, which is not allowed"); } else { @@ -392,7 +392,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool if (_decompressor == nullptr) { _output_buf_limit += read_len; - _stream_end = true; + stream_end = true; } else { // only update input limit. // input pos is set at MARK step @@ -418,10 +418,10 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool _input_buf_limit - _input_buf_pos, /* input_len */ &input_read_bytes, _output_buf + _output_buf_limit, /* output */ _output_buf_size - _output_buf_limit, /* output_max_len */ - &decompressed_len, &_stream_end, &_more_input_bytes, &_more_output_bytes)); + &decompressed_len, &stream_end, &_more_input_bytes, &_more_output_bytes)); // LOG(INFO) << "after decompress:" - // << " stream_end: " << _stream_end + // << " stream_end: " << stream_end // << " input_read_bytes: " << input_read_bytes // << " decompressed_len: " << decompressed_len // << " more_input_bytes: " << _more_input_bytes diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h index 7326812b92bcb3..9947259300dae4 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -235,7 +235,6 @@ class NewPlainTextLineReader : public LineReader { bool _file_eof; bool _eof; - bool _stream_end; size_t _more_input_bytes; size_t _more_output_bytes; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0717fc498dfc20..5c5825607bcfcf 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -116,6 +116,7 @@ enum TFileFormatType { FORMAT_PROTO, FORMAT_JNI, FORMAT_AVRO, + FORMAT_CSV_LZ4BLOCK } // In previous versions, the data compression format and file format were stored together, as TFileFormatType, @@ -132,6 +133,7 @@ enum TFileCompressType { LZ4FRAME, DEFLATE, LZOP, + LZ4BLOCK } struct THdfsConf { diff --git a/regression-test/data/external_table_p2/hive/test_compress_type.out b/regression-test/data/external_table_p2/hive/test_compress_type.out index 1c3cf7e96e6f6c..a80b6b0bf21c6d 100644 --- a/regression-test/data/external_table_p2/hive/test_compress_type.out +++ b/regression-test/data/external_table_p2/hive/test_compress_type.out @@ -3,7 +3,7 @@ 600005 -- !q22 -- -1310008 +1410009 -- !q23 -- 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 2023-08-21 @@ -13,6 +13,7 @@ 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 deflate 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 lz4 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix @@ -24,7 +25,7 @@ 600005 -- !q32 -- -1310008 +1410009 -- !q33 -- 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 2023-08-21 @@ -34,6 +35,7 @@ 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 deflate 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 gzip +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 lz4 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix diff --git a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy index 5e572e214cd77d..c033d7070941c4 100644 --- a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy +++ b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy @@ -39,8 +39,8 @@ suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hi sql """set file_split_size=0""" explain { sql("select count(*) from test_compress_partitioned") - contains "inputSplitNum=14, totalFileSize=682053470" - contains "partition=6/6" + contains "inputSplitNum=15, totalFileSize=706873074" + contains "partition=7/7" } qt_q21 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" qt_q22 """select count(*) from test_compress_partitioned""" @@ -49,8 +49,8 @@ suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hi sql """set file_split_size=8388608""" explain { sql("select count(*) from test_compress_partitioned") - contains "inputSplitNum=80, totalFileSize=682053470" - contains "partition=6/6" + contains "inputSplitNum=81, totalFileSize=706873074" + contains "partition=7/7" } qt_q31 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" From 19ee35a23dcbd58161dd7fbe94eb9269b917f6f2 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 22 Aug 2023 16:28:45 +0800 Subject: [PATCH 05/12] 5 --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 9 +++++++++ be/src/service/internal_service.cpp | 1 + be/src/util/load_util.cpp | 12 +++++++++--- be/src/vec/exec/format/csv/csv_reader.cpp | 6 ++++-- be/src/vec/exec/scan/vfile_scanner.cpp | 1 + .../main/java/org/apache/doris/common/util/Util.java | 1 + 7 files changed, 27 insertions(+), 5 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a8307e7de1bfdd..b03f5236161795 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1069,6 +1069,8 @@ DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); DEFINE_Int32(partition_topn_partition_threshold, "1024"); +DEFINE_mString(default_lz4_codec, "block"); + #ifdef BE_TEST // test s3 DEFINE_String(test_s3_resource, "resource"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 8d978c768e7c36..815e387a6aef44 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1124,10 +1124,19 @@ DECLARE_mBool(enable_merge_on_write_correctness_check); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); +<<<<<<< HEAD // This threshold determines how many partitions will be allocated for window function get topn. // and if this threshold is exceeded, the remaining data will be pass through to other node directly. DECLARE_Int32(partition_topn_partition_threshold); +// The default lz4 codec. Options: frame, block +// In previous, we use lz4 "frame" as the default codec +// but the hadoop use lz4 block to write data +// So in v2.0, change the default codec to "block" +// So that we can read lz4 data from hive table by default. +// TODO: find a way to auto detect this. +DECLARE_mString(default_lz4_codec); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 3838668577ac38..501ffd476cfd19 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -632,6 +632,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c case TFileFormatType::FORMAT_CSV_GZ: case TFileFormatType::FORMAT_CSV_BZ2: case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: { // file_slots is no use diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index 8736561db4f3e1..bc34cece657888 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -44,8 +44,13 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *format_type = TFileFormatType::FORMAT_CSV_BZ2; *compress_type = TFileCompressType::BZ2; } else if (iequal(compress_type_str, "LZ4")) { - *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; - *compress_type = TFileCompressType::LZ4FRAME; + if (config::default_lz4_codec == "block") { + *format_type = TFileFormatType::FORMAT_CSV_LZ4BLOCK; + *compress_type = TFileCompressType::LZ4BLOCK; + } else { + *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + *compress_type = TFileCompressType::LZ4FRAME; + } } else if (iequal(compress_type_str, "LZOP")) { *format_type = TFileFormatType::FORMAT_CSV_LZOP; *compress_type = TFileCompressType::LZO; @@ -72,6 +77,7 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) { case TFileFormatType::FORMAT_CSV_DEFLATE: case TFileFormatType::FORMAT_CSV_GZ: case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: case TFileFormatType::FORMAT_CSV_LZO: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_JSON: @@ -81,4 +87,4 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) { } return false; } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index ec32f7345188d7..d3fed13c53c917 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -340,6 +340,8 @@ Status CsvReader::init_reader(bool is_load) { [[fallthrough]]; case TFileFormatType::FORMAT_CSV_LZ4FRAME: [[fallthrough]]; + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: + [[fallthrough]]; case TFileFormatType::FORMAT_CSV_LZOP: [[fallthrough]]; case TFileFormatType::FORMAT_CSV_DEFLATE: @@ -494,7 +496,7 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::BZIP2; break; case TFileCompressType::LZ4FRAME: - compress_type = CompressType::LZ4FRAME; + compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK : CompressType::LZ4FRAME; break; case TFileCompressType::LZ4BLOCK: compress_type = CompressType::LZ4BLOCK; @@ -519,7 +521,7 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::BZIP2; break; case TFileFormatType::FORMAT_CSV_LZ4FRAME: - compress_type = CompressType::LZ4FRAME; + compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK : CompressType::LZ4FRAME; break; case TFileFormatType::FORMAT_CSV_LZ4BLOCK: compress_type = CompressType::LZ4BLOCK; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 055ec224a3e6c7..1b3e3d0315614e 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -755,6 +755,7 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_CSV_GZ: case TFileFormatType::FORMAT_CSV_BZ2: case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: case TFileFormatType::FORMAT_PROTO: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 8d9c2c6b0c07aa..09b94e3bba2b9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -599,6 +599,7 @@ public static boolean isCsvFormat(TFileFormatType fileFormatType) { || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE || fileFormatType == TFileFormatType.FORMAT_CSV_GZ || fileFormatType == TFileFormatType.FORMAT_CSV_LZ4FRAME + || fileFormatType == TFileFormatType.FORMAT_CSV_LZ4BLOCK || fileFormatType == TFileFormatType.FORMAT_CSV_LZO || fileFormatType == TFileFormatType.FORMAT_CSV_LZOP || fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN; From 5908b9bcb90d4d01e2043a035b9b41de99d1565e Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 22 Aug 2023 16:29:33 +0800 Subject: [PATCH 06/12] 6 --- be/src/exec/decompressor.cpp | 6 ++- be/src/vec/exec/format/csv/csv_reader.cpp | 48 ++++++++++++----------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index d44853ee0ea089..face2043a63a6c 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -380,9 +380,11 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t // Decompress int uncompressed_len = LZ4_decompress_safe(reinterpret_cast(src), - reinterpret_cast(output), compressed_len, remaining_output_size); + reinterpret_cast(output), compressed_len, + remaining_output_size); if (uncompressed_len < 0) { - return Status::InternalError("lz4 block decompress failed. uncompressed_len < 0: {}", uncompressed_len); + return Status::InternalError("lz4 block decompress failed. uncompressed_len < 0: {}", + uncompressed_len); } output += uncompressed_len; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index d3fed13c53c917..220108f6a096f1 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -400,27 +400,27 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); size_t rows = 0; - if (_push_down_agg_type == TPushAggOp::type::COUNT) { - while (rows < batch_size && !_line_reader_eof) { - const uint8_t* ptr = nullptr; - size_t size = 0; - RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); - if (_skip_lines > 0) { - _skip_lines--; - continue; - } - if (size == 0) { - // Read empty row, just continue - continue; - } - ++rows; - } - - for (auto& col : block->mutate_columns()) { - col->resize(rows); - } - - } else { + if (_push_down_agg_type == TPushAggOp::type::COUNT) { + while (rows < batch_size && !_line_reader_eof) { + const uint8_t* ptr = nullptr; + size_t size = 0; + RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); + if (_skip_lines > 0) { + _skip_lines--; + continue; + } + if (size == 0) { + // Read empty row, just continue + continue; + } + ++rows; + } + + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + + } else { auto columns = block->mutate_columns(); while (rows < batch_size && !_line_reader_eof) { const uint8_t* ptr = nullptr; @@ -496,7 +496,8 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::BZIP2; break; case TFileCompressType::LZ4FRAME: - compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK : CompressType::LZ4FRAME; + compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK + : CompressType::LZ4FRAME; break; case TFileCompressType::LZ4BLOCK: compress_type = CompressType::LZ4BLOCK; @@ -521,7 +522,8 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::BZIP2; break; case TFileFormatType::FORMAT_CSV_LZ4FRAME: - compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK : CompressType::LZ4FRAME; + compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK + : CompressType::LZ4FRAME; break; case TFileFormatType::FORMAT_CSV_LZ4BLOCK: compress_type = CompressType::LZ4BLOCK; From aab7b117df0d20e86b30e516e02789b79622e641 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 22 Aug 2023 16:54:41 +0800 Subject: [PATCH 07/12] 2 --- be/src/common/config.h | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 815e387a6aef44..2fe483d5c96456 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1124,7 +1124,6 @@ DECLARE_mBool(enable_merge_on_write_correctness_check); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); -<<<<<<< HEAD // This threshold determines how many partitions will be allocated for window function get topn. // and if this threshold is exceeded, the remaining data will be pass through to other node directly. DECLARE_Int32(partition_topn_partition_threshold); From e6fc3cbee9920efde46bb4565296744b050f3dc4 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 22 Aug 2023 23:47:24 +0800 Subject: [PATCH 08/12] add snappy --- be/src/exec/decompressor.cpp | 107 ++++++++++++++++-- be/src/exec/decompressor.h | 22 +++- be/src/service/internal_service.cpp | 1 + be/src/util/load_util.cpp | 3 + be/src/vec/exec/format/csv/csv_reader.cpp | 21 +++- be/src/vec/exec/format/csv/csv_reader.h | 2 + be/src/vec/exec/scan/vfile_scanner.cpp | 1 + .../org/apache/doris/common/util/Util.java | 5 + gensrc/thrift/PlanNodes.thrift | 6 +- .../hive/test_compress_type.out | 6 +- .../hive/test_compress_type.groovy | 8 +- 11 files changed, 161 insertions(+), 21 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index face2043a63a6c..964654132a3a29 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -45,6 +45,9 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom case CompressType::LZ4BLOCK: *decompressor = new Lz4BlockDecompressor(); break; + case CompressType::SNAPPYBLOCK: + *decompressor = new SnappyBlockDecompressor(); + break; #ifdef DORIS_WITH_LZO case CompressType::LZOP: *decompressor = new LzopDecompressor(); @@ -62,6 +65,10 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom return st; } +uint32_t Decompressor::_read_int32(uint8_t* buf) { + return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; +} + std::string Decompressor::debug_info() { return "Decompressor"; } @@ -332,10 +339,6 @@ Status Lz4BlockDecompressor::init() { return Status::OK(); } -uint32_t Lz4BlockDecompressor::_read_int32(uint8_t* buf) { - return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; -} - Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output, size_t output_max_len, size_t* decompressed_len, bool* stream_end, @@ -358,7 +361,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc while (remaining_input_size > 0) { // Read uncompressed size - uint32_t uncompressed_block_len = _read_int32(src); + uint32_t uncompressed_block_len = Decompressor::_read_int32(src); int64_t remaining_output_size = output_max_len - uncompressed_total_len; if (remaining_output_size < uncompressed_block_len) { // Need more output buffer @@ -368,7 +371,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t // Read compressed size size_t tmp_src_size = remaining_input_size - sizeof(uint32_t); - size_t compressed_len = _read_int32(src + sizeof(uint32_t)); + size_t compressed_len = Decompressor::_read_int32(src + sizeof(uint32_t)); if (compressed_len == 0 || compressed_len > tmp_src_size) { // Need more input data *more_input_bytes = compressed_len - tmp_src_size; @@ -382,15 +385,15 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t int uncompressed_len = LZ4_decompress_safe(reinterpret_cast(src), reinterpret_cast(output), compressed_len, remaining_output_size); - if (uncompressed_len < 0) { - return Status::InternalError("lz4 block decompress failed. uncompressed_len < 0: {}", - uncompressed_len); + if (uncompressed_len < 0 || uncompressed_len != uncompressed_block_len) { + return Status::InternalError( + "lz4 block decompress failed. uncompressed_len: {}, expected: {}", + uncompressed_len, uncompressed_block_len); } output += uncompressed_len; src += compressed_len; remaining_input_size -= compressed_len; - uncompressed_block_len -= uncompressed_len; uncompressed_total_len += uncompressed_len; } @@ -408,4 +411,88 @@ std::string Lz4BlockDecompressor::debug_info() { return ss.str(); } +/// SnappyBlockDecompressor +Status SnappyBlockDecompressor::init() { + return Status::OK(); +} + +Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, + size_t* input_bytes_read, uint8_t* output, + size_t output_max_len, size_t* decompressed_len, + bool* stream_end, size_t* more_input_bytes, + size_t* more_output_bytes) { + uint8_t* src = input; + size_t remaining_input_size = input_len; + int64_t uncompressed_total_len = 0; + *input_bytes_read = 0; + + // The hadoop snappy codec is as: + // <4 byte big endian uncompressed size> + // <4 byte big endian compressed size> + // + // .... + // <4 byte big endian uncompressed size> + // <4 byte big endian compressed size> + // + // + // See: + // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc + while (remaining_input_size > 0) { + // Read uncompressed size + uint32_t uncompressed_block_len = Decompressor::_read_int32(src); + int64_t remaining_output_size = output_max_len - uncompressed_total_len; + if (remaining_output_size < uncompressed_block_len) { + // Need more output buffer + *more_output_bytes = uncompressed_block_len - remaining_output_size; + break; + } + + // Read compressed size + size_t tmp_src_size = remaining_input_size - sizeof(uint32_t); + size_t compressed_len = _read_int32(src + sizeof(uint32_t)); + if (compressed_len == 0 || compressed_len > tmp_src_size) { + // Need more input data + *more_input_bytes = compressed_len - tmp_src_size; + break; + } + + src += 2 * sizeof(uint32_t); + remaining_input_size -= 2 * sizeof(uint32_t); + + // ATTN: the uncompressed len from GetUncompressedLength() is same as + // uncompressed_block_len, so I think it is unnecessary to get it again. + // Get uncompressed len from snappy + // size_t uncompressed_len; + // if (!snappy::GetUncompressedLength(reinterpret_cast(src), + // compressed_len, &uncompressed_len)) { + // return Status::InternalError("snappy block decompress failed to get uncompressed len"); + // } + + // Decompress + if (!snappy::RawUncompress(reinterpret_cast(src), compressed_len, + reinterpret_cast(output))) { + return Status::InternalError("snappy block decompress failed. uncompressed_len: {}", + uncompressed_block_len); + } + + output += uncompressed_block_len; + src += compressed_len; + remaining_input_size -= compressed_len; + uncompressed_total_len += uncompressed_block_len; + } + + *input_bytes_read += (input_len - remaining_input_size); + *decompressed_len = uncompressed_total_len; + // If no more input and output need, means this is the end of a compressed block + *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0); + + return Status::OK(); +} + +std::string SnappyBlockDecompressor::debug_info() { + std::stringstream ss; + ss << "SnappyBlockDecompressor."; + return ss.str(); +} + } // namespace doris diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h index 0bfd141c4dfdc8..0a89546ea9fb16 100644 --- a/be/src/exec/decompressor.h +++ b/be/src/exec/decompressor.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -34,7 +35,7 @@ namespace doris { -enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK }; +enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK }; class Decompressor { public: @@ -68,6 +69,8 @@ class Decompressor { protected: virtual Status init() = 0; + static uint32_t _read_int32(uint8_t* buf); + Decompressor(CompressType ctype) : _ctype(ctype) {} CompressType _ctype; @@ -154,7 +157,22 @@ class Lz4BlockDecompressor : public Decompressor { friend class Decompressor; Lz4BlockDecompressor() : Decompressor(CompressType::LZ4FRAME) {} Status init() override; - uint32_t _read_int32(uint8_t* buf); +}; + +class SnappyBlockDecompressor : public Decompressor { +public: + ~SnappyBlockDecompressor() override {} + + Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output, + size_t output_max_len, size_t* decompressed_len, bool* stream_end, + size_t* more_input_bytes, size_t* more_output_bytes) override; + + std::string debug_info() override; + +private: + friend class Decompressor; + SnappyBlockDecompressor() : Decompressor(CompressType::SNAPPYBLOCK) {} + Status init() override; }; #ifdef DORIS_WITH_LZO diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 501ffd476cfd19..b458af6336317e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -633,6 +633,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c case TFileFormatType::FORMAT_CSV_BZ2: case TFileFormatType::FORMAT_CSV_LZ4FRAME: case TFileFormatType::FORMAT_CSV_LZ4BLOCK: + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: { // file_slots is no use diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index bc34cece657888..3c51a6ca107408 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -54,6 +54,9 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co } else if (iequal(compress_type_str, "LZOP")) { *format_type = TFileFormatType::FORMAT_CSV_LZOP; *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "SNAPPY_BLOCK")) { + *format_type = TFileFormatType::FORMAT_CSV_SNAPPYBLOCK; + *compress_type = TFileCompressType::SNAPPYBLOCK; } else if (iequal(compress_type_str, "DEFLATE")) { *format_type = TFileFormatType::FORMAT_CSV_DEFLATE; *compress_type = TFileCompressType::DEFLATE; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 220108f6a096f1..3bffa10aa491e0 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -344,6 +344,8 @@ Status CsvReader::init_reader(bool is_load) { [[fallthrough]]; case TFileFormatType::FORMAT_CSV_LZOP: [[fallthrough]]; + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: + [[fallthrough]]; case TFileFormatType::FORMAT_CSV_DEFLATE: _line_reader = NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(), @@ -400,6 +402,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); size_t rows = 0; + bool success = false; if (_push_down_agg_type == TPushAggOp::type::COUNT) { while (rows < batch_size && !_line_reader_eof) { const uint8_t* ptr = nullptr; @@ -413,6 +416,8 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { // Read empty row, just continue continue; } + + RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); ++rows; } @@ -435,6 +440,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { continue; } + RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); + if (!success) { + continue; + } RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows)); } } @@ -505,6 +514,9 @@ Status CsvReader::_create_decompressor() { case TFileCompressType::DEFLATE: compress_type = CompressType::DEFLATE; break; + case TFileCompressType::SNAPPYBLOCK: + compress_type = CompressType::SNAPPYBLOCK; + break; default: return Status::InternalError("unknown compress type: {}", _file_compress_type); } @@ -534,6 +546,9 @@ Status CsvReader::_create_decompressor() { case TFileFormatType::FORMAT_CSV_DEFLATE: compress_type = CompressType::DEFLATE; break; + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: + compress_type = CompressType::SNAPPYBLOCK; + break; default: return Status::InternalError("unknown format type: {}", _file_format_type); } @@ -587,7 +602,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, return Status::OK(); } -Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { +Status CsvReader::_validate_line(const Slice& line, bool* success) { if (!_is_proto_format && !validate_utf8(line.data, line.size)) { if (!_is_load) { return Status::InternalError("Only support csv data in utf8 codec"); @@ -605,7 +620,11 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { return Status::OK(); } } + *success = true; + return Status::OK(); +} +Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { _split_line(line); if (_is_load) { diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 5721bbd9291a38..398d8e916afbf3 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -221,6 +221,8 @@ class CsvReader : public GenericReader { // TODO(ftw): parse type Status _parse_col_types(size_t col_nums, std::vector* col_types); + Status _validate_line(const Slice& line, bool* success); + RuntimeState* _state; RuntimeProfile* _profile; ScannerCounter* _counter; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 1b3e3d0315614e..a94289b31294d0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -758,6 +758,7 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_CSV_LZ4BLOCK: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: case TFileFormatType::FORMAT_PROTO: { _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range, _file_slot_descs, _io_ctx.get()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 09b94e3bba2b9a..0dff4b6caaba08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -550,6 +550,8 @@ public static TFileFormatType getFileFormatType(String path) { return TFileFormatType.FORMAT_CSV_LZO; } else if (lowerCasePath.endsWith(".deflate")) { return TFileFormatType.FORMAT_CSV_DEFLATE; + } else if (lowerCasePath.endsWith(".snappy")) { + return TFileFormatType.FORMAT_CSV_SNAPPYBLOCK; } else { return TFileFormatType.FORMAT_CSV_PLAIN; } @@ -575,6 +577,8 @@ public static TFileCompressType inferFileCompressTypeByPath(String path) { return TFileCompressType.LZO; } else if (lowerCasePath.endsWith(".deflate")) { return TFileCompressType.DEFLATE; + } else if (lowerCasePath.endsWith(".snappy")) { + return TFileCompressType.SNAPPYBLOCK; } else { return TFileCompressType.PLAIN; } @@ -600,6 +604,7 @@ public static boolean isCsvFormat(TFileFormatType fileFormatType) { || fileFormatType == TFileFormatType.FORMAT_CSV_GZ || fileFormatType == TFileFormatType.FORMAT_CSV_LZ4FRAME || fileFormatType == TFileFormatType.FORMAT_CSV_LZ4BLOCK + || fileFormatType == TFileFormatType.FORMAT_CSV_SNAPPYBLOCK || fileFormatType == TFileFormatType.FORMAT_CSV_LZO || fileFormatType == TFileFormatType.FORMAT_CSV_LZOP || fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5c5825607bcfcf..cc3a3bf2c30d20 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -116,7 +116,8 @@ enum TFileFormatType { FORMAT_PROTO, FORMAT_JNI, FORMAT_AVRO, - FORMAT_CSV_LZ4BLOCK + FORMAT_CSV_LZ4BLOCK, + FORMAT_CSV_SNAPPYBLOCK, } // In previous versions, the data compression format and file format were stored together, as TFileFormatType, @@ -133,7 +134,8 @@ enum TFileCompressType { LZ4FRAME, DEFLATE, LZOP, - LZ4BLOCK + LZ4BLOCK, + SNAPPYBLOCK } struct THdfsConf { diff --git a/regression-test/data/external_table_p2/hive/test_compress_type.out b/regression-test/data/external_table_p2/hive/test_compress_type.out index a80b6b0bf21c6d..a95bf1f0dd3c02 100644 --- a/regression-test/data/external_table_p2/hive/test_compress_type.out +++ b/regression-test/data/external_table_p2/hive/test_compress_type.out @@ -3,7 +3,7 @@ 600005 -- !q22 -- -1410009 +1510010 -- !q23 -- 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 2023-08-21 @@ -20,12 +20,13 @@ 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 snappy -- !q31 -- 600005 -- !q32 -- -1410009 +1510010 -- !q33 -- 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 2023-08-21 @@ -42,4 +43,5 @@ 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 mix 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain 4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 plain +4611870011201662970 0 HD Tube 5* 1 2014-03-22T05:11:29 2014-03-22 598875 4243808759 92f6fe1be9b9773206d6b63e50feb470 196 2314158381335918424 0 3 3 http://public_search yandex.ru.livemaster 0 0 [] [4,15,333,3912,14512,12818] [18,348,1010] [] 1846 952 29 10 1 0.77 0 0 24 73d7 1 1 0 0 3238011 0 0 0 0 1119 641 157 2014-03-22T19:51:48 0 0 0 0 utf-8 330 0 0 0 7774109565808082252 11274076 0 0 0 0 0 E 2014-03-22T11:54:54 55 2 3 4 6 [105,11,9,88,45,14,98,72,3,925,2193,6,25,1] 3137666015 cc184643699dccab8d5d4af796c47449 -1 -1 -1 nD Tp 0 -1 0 0 81 0 0 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 07d21f 0 [] 0 15284527577228392792 14270691585016129648 0 0 [] [] [] [] [] \N c1889e2b9ad1e219ed04c0e9624b5139 1404 0 snappy diff --git a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy index c033d7070941c4..d02ff3fbd0c47d 100644 --- a/regression-test/suites/external_table_p2/hive/test_compress_type.groovy +++ b/regression-test/suites/external_table_p2/hive/test_compress_type.groovy @@ -39,8 +39,8 @@ suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hi sql """set file_split_size=0""" explain { sql("select count(*) from test_compress_partitioned") - contains "inputSplitNum=15, totalFileSize=706873074" - contains "partition=7/7" + contains "inputSplitNum=16, totalFileSize=734675596, scanRanges=16" + contains "partition=8/8" } qt_q21 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" qt_q22 """select count(*) from test_compress_partitioned""" @@ -49,8 +49,8 @@ suite("test_compress_type", "p2,external,hive,external_remote,external_remote_hi sql """set file_split_size=8388608""" explain { sql("select count(*) from test_compress_partitioned") - contains "inputSplitNum=81, totalFileSize=706873074" - contains "partition=7/7" + contains "inputSplitNum=82, totalFileSize=734675596, scanRanges=82" + contains "partition=8/8" } qt_q31 """select count(*) from test_compress_partitioned where dt="gzip" or dt="mix"""" From 53bc5f93863546f8e8f1c124f971cc99ef99bf58 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 22 Aug 2023 23:57:51 +0800 Subject: [PATCH 09/12] 7 --- be/src/vec/exec/format/csv/csv_reader.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 398d8e916afbf3..2659703f8dce6e 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -221,6 +221,10 @@ class CsvReader : public GenericReader { // TODO(ftw): parse type Status _parse_col_types(size_t col_nums, std::vector* col_types); + // check the utf8 encoding of a line. + // return error status to stop processing. + // If return Status::OK but "success" is false, which means this is load request + // and the line is skipped as unqualified row, and the process should continue. Status _validate_line(const Slice& line, bool* success); RuntimeState* _state; From d526979f1b75a705ab39fc8ddc53ef9ccaa11fc9 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 23 Aug 2023 20:38:16 +0800 Subject: [PATCH 10/12] remove lz4 config --- be/src/common/config.cpp | 2 -- be/src/common/config.h | 8 -------- be/src/util/load_util.cpp | 12 +++++------- be/src/vec/exec/format/csv/csv_reader.cpp | 6 ++---- .../apache/doris/planner/external/HiveScanNode.java | 11 +++++++++++ 5 files changed, 18 insertions(+), 21 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b03f5236161795..a8307e7de1bfdd 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1069,8 +1069,6 @@ DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); DEFINE_Int32(partition_topn_partition_threshold, "1024"); -DEFINE_mString(default_lz4_codec, "block"); - #ifdef BE_TEST // test s3 DEFINE_String(test_s3_resource, "resource"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 2fe483d5c96456..8d978c768e7c36 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1128,14 +1128,6 @@ DECLARE_mString(user_files_secure_path); // and if this threshold is exceeded, the remaining data will be pass through to other node directly. DECLARE_Int32(partition_topn_partition_threshold); -// The default lz4 codec. Options: frame, block -// In previous, we use lz4 "frame" as the default codec -// but the hadoop use lz4 block to write data -// So in v2.0, change the default codec to "block" -// So that we can read lz4 data from hive table by default. -// TODO: find a way to auto detect this. -DECLARE_mString(default_lz4_codec); - #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index 3c51a6ca107408..1277132378b85a 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -44,13 +44,11 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *format_type = TFileFormatType::FORMAT_CSV_BZ2; *compress_type = TFileCompressType::BZ2; } else if (iequal(compress_type_str, "LZ4")) { - if (config::default_lz4_codec == "block") { - *format_type = TFileFormatType::FORMAT_CSV_LZ4BLOCK; - *compress_type = TFileCompressType::LZ4BLOCK; - } else { - *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; - *compress_type = TFileCompressType::LZ4FRAME; - } + *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + *compress_type = TFileCompressType::LZ4FRAME; + } else if (iequal(compress_type_str, "LZ4_BLOCK")) { + *format_type = TFileFormatType::FORMAT_CSV_LZ4BLOCK; + *compress_type = TFileCompressType::LZ4BLOCK; } else if (iequal(compress_type_str, "LZOP")) { *format_type = TFileFormatType::FORMAT_CSV_LZOP; *compress_type = TFileCompressType::LZO; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 3bffa10aa491e0..64749e935cf072 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -505,8 +505,7 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::BZIP2; break; case TFileCompressType::LZ4FRAME: - compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK - : CompressType::LZ4FRAME; + compress_type = CompressType::LZ4FRAME; break; case TFileCompressType::LZ4BLOCK: compress_type = CompressType::LZ4BLOCK; @@ -534,8 +533,7 @@ Status CsvReader::_create_decompressor() { compress_type = CompressType::BZIP2; break; case TFileFormatType::FORMAT_CSV_LZ4FRAME: - compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK - : CompressType::LZ4FRAME; + compress_type = CompressType::LZ4FRAME; break; case TFileFormatType::FORMAT_CSV_LZ4BLOCK: compress_type = CompressType::LZ4BLOCK; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 7178a585ff1739..5d0033c90ea806 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -51,6 +51,7 @@ import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; @@ -386,4 +387,14 @@ public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { return !col.isAllowNull(); } + + @Override + protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException { + TFileCompressType compressType = super.getFileCompressType(fileSplit); + // hadoop use lz4 blocked codec + if (compressType == TFileCompressType.LZ4FRAME) { + compressType = TFileCompressType.LZ4BLOCK; + } + return compressType; + } } From 0f02a56e65f54ba98747853657e957500464ccf9 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 25 Aug 2023 23:50:28 +0800 Subject: [PATCH 11/12] fix --- be/src/vec/exec/scan/vfile_scanner.cpp | 34 ++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index a94289b31294d0..505b6807b439ca 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -261,22 +261,26 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result. if (read_rows > 0) { - // Convert the src block columns type to string in-place. - RETURN_IF_ERROR(_cast_to_input_block(block)); - // FileReader can fill partition and missing columns itself - if (!_cur_reader->fill_all_columns()) { - // Fill rows in src block with partition columns from path. (e.g. Hive partition columns) - RETURN_IF_ERROR(_fill_columns_from_path(read_rows)); - // Fill columns not exist in file with null or default value - RETURN_IF_ERROR(_fill_missing_columns(read_rows)); + // If the push_down_agg_type is COUNT, no need to do the rest, + // because we only save a number in block. + if (_parent->get_push_down_agg_type() != TPushAggOp::type::COUNT) { + // Convert the src block columns type to string in-place. + RETURN_IF_ERROR(_cast_to_input_block(block)); + // FileReader can fill partition and missing columns itself + if (!_cur_reader->fill_all_columns()) { + // Fill rows in src block with partition columns from path. (e.g. Hive partition columns) + RETURN_IF_ERROR(_fill_columns_from_path(read_rows)); + // Fill columns not exist in file with null or default value + RETURN_IF_ERROR(_fill_missing_columns(read_rows)); + } + // Apply _pre_conjunct_ctxs to filter src block. + RETURN_IF_ERROR(_pre_filter_src_block()); + // Convert src block to output block (dest block), string to dest data type and apply filters. + RETURN_IF_ERROR(_convert_to_output_block(block)); + // Truncate char columns or varchar columns if size is smaller than file columns + // or not found in the file column schema. + RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); } - // Apply _pre_conjunct_ctxs to filter src block. - RETURN_IF_ERROR(_pre_filter_src_block()); - // Convert src block to output block (dest block), string to dest data type and apply filters. - RETURN_IF_ERROR(_convert_to_output_block(block)); - // Truncate char columns or varchar columns if size is smaller than file columns - // or not found in the file column schema. - RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); break; } } while (true); From baeefcfd27f48bb8b209e8d38dae48a0d9a67bd3 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 26 Aug 2023 10:15:26 +0800 Subject: [PATCH 12/12] lz4 --- be/src/exec/decompressor.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h index 0a89546ea9fb16..2b07e71139fb83 100644 --- a/be/src/exec/decompressor.h +++ b/be/src/exec/decompressor.h @@ -18,7 +18,9 @@ #pragma once #include +#include #include +#include #include #include #include