
Commit 0da40bc

branch-3.1: [fix](broker load) fix broker load fail when <column from path> column already exists in file #58579 (#58904)
bp #58579
1 parent c2a16b8 commit 0da40bc

File tree

2 files changed: +230 -1 lines changed

be/src/vec/exec/scan/vfile_scanner.cpp

Lines changed: 15 additions & 1 deletion
@@ -1314,7 +1314,21 @@ Status VFileScanner::_get_next_reader() {
     std::unordered_map<std::string, TypeDescriptor> name_to_col_type;
     RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
     for (const auto& [col_name, col_type] : name_to_col_type) {
-        _slot_lower_name_to_col_type.emplace(to_lower(col_name), col_type);
+        auto col_name_lower = to_lower(col_name);
+        if (_partition_col_descs.contains(col_name_lower)) {
+            /*
+             * `_slot_lower_name_to_col_type` records the columns that exist in the file; it is
+             * used by `_init_src_block` and `_cast_to_input_block` during LOAD to generate
+             * columns of the corresponding types.
+             *
+             * When a column in `COLUMNS FROM PATH` also exists as a file column, the column type
+             * in the block would not match the slot type in `_output_tuple_desc`, causing an
+             * error when the Serde `deserialize_one_cell_from_json` fills the partition values.
+             *
+             * So partition columns should not be added to `_slot_lower_name_to_col_type`.
+             */
+            continue;
+        }
+        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
     }
 
     if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
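
For context, here is a minimal self-contained sketch of the guard above. `TypeDescriptor`, `to_lower`, and the partition-column set are hypothetical stand-ins (the real definitions live elsewhere in the Doris BE), so this illustrates only the skip logic, not the actual scanner code:

#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in for Doris's TypeDescriptor.
using TypeDescriptor = std::string;

// Hypothetical stand-in for the BE's to_lower helper.
static std::string to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    return s;
}

int main() {
    // Columns reported by the file reader; "k2" exists in the file even
    // though the path also carries k2=20 (the bug scenario).
    std::unordered_map<std::string, TypeDescriptor> name_to_col_type = {
            {"k1", "INT"}, {"k2", "INT"}, {"k3", "INT"}};

    // Columns declared via COLUMNS FROM PATH (lower-cased), standing in
    // for _partition_col_descs.
    std::unordered_set<std::string> partition_cols = {"k2"};

    // The fix: skip partition columns so their block columns are built from
    // the output slot type rather than the file type.
    std::unordered_map<std::string, TypeDescriptor> slot_lower_name_to_col_type;
    for (const auto& [col_name, col_type] : name_to_col_type) {
        auto col_name_lower = to_lower(col_name);
        if (partition_cols.contains(col_name_lower)) {
            continue; // k2 will be filled from the path value instead
        }
        slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
    }

    for (const auto& [name, type] : slot_lower_name_to_col_type) {
        std::cout << name << " -> " << type << "\n"; // prints k1 and k3 only
    }
}

Compiled with -std=c++20 (for unordered_set::contains), this prints only k1 and k3. With the partition column left out of the map, the source block is built from the output slot type, so `deserialize_one_cell_from_json` can fill the path value without the type mismatch described in the comment above.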

regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy

Lines changed: 215 additions & 0 deletions
@@ -202,4 +202,219 @@ suite("test_load_columns_from_path", "load_p0") {
     } finally {
         sql """ DROP TABLE ${tableName} """
     }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            k1 INT,
+            k2 INT,
+            k3 INT
+        )
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    // Case 1: no COLUMNS FROM PATH -- the k1=10/k2=20 path segments are
+    // ignored and all three values come from the parquet file.
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.parquet")
+                INTO TABLE ${tableName}
+                FORMAT AS "parquet"
+                (k1, k2, k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for the load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 1: " + pathData)
+        // Expect (1, 2, 3) from the file
+        assertEquals(1, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(3, pathData[0][2])
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    // Case 2: k2 is declared in COLUMNS FROM PATH even though the orc file
+    // also contains a k2 column -- the path value wins.
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k2=20/test.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "orc"
+                (k1, k3)
+                COLUMNS FROM PATH AS (k2)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for the load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 2: " + pathData)
+        // Expect (1, 20, 3): k2 is filled from the k2=20 path segment
+        assertEquals(1, pathData[0][0])
+        assertEquals(20, pathData[0][1])
+        assertEquals(3, pathData[0][2])
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    // Case 3: k1 and k3 both come from COLUMNS FROM PATH; only k2 is read
+    // from the parquet file.
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.parquet")
+                INTO TABLE ${tableName}
+                FORMAT AS "parquet"
+                (k2)
+                COLUMNS FROM PATH AS (k1, k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for the load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 3: " + pathData)
+        // Expect (10, 2, 30): k1 and k3 are filled from the path segments
+        assertEquals(10, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(30, pathData[0][2])
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    // Case 4: only k3 comes from COLUMNS FROM PATH; k1 and k2 are read from
+    // the orc file.
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${s3BucketName}/load/k1=10/k3=30/test.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "orc"
+                (k1, k2)
+                COLUMNS FROM PATH AS (k3)
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for the load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        def pathData = sql "SELECT * FROM ${tableName} LIMIT 1"
+        logger.info("path data 4: " + pathData)
+        // Expect (1, 2, 30): k3 is filled from the k3=30 path segment
+        assertEquals(1, pathData[0][0])
+        assertEquals(2, pathData[0][1])
+        assertEquals(30, pathData[0][2])
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
 }
