diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 3e7fca5af99498..816ced9c48cc29 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -186,7 +186,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
           _line_reader_eof(false),
           _skip_lines(0),
           _io_ctx(io_ctx) {
-    _file_format_type = _params.format_type;
+    _file_format_type = _range.__isset.format_type ? _range.format_type : _params.format_type;
     _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
     if (_range.__isset.compress_type) {
         // for compatibility
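On the BE side the change is small but load-bearing: each TFileRangeDesc may now carry its own format type, and CsvReader prefers it over the scan-wide params, falling back only for compatibility with FEs that never set the per-range field. A minimal Java sketch of the same resolution rule against the generated thrift structs (resolveFormatType is a hypothetical helper for illustration, not part of this patch):

    import org.apache.doris.thrift.TFileFormatType;
    import org.apache.doris.thrift.TFileRangeDesc;
    import org.apache.doris.thrift.TFileScanRangeParams;

    final class FormatResolutionSketch {
        // Per-range value wins; the shared params survive only as a
        // compatibility fallback, mirroring the C++ ternary above.
        static TFileFormatType resolveFormatType(TFileRangeDesc range, TFileScanRangeParams params) {
            return range.isSetFormatType() ? range.getFormatType() : params.getFormatType();
        }
    }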
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index d81ba7daa8419c..1625969acf9b4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -261,16 +261,14 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon
             TBrokerFileStatus fileStatus = fileStatuses.get(i);
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType =
                     Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(),
                             fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
-                    columnsFromPathKeys);
+                    columnsFromPathKeys, formatType, compressType);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -307,11 +305,9 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
             // header_type
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType =
                     Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(),
                             fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
@@ -320,7 +316,7 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath, columnsFromPathKeys);
+                        columnsFromPath, columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);

                 curFileOffset += rangeBytes;
@@ -330,7 +326,7 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
                 curInstanceBytes = 0;
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
-                        columnsFromPathKeys);
+                        columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -401,8 +397,11 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys,
+            TFileFormatType formatType, TFileCompressType compressType) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
+        rangeDesc.setFormatType(formatType);
+        rangeDesc.setCompressType(compressType);
         if (jobType == JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
             rangeDesc.setStartOffset(curFileOffset);
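The FE half of the fix: format and compress type used to be written into context.params, which is shared by every file in the scan range, so the last file in the loop silently overwrote the values for all earlier files. They now travel on each TFileRangeDesc. A hedged sketch of the two ranges such a mixed load produces (paths illustrative):

    // One broker load, two files, two different compress types. Before this
    // patch both files shared context.params, so the second setCompressType()
    // clobbered the first; now each range keeps its own values.
    TFileRangeDesc lzoRange = new TFileRangeDesc();
    lzoRange.setPath("hdfs://host/data.csv.lzo");
    lzoRange.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
    lzoRange.setCompressType(TFileCompressType.LZOP);   // inferred from the .lzo suffix

    TFileRangeDesc plainRange = new TFileRangeDesc();
    plainRange.setPath("hdfs://host/data.csv");
    plainRange.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
    plainRange.setCompressType(TFileCompressType.PLAIN);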
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index cdf09b65c37ef9..cd9e828cbb4d01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -102,9 +102,12 @@ protected void getAllFileStatus() throws UserException {
             boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat();
             List<TBrokerFileStatus> filteredFileStatuses = Lists.newArrayList();
             for (TBrokerFileStatus fstatus : fileStatuses) {
-                if (fstatus.getSize() == 0 && isBinaryFileFormat) {
+                boolean isSuccessFile = fstatus.path.endsWith("/_SUCCESS")
+                        || fstatus.path.endsWith("_SUCCESS");
+                if (fstatus.getSize() == 0 && (isBinaryFileFormat || isSuccessFile)) {
                     // For parquet or orc file, if it is an empty file, ignore it.
                     // Because we can not read an empty parquet or orc file.
+                    // A _SUCCESS file is only a job-completion marker, ignore it as well.
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                                 .add("empty file", fstatus).build());
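Previously the pending task dropped zero-byte files only for binary formats (parquet/orc), so a zero-byte _SUCCESS marker left behind by Spark or MapReduce jobs was still scheduled for scanning in CSV loads. The new skip rule, extracted here as a standalone predicate purely for illustration (the patch inlines it in getAllFileStatus):

    // Hedged sketch: skip unreadable empties and job-completion markers.
    static boolean shouldSkip(TBrokerFileStatus fstatus, boolean isBinaryFileFormat) {
        boolean isSuccessFile = fstatus.path.endsWith("_SUCCESS");
        // Zero-byte parquet/orc files cannot be read at all, and a _SUCCESS
        // file carries no data, so both are safe to drop before scheduling.
        return fstatus.getSize() == 0 && (isBinaryFileFormat || isSuccessFile);
    }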
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java
index b0862277a633ed..e6f77e5b5ff6ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java
@@ -274,16 +274,14 @@ public void createScanRangeLocationsUnsplittable(NereidsParamCreateContext conte
             TBrokerFileStatus fileStatus = fileStatuses.get(i);
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType = Util.getOrInferCompressType(
                     context.fileGroup.getFileFormatProperties().getCompressionType(),
                     fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
-                    columnsFromPathKeys);
+                    columnsFromPathKeys, formatType, compressType);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -326,11 +324,9 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
             // header_type
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType = Util.getOrInferCompressType(
                     context.fileGroup.getFileFormatProperties().getCompressionType(),
                     fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
@@ -339,7 +335,7 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
             if (tmpBytes > bytesPerInstance && jobType != FileGroupInfo.JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath, columnsFromPathKeys);
+                        columnsFromPath, columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);

                 curFileOffset += rangeBytes;
@@ -349,7 +345,7 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
                 curInstanceBytes = 0;
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
-                        columnsFromPathKeys);
+                        columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -420,8 +416,11 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys,
+            TFileFormatType formatType, TFileCompressType compressType) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
+        rangeDesc.setFormatType(formatType);
+        rangeDesc.setCompressType(compressType);
         if (jobType == FileGroupInfo.JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
             rangeDesc.setStartOffset(curFileOffset);
} + }; + + List scanRangeLocations = new ArrayList<>(); + fileGroupInfo.createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations); + + Assert.assertEquals(1, scanRangeLocations.size()); + List ranges = scanRangeLocations.get(0).getScanRange().getExtScanRange().getFileScanRange().getRanges(); + Assert.assertEquals(2, ranges.size()); + + // Check LZO file + TFileRangeDesc lzoRange = ranges.get(0); + Assert.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, lzoRange.getFormatType()); + Assert.assertEquals(TFileCompressType.LZOP, lzoRange.getCompressType()); + + // Check Plain file + TFileRangeDesc plainRange = ranges.get(1); + Assert.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, plainRange.getFormatType()); + Assert.assertEquals(TFileCompressType.PLAIN, plainRange.getCompressType()); + + // Shared params should NOT be set (they are deprecated and we want to avoid overwriting) + // Actually, in my implementation I removed the setFormatType/setCompressType from the loop. + // They might still have default values or be set elsewhere, but they shouldn't be the LZO/PLAIN from the loop. + Assert.assertFalse(params.isSetFormatType()); + Assert.assertFalse(params.isSetCompressType()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java index ee7a60ce0d08f6..209e44ce2e6487 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java @@ -46,6 +46,7 @@ public class BrokerLoadPendingTaskTest { @BeforeClass public static void setUp() { + tBrokerFileStatus.path = "hdfs://localhost:8900/test_column/file.csv"; tBrokerFileStatus.size = 1; } @@ -81,4 +82,50 @@ public void parseFile(String path, BrokerDesc brokerDesc, List> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + brokerFileGroups.add(brokerFileGroup); + FileGroupAggKey aggKey = new FileGroupAggKey(1L, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + TBrokerFileStatus successFile = new TBrokerFileStatus(); + successFile.path = "hdfs://localhost:8900/test_column/_SUCCESS"; + successFile.size = 0; + successFile.isDir = false; + + TBrokerFileStatus dataFile = new TBrokerFileStatus(); + dataFile.path = "hdfs://localhost:8900/test_column/data.csv"; + dataFile.size = 100; + dataFile.isDir = false; + + new Expectations() { + { + env.getNextId(); + result = 1L; + brokerFileGroup.getFilePaths(); + result = "hdfs://localhost:8900/test_column/*"; + } + }; + + new MockUp() { + @Mock + public void parseFile(String path, BrokerDesc brokerDesc, List fileStatuses) { + fileStatuses.add(successFile); + fileStatuses.add(dataFile); + } + }; + + BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob, aggKeyToFileGroups, brokerDesc, LoadTask.Priority.NORMAL); + brokerLoadPendingTask.executeTask(); + BrokerPendingTaskAttachment brokerPendingTaskAttachment = Deencapsulation.getField(brokerLoadPendingTask, "attachment"); + // Only dataFile should be kept + Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(aggKey)); + Assert.assertEquals(dataFile.path, brokerPendingTaskAttachment.getFileStatusByTable(aggKey).get(0).get(0).path); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/load/NereidsFileGroupInfoTest.java 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
index ee7a60ce0d08f6..209e44ce2e6487 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
@@ -46,6 +46,7 @@ public class BrokerLoadPendingTaskTest {

     @BeforeClass
     public static void setUp() {
+        tBrokerFileStatus.path = "hdfs://localhost:8900/test_column/file.csv";
         tBrokerFileStatus.size = 1;
     }

@@ -81,4 +82,50 @@ public void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatu
             }
         };
     }
+
+    @Test
+    public void testSkipSuccessFile(@Injectable BrokerLoadJob brokerLoadJob,
+            @Injectable BrokerFileGroup brokerFileGroup, @Injectable BrokerDesc brokerDesc,
+            @Mocked Env env) throws UserException {
+        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
+        brokerFileGroups.add(brokerFileGroup);
+        FileGroupAggKey aggKey = new FileGroupAggKey(1L, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
+
+        TBrokerFileStatus successFile = new TBrokerFileStatus();
+        successFile.path = "hdfs://localhost:8900/test_column/_SUCCESS";
+        successFile.size = 0;
+        successFile.isDir = false;
+
+        TBrokerFileStatus dataFile = new TBrokerFileStatus();
+        dataFile.path = "hdfs://localhost:8900/test_column/data.csv";
+        dataFile.size = 100;
+        dataFile.isDir = false;
+
+        new Expectations() {
+            {
+                env.getNextId();
+                result = 1L;
+                brokerFileGroup.getFilePaths();
+                result = "hdfs://localhost:8900/test_column/*";
+            }
+        };
+
+        new MockUp<BrokerUtil>() {
+            @Mock
+            public void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses) {
+                fileStatuses.add(successFile);
+                fileStatuses.add(dataFile);
+            }
+        };
+
+        BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob,
+                aggKeyToFileGroups, brokerDesc, LoadTask.Priority.NORMAL);
+        brokerLoadPendingTask.executeTask();
+        BrokerPendingTaskAttachment brokerPendingTaskAttachment =
+                Deencapsulation.getField(brokerLoadPendingTask, "attachment");
+        // Only dataFile should be kept; the zero-byte _SUCCESS marker is filtered out.
+        Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(aggKey));
+        Assert.assertEquals(dataFile.path, brokerPendingTaskAttachment.getFileStatusByTable(aggKey).get(0).get(0).path);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/load/NereidsFileGroupInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/load/NereidsFileGroupInfoTest.java
new file mode 100644
index 00000000000000..c3de2ae4312de4
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/load/NereidsFileGroupInfoTest.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.load;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.datasource.FederationBackendPolicy;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class NereidsFileGroupInfoTest {
+
+    @Test
+    public void testCreateScanRangeLocationsUnsplittable(
+            @Injectable FederationBackendPolicy backendPolicy,
+            @Injectable NereidsBrokerFileGroup fileGroup,
+            @Injectable Table targetTable,
+            @Mocked BrokerDesc brokerDesc) throws UserException {
+
+        List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
+        TBrokerFileStatus lzoFile = new TBrokerFileStatus();
+        lzoFile.path = "hdfs://localhost:8900/data.csv.lzo";
+        lzoFile.size = 100;
+        fileStatuses.add(lzoFile);
+
+        TBrokerFileStatus plainFile = new TBrokerFileStatus();
+        plainFile.path = "hdfs://localhost:8900/data.csv";
+        plainFile.size = 50;
+        fileStatuses.add(plainFile);
+
+        NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(1L, 1L, targetTable, brokerDesc, fileGroup, fileStatuses, 2, false, 1);
+        Deencapsulation.setField(fileGroupInfo, "numInstances", 1);
+
+        TFileScanRangeParams params = new TFileScanRangeParams();
+        NereidsParamCreateContext context = new NereidsParamCreateContext();
+        context.params = params;
+        context.fileGroup = fileGroup;
+
+        new Expectations() {
+            {
+                fileGroup.getFileFormatProperties().getFormatName();
+                result = "csv";
+                fileGroup.getFileFormatProperties().getCompressionType();
+                result = TFileCompressType.UNKNOWN; // infer from path
+                fileGroup.getColumnNamesFromPath();
+                result = new ArrayList<String>();
+            }
+        };
+
+        List<TScanRangeLocations> scanRangeLocations = new ArrayList<>();
+        fileGroupInfo.createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
+
+        Assertions.assertEquals(1, scanRangeLocations.size());
+        List<TFileRangeDesc> ranges =
+                scanRangeLocations.get(0).getScanRange().getExtScanRange().getFileScanRange().getRanges();
+        Assertions.assertEquals(2, ranges.size());
+
+        // Check the LZO file: format stays CSV, compress type inferred from the .lzo suffix.
+        TFileRangeDesc lzoRange = ranges.get(0);
+        Assertions.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, lzoRange.getFormatType());
+        Assertions.assertEquals(TFileCompressType.LZOP, lzoRange.getCompressType());
+
+        // Check the plain file.
+        TFileRangeDesc plainRange = ranges.get(1);
+        Assertions.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, plainRange.getFormatType());
+        Assertions.assertEquals(TFileCompressType.PLAIN, plainRange.getCompressType());
+
+        Assertions.assertFalse(params.isSetFormatType());
+        Assertions.assertFalse(params.isSetCompressType());
+    }
+}