16 changes: 15 additions & 1 deletion be/src/vec/exec/scan/file_scanner.cpp
@@ -1397,7 +1397,21 @@ Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema
     std::unordered_map<std::string, DataTypePtr> name_to_col_type;
     RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
     for (const auto& [col_name, col_type] : name_to_col_type) {
-        _slot_lower_name_to_col_type.emplace(to_lower(col_name), col_type);
+        auto col_name_lower = to_lower(col_name);
+        if (_partition_col_descs.contains(col_name_lower)) {
+            /*
+             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during
+             * LOAD to create columns of the corresponding type; it records the columns that exist in the file.
+             *
+             * When a column listed in `COLUMNS FROM PATH` also exists as a file column, the column type in
+             * the block would not match the slot type in `_output_tuple_desc`, causing an error when Serde
+             * `deserialize_one_cell_from_json` fills the partition values.
+             *
+             * So partition columns must not be added to `_slot_lower_name_to_col_type`.
+             */
+            continue;
+        }
+        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
     }
 
     if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
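
For context, a minimal self-contained sketch of the skip pattern this hunk introduces. The container types and the `main` driver below are simplified stand-ins for the FileScanner members (`name_to_col_type`, `_partition_col_descs`, `_slot_lower_name_to_col_type`), not actual Doris APIs:

    // Sketch of the skip-partition-columns pattern (C++20 for contains()).
    #include <algorithm>
    #include <cctype>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    static std::string to_lower(std::string s) {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return std::tolower(c); });
        return s;
    }

    int main() {
        // Columns found in the data file, mapped to their file-side types.
        std::unordered_map<std::string, std::string> name_to_col_type = {
                {"K1", "int32"}, {"K2", "int32"}, {"K3", "int32"}};
        // Columns declared in COLUMNS FROM PATH (k2 also exists in the file).
        std::unordered_set<std::string> partition_col_descs = {"k2"};

        std::unordered_map<std::string, std::string> slot_lower_name_to_col_type;
        for (const auto& [col_name, col_type] : name_to_col_type) {
            auto col_name_lower = to_lower(col_name);
            // Partition columns are filled from path values using the slot
            // type, so they must not inherit the file type here.
            if (partition_col_descs.contains(col_name_lower)) {
                continue;
            }
            slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
        }

        for (const auto& [name, type] : slot_lower_name_to_col_type) {
            std::cout << name << " -> " << type << '\n'; // k1 and k3 only
        }
        return 0;
    }

With the partition column skipped here, the source block presumably falls back to the slot type from `_output_tuple_desc` for that column, so the path value can be deserialized without a type mismatch.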
@@ -202,4 +202,219 @@ suite("test_load_columns_from_path", "load_p0") {
} finally {
sql """ DROP TABLE ${tableName} """
}

sql """ DROP TABLE IF EXISTS ${tableName} """

sql """
CREATE TABLE ${tableName} (
k1 INT,
k2 INT,
k3 INT
)
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

label = UUID.randomUUID().toString().replace("-", "2")
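// Case 1: all of k1, k2 and k3 are read from the parquet file; the k1=10/k2=20
// path segments are ignored because there is no COLUMNS FROM PATH clause
// (expect 1, 2, 3).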
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.parquet")
INTO TABLE ${tableName}
FORMAT AS "parquet"
(k1, k2, k3)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
logger.info("path data 1: " + pathData)
// 1 2 3
assertEquals(1, pathData[0][0])
assertEquals(2, pathData[0][1])
assertEquals(3, pathData[0][2])

} finally {
sql """ TRUNCATE TABLE ${tableName} """
}

label = UUID.randomUUID().toString().replace("-", "2")

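// Case 2: k1 and k3 are read from the orc file while k2 is filled from the
// k2=20 path segment via COLUMNS FROM PATH (expect 1, 20, 3).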
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.orc")
INTO TABLE ${tableName}
FORMAT AS "orc"
(k1, k3)
COLUMNS FROM PATH AS (k2)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
logger.info("path data 2: " + pathData)
// 1 20 3
assertEquals(1, pathData[0][0])
assertEquals(20, pathData[0][1])
assertEquals(3, pathData[0][2])

} finally {
sql """ TRUNCATE TABLE ${tableName} """
}

label = UUID.randomUUID().toString().replace("-", "2")
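// Case 3: only k2 is read from the parquet file; k1 and k3 are filled from the
// k1=10/k3=30 path segments (expect 10, 2, 30).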
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.parquet")
INTO TABLE ${tableName}
FORMAT AS "parquet"
(k2)
COLUMNS FROM PATH AS (k1,k3)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
logger.info("path data 2: " + pathData)
// 10 2 30
assertEquals(10, pathData[0][0])
assertEquals(2, pathData[0][1])
assertEquals(30, pathData[0][2])

} finally {
sql """ TRUNCATE TABLE ${tableName} """
}

label = UUID.randomUUID().toString().replace("-", "2")
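// Case 4: k1 and k2 are read from the orc file and k3 from the path; the k1=10
// path segment must not override the file value of k1, since k1 is not listed
// in COLUMNS FROM PATH (expect 1, 2, 30).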
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.orc")
INTO TABLE ${tableName}
FORMAT AS "orc"
(k1,k2)
COLUMNS FROM PATH AS (k3)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
logger.info("path data 4: " + pathData)
// 1 2 30
assertEquals(1, pathData[0][0])
assertEquals(2, pathData[0][1])
assertEquals(30, pathData[0][2])
} finally {
sql """ TRUNCATE TABLE ${tableName} """
}

}