diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 99c188d38e183c..2f1a3acc0302be 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -245,8 +245,12 @@ Status CsvReader::init_reader(bool is_load) {
                           _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
             return Status::InternalError("For now we do not support split compressed file");
         }
-        _start_offset -= 1;
-        _size += 1;
+        // pre-read so that the skipped first line is always read in full
+        int64_t pre_read_len = std::min(
+                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
+                _start_offset);
+        _start_offset -= pre_read_len;
+        _size += pre_read_len;
         // not first range will always skip one line
         _skip_lines = 1;
     }
diff --git a/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
new file mode 100644
index 00000000000000..c0be1514f90a62
--- /dev/null
+++ b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
@@ -0,0 +1,76 @@
+CREATE TABLE `test_multi_char_line_delimiter` (
+  `col1` bigint NULL,
+  `col2` bigint NULL,
+  `col3` varchar(765) NULL,
+  `col4` varchar(765) NULL,
+  `col5` bigint NULL,
+  `col6` bigint NULL,
+  `col7` int NULL,
+  `col8` int NULL,
+  `col9` tinyint NULL,
+  `col10` bigint NULL,
+  `col11` datetime NULL,
+  `col12` bigint NULL,
+  `col13` bigint NULL,
+  `col14` int NULL,
+  `col15` bigint MIN NULL,
+  `col16` decimal(19,4) SUM NULL,
+  `col17` decimal(19,4) SUM NULL,
+  `col18` decimal(19,4) SUM NULL,
+  `col19` decimal(19,4) SUM NULL,
+  `col20` decimal(19,4) SUM NULL,
+  `col21` decimal(19,4) SUM NULL,
+  `col22` decimal(19,4) SUM NULL,
+  `col23` decimal(19,4) SUM NULL,
+  `col24` decimal(19,4) SUM NULL,
+  `col25` decimal(19,4) SUM NULL,
+  `col26` decimal(19,4) SUM NULL,
+  `col27` decimal(19,4) SUM NULL,
+  `col28` decimal(19,4) SUM NULL,
+  `col29` decimal(19,4) SUM NULL,
+  `col30` decimal(19,4) SUM NULL,
+  `col31` decimal(19,4) SUM NULL,
+  `col32` decimal(19,4) SUM NULL,
+  `col33` decimal(19,4) SUM NULL DEFAULT "0",
+  `col34` decimal(19,4) SUM NULL DEFAULT "0",
+  `col35` decimal(19,4) SUM NULL DEFAULT "0",
+  `col36` decimal(19,4) SUM NULL DEFAULT "0",
+  `col37` decimal(19,4) SUM NULL DEFAULT "0",
+  `col38` decimal(19,4) SUM NULL DEFAULT "0",
+  `col39` decimal(19,4) SUM NULL DEFAULT "0",
+  `col40` decimal(19,4) SUM NULL DEFAULT "0",
+  `col41` decimal(19,4) SUM NULL DEFAULT "0",
+  `col42` decimal(19,4) SUM NULL DEFAULT "0",
+  `col43` decimal(19,4) SUM NULL DEFAULT "0",
+  `col44` decimal(19,4) SUM NULL DEFAULT "0",
+  `col45` decimal(19,4) SUM NULL DEFAULT "0",
+  `col46` decimal(19,4) SUM NULL DEFAULT "0",
+  `col47` decimal(19,4) SUM NULL DEFAULT "0",
+  `col48` decimal(19,4) SUM NULL DEFAULT "0",
+  `col49` decimal(19,4) SUM NULL DEFAULT "0",
+  `col50` decimal(19,4) SUM NULL DEFAULT "0",
+  `col51` decimal(19,4) SUM NULL,
+  `col52` datetime MIN NULL,
+  `col53` bigint MIN NULL,
+  `col54` datetime MAX NULL,
+  `col55` bigint MAX NULL,
+  `col56` tinyint MIN NULL,
+  `col57` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY
+) ENGINE=OLAP
+AGGREGATE KEY(`col1`, `col2`, `col3`, `col4`, `col5`, `col6`, `col7`, `col8`, `col9`, `col10`, `col11`, `col12`, `col13`, `col14`)
+PARTITION BY RANGE(`col12`, `col11`)
+(PARTITION p_default VALUES [("0", '1900-01-01 00:00:00'), ("99999", '2030-01-01 00:00:00')))
+DISTRIBUTED BY HASH(`col8`) BUCKETS 1
+PROPERTIES (
+"file_cache_ttl_seconds" = "0",
+"is_being_synced" = "false", +"storage_medium" = "hdd", +"storage_format" = "V2", +"inverted_index_storage_format" = "V2", +"light_schema_change" = "true", +"disable_auto_compaction" = "false", +"enable_single_replica_compaction" = "false", +"group_commit_interval_ms" = "10000", +"group_commit_data_bytes" = "134217728", +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file diff --git a/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy new file mode 100644 index 00000000000000..184b3d5cf07da8 --- /dev/null +++ b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_multi_char_line_delimiter", "p2") { + def s3BucketName = getS3BucketName() + def s3Endpoint = getS3Endpoint() + def s3Region = getS3Region() + def ak = getS3AK() + def sk = getS3SK() + def tableName = "test_multi_char_line_delimiter" + def label = "test_multi_char_line_delimiter" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text + sql """ + LOAD LABEL ${label} + ( + DATA INFILE("s3://${s3BucketName}/regression/load/data/test_multi_char_line_delimiter*.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "\tcolumn_separator" + LINES TERMINATED BY "\nline_delimiter" + FORMAT AS CSV + (`col1`,`col2`,`col3`,`col4`,`col5`,`col6`,`col7`,`col8`,`col9`,`col10`,`col11`,`col12`,`col13`,`col14`,`col15`,`col16`,`col17`,`col18`,`col19`,`col20`,`col21`,`col22`,`col23`,`col24`,`col25`,`col26`,`col27`,`col28`,`col29`,`col30`,`col31`,`col32`,`col33`,`col34`,`col35`,`col36`,`col37`,`col38`,`col39`,`col40`,`col41`,`col42`,`col43`,`col44`,`col45`,`col46`,`col47`,`col48`,`col49`,`col50`,`col51`,`col52`,`col53`,`col54`,`col55`,`col56`,`col57`) + 
+            SET(`col1`=`col1`,`col2`=`col2`,`col3`=`col3`,`col4`=`col4`,`col5`=`col5`,`col6`=`col6`,`col7`=`col7`,`col8`=`col8`,`col9`=`col9`,`col10`=`col10`,`col11`=`col11`,`col12`=`col12`,`col13`=`col13`,`col14`=`col14`,`col15`=`col15`,`col16`=`col16`,`col17`=`col17`,`col18`=`col18`,`col19`=`col19`,`col20`=`col20`,`col21`=`col21`,`col22`=`col22`,`col23`=`col23`,`col24`=`col24`,`col25`=`col25`,`col26`=`col26`,`col27`=`col27`,`col28`=`col28`,`col29`=`col29`,`col30`=`col30`,`col31`=`col31`,`col32`=`col32`,`col33`=`col33`,`col34`=`col34`,`col35`=`col35`,`col36`=`col36`,`col37`=`col37`,`col38`=`col38`,`col39`=`col39`,`col40`=`col40`,`col41`=`col41`,`col42`=`col42`,`col43`=`col43`,`col44`=`col44`,`col45`=`col45`,`col46`=`col46`,`col47`=`col47`,`col48`=`col48`,`col49`=`col49`,`col50`=`col50`,`col51`=`col51`,`col52`=`col52`,`col53`=`col53`,`col54`=`col54`,`col55`=`col55`,`col56`=`col56`,col57=bitmap_from_string(col57))
+        )
+        WITH S3
+        (
+            "s3.region" = "${s3Region}",
+            "s3.endpoint" = "${s3Endpoint}",
+            "s3.access_key" = "${ak}",
+            "s3.secret_key" = "${sk}"
+        )
+        PROPERTIES
+        (
+            "timeout" = "3600",
+            "load_parallelism" = "4"
+        );
+    """
+
+    def max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        def String[][] result = sql """ show load where label="$label"; """
+        logger.info("Load status: " + result[0][2] + ", label: $label")
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label)
+            break;
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            assertTrue(false, "load failed: $result")
+            break;
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if (max_try_milli_secs <= 0) {
+            assertTrue(false, "load Timeout: $label")
+        }
+    }
+
+    def result = sql """ select count(*) from ${tableName}; """
+    logger.info("result: ${result[0][0]}")
+    assertTrue(result[0][0] == 2060625, "load result is not correct")
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file
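
For context beyond the patch itself: the removed code always pre-read exactly one byte (`_start_offset -= 1; _size += 1;`), which is only enough to re-read a single-character line delimiter at a split boundary. With a multi-character delimiter such as this test's "\nline_delimiter", a non-first scan range could begin in the middle of the delimiter and mis-skip its first line. Below is a minimal, self-contained sketch of the new boundary arithmetic; `ScanRange` and `adjust_range` are hypothetical names used for illustration only, and only the min/offset computation mirrors the hunk above.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for a CSV scan range; not a Doris type.
struct ScanRange {
    int64_t start_offset; // byte offset where this range begins
    int64_t size;         // number of bytes this range covers
};

// Mirrors the arithmetic of the patch: a non-first range is widened
// backwards by up to line_delimiter.size() bytes (clamped at the file
// start) so a multi-byte delimiter straddling the split point is read
// in full before the range skips its first, partial line.
ScanRange adjust_range(ScanRange r, const std::string& line_delimiter) {
    if (r.start_offset == 0) {
        return r; // first range: no pre-read, no line is skipped
    }
    int64_t pre_read_len = std::min(
            static_cast<int64_t>(line_delimiter.size()), r.start_offset);
    r.start_offset -= pre_read_len;
    r.size += pre_read_len;
    return r;
}

int main() {
    const std::string delim = "\nline_delimiter"; // 15-byte delimiter, as in the test data

    // Non-first range: widened backwards by the full delimiter length.
    ScanRange second = adjust_range({100, 50}, delim);
    assert(second.start_offset == 100 - 15 && second.size == 50 + 15);

    // Range starting closer to the file head than the delimiter length:
    // the pre-read is clamped so we never seek before offset 0.
    ScanRange near_start = adjust_range({5, 50}, delim);
    assert(near_start.start_offset == 0 && near_start.size == 55);
    return 0;
}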