diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index bd70a2fdab7469..d6df5cfbb71964 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1397,7 +1397,21 @@ Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema
     std::unordered_map<std::string, DataTypePtr> name_to_col_type;
     RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
     for (const auto& [col_name, col_type] : name_to_col_type) {
-        _slot_lower_name_to_col_type.emplace(to_lower(col_name), col_type);
+        auto col_name_lower = to_lower(col_name);
+        if (_partition_col_descs.contains(col_name_lower)) {
+            /*
+             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD
+             * to generate columns of the corresponding type; it records the columns that exist in the file.
+             *
+             * When a column in `COLUMNS FROM PATH` also exists in the file, the column type in the block would
+             * not match the slot type in `_output_tuple_desc`, causing an error when Serde
+             * `deserialize_one_cell_from_json` fills the partition values.
+             *
+             * So partition columns are not added to `_slot_lower_name_to_col_type`.
+             */
+            continue;
+        }
+        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
     }
 
     if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
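To illustrate why skipping the partition column fixes the mismatch, here is a minimal self-contained sketch (illustrative only, not the Doris implementation; the string type names and column values are simplified stand-ins): when a column is absent from the file-type map, the source block falls back to the slot type from the table schema, so a value parsed from the path can be deserialized without a type conflict.

```cpp
#include <iostream>
#include <string>
#include <unordered_map>

int main() {
    // Stand-in for _slot_lower_name_to_col_type after the fix: only columns
    // that exist in the file AND are not partition columns are recorded.
    std::unordered_map<std::string, std::string> file_col_type = {{"k1", "INT32"},
                                                                  {"k3", "INT32"}};

    // Stand-in for the slot types in _output_tuple_desc (the table schema).
    std::unordered_map<std::string, std::string> slot_type = {
            {"k1", "INT"}, {"k2", "INT"}, {"k3", "INT"}};

    // k2 comes from COLUMNS FROM PATH. Because it was skipped when building
    // the file-type map, the source block falls back to the slot type, and
    // the path value can be filled into it without a type mismatch.
    for (const auto& [name, target] : slot_type) {
        auto it = file_col_type.find(name);
        const std::string& chosen = (it != file_col_type.end()) ? it->second : target;
        std::cout << name << " -> " << chosen << '\n';
    }
    return 0;
}
```

The regression test below exercises exactly this situation: paths such as `k1=10/k2=20` carry partition values for columns that may or may not also be present in the data file.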
diff --git a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
index e4c6a5d2eedb9d..46fc29b2948b07 100644
--- a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
@@ -202,4 +202,219 @@ suite("test_load_columns_from_path", "load_p0") {
     } finally {
         sql """ DROP TABLE ${tableName} """
     }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            k1 INT,
+            k2 INT,
+            k3 INT
+        )
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.parquet")
+                INTO TABLE ${tableName}
+                FORMAT AS "parquet"
+                (k1, k2, k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 1: " + pathData)
+        // 1 2 3
+        assertEquals(1, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(3, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "orc"
+                (k1, k3)
+                COLUMNS FROM PATH AS (k2)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 2: " + pathData)
+        // 1 20 3
+        assertEquals(1, pathData[0][0])
+        assertEquals(20, pathData[0][1])
+        assertEquals(3, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.parquet")
+                INTO TABLE ${tableName}
+                FORMAT AS "parquet"
+                (k2)
+                COLUMNS FROM PATH AS (k1,k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 3: " + pathData)
+        // 10 2 30
+        assertEquals(10, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(30, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "orc"
+                (k1,k2)
+                COLUMNS FROM PATH AS (k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 4: " + pathData)
+        // 1 2 30
+        assertEquals(1, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(30, pathData[0][2])
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
 }
\ No newline at end of file