diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index c563794dccbc35..d81ba7daa8419c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -268,7 +268,9 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
-            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
+            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
+                    columnsFromPathKeys);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -312,12 +314,13 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath);
+                        columnsFromPath, columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);

                 curFileOffset += rangeBytes;
@@ -326,7 +329,8 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
             } else {
-                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
+                        columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -397,7 +401,7 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         if (jobType == JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
@@ -405,6 +409,7 @@ private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setFileSize(fileStatus.size);
             rangeDesc.setColumnsFromPath(columnsFromPath);
+            rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
             if (getFileType() == TFileType.FILE_HDFS) {
                 URI fileUri = new Path(fileStatus.path).toUri();
                 rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java
index f6b122ace6237e..b0862277a633ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java
@@ -281,7 +281,9 @@ public void createScanRangeLocationsUnsplittable(NereidsParamCreateContext conte
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
-            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
+            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
+                    columnsFromPathKeys);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -331,12 +333,13 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != FileGroupInfo.JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath);
+                        columnsFromPath, columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);

                 curFileOffset += rangeBytes;
@@ -345,7 +348,8 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
             } else {
-                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
+                        columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -416,7 +420,7 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         if (jobType == FileGroupInfo.JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
@@ -424,6 +428,7 @@ private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setFileSize(fileStatus.size);
             rangeDesc.setColumnsFromPath(columnsFromPath);
+            rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
             if (getFileType() == TFileType.FILE_HDFS) {
                 URI fileUri = new Path(fileStatus.path).toUri();
                 rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
diff --git a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
new file mode 100644
index 00000000000000..e4c6a5d2eedb9d
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_columns_from_path", "load_p0") {
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+    def ak = getS3AK()
+    def sk = getS3SK()
+    def tableName = "test_columns_from_path"
+    def label = UUID.randomUUID().toString().replace("-", "0")
+    def path = "s3://${s3BucketName}/load/product=p1/code=107020/dt=20250202/data.csv"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            k1 INT,
+            k2 INT,
+            pd VARCHAR(20) NULL,
+            code INT NULL,
+            dt DATE
+        )
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    // test all three columns from path, with SET mapping all three of them
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("${path}")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (k1, k2)
+                COLUMNS FROM PATH AS (product, code, dt)
+                SET
+                (
+                    pd = product,
+                    code = code,
+                    dt = dt
+                )
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        // Verify columns from path are extracted correctly
+        def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
+        assertEquals("p1", pathData[0][0])
+        assertEquals(107020, pathData[0][1])
+        assertEquals("2025-02-02", pathData[0][2].toString())
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    // test all three columns from path, with SET only for the column whose name differs from the path key
+    label = UUID.randomUUID().toString().replace("-", "1")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("${path}")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (k1, k2)
+                COLUMNS FROM PATH AS (product, code, dt)
+                SET (
+                    pd = product
+                )
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        // Verify columns from path are extracted correctly
+        def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
+        assertEquals("p1", pathData[0][0])
+        assertEquals(107020, pathData[0][1])
+        assertEquals("2025-02-02", pathData[0][2].toString())
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    // test extracting only one column from path (only product)
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("${path}")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (k1, k2)
+                COLUMNS FROM PATH AS (product)
+                SET
+                (
+                    pd = product
+                )
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+        """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        // Verify only the pd column is extracted from path; code and dt are loaded from the CSV file
+        def pathData = sql "SELECT pd FROM ${tableName} LIMIT 1"
+        assertEquals("p1", pathData[0][0])
+        // code and dt should be loaded from the CSV file data, not from the path
+        // The actual values depend on the CSV file content
+
+    } finally {
+        sql """ DROP TABLE ${tableName} """
+    }
+}
\ No newline at end of file
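
Note on the two parallel lists: alongside the parsed values (columnsFromPath), the FE now also ships the declared key names (columnsFromPathKeys) in each TFileRangeDesc, presumably so the reader side can tell which COLUMNS FROM PATH AS (...) column each value belongs to even when only a subset is used. Below is a minimal standalone sketch of that key/value split for the partition-style path used in the regression test; it is not Doris's BrokerUtil.parseColumnsFromPath, and the class and method names are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone helper, not Doris code: splits "key=value" path
// segments into the two parallel lists the FE ships in TFileRangeDesc.
public class PathColumnsSketch {
    static List<List<String>> splitPathColumns(String path, List<String> requestedKeys) {
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                continue; // not a "key=value" partition segment
            }
            String key = segment.substring(0, eq);
            if (requestedKeys.contains(key)) {
                keys.add(key);                         // -> columnsFromPathKeys
                values.add(segment.substring(eq + 1)); // -> columnsFromPath
            }
        }
        return Arrays.asList(keys, values);
    }

    public static void main(String[] args) {
        String path = "s3://bucket/load/product=p1/code=107020/dt=20250202/data.csv";
        List<List<String>> kv = splitPathColumns(path, Arrays.asList("product", "code", "dt"));
        System.out.println("keys   = " + kv.get(0)); // [product, code, dt]
        System.out.println("values = " + kv.get(1)); // [p1, 107020, 20250202]
    }
}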