From 8278cd01d5b698aedd289c425327096a46894406 Mon Sep 17 00:00:00 2001 From: hui lai Date: Thu, 17 Jul 2025 11:40:29 +0800 Subject: [PATCH] [fix](csv reader) fix data loss when concurrency read using multi char line delimiter (#53374) Multiple concurrent split file locations will be determined in the plan phase. If the split point happens to be in the middle of the multi char line delimiter: - The previous concurrency will read the complete row1 and read a little more to read the line delimiter. - The latter concurrency will start reading from half of the multi char line delimiter, and row2 is the first line of this concurrency, but the first line in the middle range is always discarded, so row2 will be lost. --- be/src/vec/exec/format/csv/csv_reader.cpp | 8 +- .../ddl/test_multi_char_line_delimiter.sql | 76 ++++++++++++++++++ .../test_multi_char_line_delimiter.groovy | 77 +++++++++++++++++++ 3 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql create mode 100644 regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 4c5adf6f4b15d7..3e3ee25284fe17 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -300,8 +300,12 @@ Status CsvReader::init_reader(bool is_load) { _file_compress_type != TFileCompressType::PLAIN)) { return Status::InternalError("For now we do not support split compressed file"); } - start_offset -= 1; - _size += 1; + // pre-read to promise first line skipped always read + int64_t pre_read_len = std::min( + static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()), + start_offset); + start_offset -= pre_read_len; + _size += pre_read_len; // not first range will always skip one line _skip_lines = 1; } diff --git 
a/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql new file mode 100644 index 00000000000000..c0be1514f90a62 --- /dev/null +++ b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql @@ -0,0 +1,76 @@ +CREATE TABLE `test_multi_char_line_delimiter` ( + `col1` bigint NULL, + `col2` bigint NULL, + `col3` varchar(765) NULL, + `col4` varchar(765) NULL, + `col5` bigint NULL, + `col6` bigint NULL, + `col7` int NULL, + `col8` int NULL, + `col9` tinyint NULL, + `col10` bigint NULL, + `col11` datetime NULL, + `col12` bigint NULL, + `col13` bigint NULL, + `col14` int NULL, + `col15` bigint MIN NULL, + `col16` decimal(19,4) SUM NULL, + `col17` decimal(19,4) SUM NULL, + `col18` decimal(19,4) SUM NULL, + `col19` decimal(19,4) SUM NULL, + `col20` decimal(19,4) SUM NULL, + `col21` decimal(19,4) SUM NULL, + `col22` decimal(19,4) SUM NULL, + `col23` decimal(19,4) SUM NULL, + `col24` decimal(19,4) SUM NULL, + `col25` decimal(19,4) SUM NULL, + `col26` decimal(19,4) SUM NULL, + `col27` decimal(19,4) SUM NULL, + `col28` decimal(19,4) SUM NULL, + `col29` decimal(19,4) SUM NULL, + `col30` decimal(19,4) SUM NULL, + `col31` decimal(19,4) SUM NULL, + `col32` decimal(19,4) SUM NULL, + `col33` decimal(19,4) SUM NULL DEFAULT "0", + `col34` decimal(19,4) SUM NULL DEFAULT "0", + `col35` decimal(19,4) SUM NULL DEFAULT "0", + `col36` decimal(19,4) SUM NULL DEFAULT "0", + `col37` decimal(19,4) SUM NULL DEFAULT "0", + `col38` decimal(19,4) SUM NULL DEFAULT "0", + `col39` decimal(19,4) SUM NULL DEFAULT "0", + `col40` decimal(19,4) SUM NULL DEFAULT "0", + `col41` decimal(19,4) SUM NULL DEFAULT "0", + `col42` decimal(19,4) SUM NULL DEFAULT "0", + `col43` decimal(19,4) SUM NULL DEFAULT "0", + `col44` decimal(19,4) SUM NULL DEFAULT "0", + `col45` decimal(19,4) SUM NULL DEFAULT "0", + `col46` decimal(19,4) SUM NULL DEFAULT "0", + `col47` decimal(19,4) SUM NULL 
DEFAULT "0", + `col48` decimal(19,4) SUM NULL DEFAULT "0", + `col49` decimal(19,4) SUM NULL DEFAULT "0", + `col50` decimal(19,4) SUM NULL DEFAULT "0", + `col51` decimal(19,4) SUM NULL, + `col52` datetime MIN NULL, + `col53` bigint MIN NULL, + `col54` datetime MAX NULL, + `col55` bigint MAX NULL, + `col56` tinyint MIN NULL, + `col57` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY +) ENGINE=OLAP +AGGREGATE KEY(`col1`, `col2`, `col3`, `col4`, `col5`, `col6`, `col7`, `col8`, `col9`, `col10`, `col11`, `col12`, `col13`, `col14`) +PARTITION BY RANGE(`col12`, `col11`) +(PARTITION p_default VALUES [("0", '1900-01-01 00:00:00'), ("99999", '2030-01-01 00:00:00'))) +DISTRIBUTED BY HASH(`col8`) BUCKETS 1 +PROPERTIES ( +"file_cache_ttl_seconds" = "0", +"is_being_synced" = "false", +"storage_medium" = "hdd", +"storage_format" = "V2", +"inverted_index_storage_format" = "V2", +"light_schema_change" = "true", +"disable_auto_compaction" = "false", +"enable_single_replica_compaction" = "false", +"group_commit_interval_ms" = "10000", +"group_commit_data_bytes" = "134217728", +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file diff --git a/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy new file mode 100644 index 00000000000000..184b3d5cf07da8 --- /dev/null +++ b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_multi_char_line_delimiter", "p2") { + def s3BucketName = getS3BucketName() + def s3Endpoint = getS3Endpoint() + def s3Region = getS3Region() + def ak = getS3AK() + def sk = getS3SK() + def tableName = "test_multi_char_line_delimiter" + def label = "test_multi_char_line_delimiter" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text + sql """ + LOAD LABEL ${label} + ( + DATA INFILE("s3://${s3BucketName}/regression/load/data/test_multi_char_line_delimiter*.csv") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "\tcolumn_separator" + LINES TERMINATED BY "\nline_delimiter" + FORMAT AS CSV + (`col1`,`col2`,`col3`,`col4`,`col5`,`col6`,`col7`,`col8`,`col9`,`col10`,`col11`,`col12`,`col13`,`col14`,`col15`,`col16`,`col17`,`col18`,`col19`,`col20`,`col21`,`col22`,`col23`,`col24`,`col25`,`col26`,`col27`,`col28`,`col29`,`col30`,`col31`,`col32`,`col33`,`col34`,`col35`,`col36`,`col37`,`col38`,`col39`,`col40`,`col41`,`col42`,`col43`,`col44`,`col45`,`col46`,`col47`,`col48`,`col49`,`col50`,`col51`,`col52`,`col53`,`col54`,`col55`,`col56`,`col57`) + 
SET(`col1`=`col1`,`col2`=`col2`,`col3`=`col3`,`col4`=`col4`,`col5`=`col5`,`col6`=`col6`,`col7`=`col7`,`col8`=`col8`,`col9`=`col9`,`col10`=`col10`,`col11`=`col11`,`col12`=`col12`,`col13`=`col13`,`col14`=`col14`,`col15`=`col15`,`col16`=`col16`,`col17`=`col17`,`col18`=`col18`,`col19`=`col19`,`col20`=`col20`,`col21`=`col21`,`col22`=`col22`,`col23`=`col23`,`col24`=`col24`,`col25`=`col25`,`col26`=`col26`,`col27`=`col27`,`col28`=`col28`,`col29`=`col29`,`col30`=`col30`,`col31`=`col31`,`col32`=`col32`,`col33`=`col33`,`col34`=`col34`,`col35`=`col35`,`col36`=`col36`,`col37`=`col37`,`col38`=`col38`,`col39`=`col39`,`col40`=`col40`,`col41`=`col41`,`col42`=`col42`,`col43`=`col43`,`col44`=`col44`,`col45`=`col45`,`col46`=`col46`,`col47`=`col47`,`col48`=`col48`,`col49`=`col49`,`col50`=`col50`,`col51`=`col51`,`col52`=`col52`,`col53`=`col53`,`col54`=`col54`,`col55`=`col55`,`col56`=`col56`,col57=bitmap_from_string(col57)) + ) + WITH S3 + ( + "s3.region" = "${s3Region}", + "s3.endpoint" = "${s3Endpoint}", + "s3.access_key" = "${ak}", + "s3.secret_key" = "${sk}" + ) + PROPERTIES + ( + "timeout" = "3600", + "load_parallelism" = "4" + ); + """ + + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + def String[][] result = sql """ show load where label="$label"; """ + logger.info("Load status: " + result[0][2] + ", label: $label") + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + label) + break; + } + if (result[0][2].equals("CANCELLED")) { + assertTrue(false, "load failed: $result") + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $label") + } + } + + def result = sql """ select count(*) from ${tableName}; """ + logger.info("result: ${result[0][0]}") + assertTrue(result[0][0] == 2060625, "load result is not correct") + sql """ DROP TABLE IF EXISTS ${tableName} """ +} \ No newline at end of file