
Commit d7134e0

[fix](broker load) fix wildcard import failure caused by 0-byte metadata files
1 parent c7b7ad7 commit d7134e0

File tree

7 files changed, +267 -18 lines changed


be/src/vec/exec/format/csv/csv_reader.cpp

Lines changed: 1 addition & 1 deletion
@@ -186,7 +186,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
           _line_reader_eof(false),
           _skip_lines(0),
           _io_ctx(io_ctx) {
-    _file_format_type = _params.format_type;
+    _file_format_type = _range.__isset.format_type ? _range.format_type : _params.format_type;
     _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
     if (_range.__isset.compress_type) {
         // for compatibility
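
The BE-side change makes CsvReader prefer the per-range format type when the FE has set it, and only fall back to the shared scan-range params for compatibility with older FEs that do not populate the field. A minimal sketch of the same precedence rule against the Thrift-generated Java bindings (the resolveFormatType helper below is hypothetical, not part of this patch):

    import org.apache.doris.thrift.TFileFormatType;
    import org.apache.doris.thrift.TFileRangeDesc;
    import org.apache.doris.thrift.TFileScanRangeParams;

    final class FormatTypeResolver {
        private FormatTypeResolver() {}

        // Hypothetical helper mirroring the BE fallback: prefer the per-range
        // format type when it is set, otherwise use the shared scan-range params.
        static TFileFormatType resolveFormatType(TFileRangeDesc range, TFileScanRangeParams params) {
            return range.isSetFormatType() ? range.getFormatType() : params.getFormatType();
        }
    }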

fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java

Lines changed: 7 additions & 8 deletions
@@ -261,16 +261,14 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon
             TBrokerFileStatus fileStatus = fileStatuses.get(i);
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType =
                     Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(),
                             fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
-                    columnsFromPathKeys);
+                    columnsFromPathKeys, formatType, compressType);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -307,11 +305,9 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
             // header_type
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType =
                     Util.getOrInferCompressType(context.fileGroup.getFileFormatProperties().getCompressionType(),
                             fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
@@ -320,7 +316,7 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath, columnsFromPathKeys);
+                        columnsFromPath, columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset += rangeBytes;

@@ -330,7 +326,7 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
                 curInstanceBytes = 0;
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
-                        columnsFromPathKeys);
+                        columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -401,8 +397,11 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys,
+            TFileFormatType formatType, TFileCompressType compressType) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
+        rangeDesc.setFormatType(formatType);
+        rangeDesc.setCompressType(compressType);
         if (jobType == JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
             rangeDesc.setStartOffset(curFileOffset);
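
The core of the FE change: format and compress type are now resolved per file and attached to each TFileRangeDesc, instead of being written into the shared TFileScanRangeParams, where the last file matched by the wildcard (for example a .lzo file sitting next to a plain .csv) would silently overwrite the setting used for every other file. A minimal sketch of the per-range pattern, with hypothetical inferFormat/inferCompress helpers standing in for the real formatType(...) and Util.getOrInferCompressType(...) calls:

    import java.util.List;

    import org.apache.doris.thrift.TBrokerFileStatus;
    import org.apache.doris.thrift.TFileCompressType;
    import org.apache.doris.thrift.TFileFormatType;
    import org.apache.doris.thrift.TFileRangeDesc;

    final class PerRangeFormatSketch {
        // Hypothetical stand-ins for formatType(...) and Util.getOrInferCompressType(...).
        static TFileFormatType inferFormat(String path) {
            return TFileFormatType.FORMAT_CSV_PLAIN;
        }

        static TFileCompressType inferCompress(String path) {
            return path.endsWith(".lzo") ? TFileCompressType.LZO : TFileCompressType.PLAIN;
        }

        static void buildRanges(List<TBrokerFileStatus> files, List<TFileRangeDesc> out) {
            for (TBrokerFileStatus file : files) {
                TFileRangeDesc range = new TFileRangeDesc();
                // Each range carries its own format/compress type; nothing is
                // written back to the shared scan-range params anymore.
                range.setFormatType(inferFormat(file.path));
                range.setCompressType(inferCompress(file.path));
                range.setPath(file.path);
                range.setStartOffset(0);
                out.add(range);
            }
        }
    }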

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java

Lines changed: 4 additions & 1 deletion
@@ -102,9 +102,12 @@ protected void getAllFileStatus() throws UserException {
                 boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat();
                 List<TBrokerFileStatus> filteredFileStatuses = Lists.newArrayList();
                 for (TBrokerFileStatus fstatus : fileStatuses) {
-                    if (fstatus.getSize() == 0 && isBinaryFileFormat) {
+                    boolean isSuccessFile = fstatus.path.endsWith("/_SUCCESS")
+                            || fstatus.path.endsWith("_SUCCESS");
+                    if (fstatus.getSize() == 0 && (isBinaryFileFormat || isSuccessFile)) {
                         // For parquet or orc file, if it is an empty file, ignore it.
                         // Because we can not read an empty parquet or orc file.
+                        // For _SUCCESS file, it is a metadata file, ignore it.
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                                     .add("empty file", fstatus).build());

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java

Lines changed: 7 additions & 8 deletions
@@ -274,16 +274,14 @@ public void createScanRangeLocationsUnsplittable(NereidsParamCreateContext conte
             TBrokerFileStatus fileStatus = fileStatuses.get(i);
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType = Util.getOrInferCompressType(
                     context.fileGroup.getFileFormatProperties().getCompressionType(),
                     fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
-                    columnsFromPathKeys);
+                    columnsFromPathKeys, formatType, compressType);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -326,11 +324,9 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
             // header_type
             TFileFormatType formatType = formatType(context.fileGroup.getFileFormatProperties().getFormatName(),
                     fileStatus.path);
-            context.params.setFormatType(formatType);
             TFileCompressType compressType = Util.getOrInferCompressType(
                     context.fileGroup.getFileFormatProperties().getCompressionType(),
                     fileStatus.path);
-            context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
@@ -339,7 +335,7 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
             if (tmpBytes > bytesPerInstance && jobType != FileGroupInfo.JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath, columnsFromPathKeys);
+                        columnsFromPath, columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset += rangeBytes;

@@ -349,7 +345,7 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
                 curInstanceBytes = 0;
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
-                        columnsFromPathKeys);
+                        columnsFromPathKeys, formatType, compressType);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -420,8 +416,11 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys,
+            TFileFormatType formatType, TFileCompressType compressType) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
+        rangeDesc.setFormatType(formatType);
+        rangeDesc.setCompressType(compressType);
         if (jobType == FileGroupInfo.JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
             rangeDesc.setStartOffset(curFileOffset);
fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupInfoTest.java

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.FileLoadScanNode;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileGroupInfoTest {
+
+    @Test
+    public void testCreateScanRangeLocationsUnsplittable(
+            @Injectable FileLoadScanNode.ParamCreateContext context,
+            @Injectable FederationBackendPolicy backendPolicy,
+            @Injectable BrokerFileGroup fileGroup,
+            @Mocked BrokerDesc brokerDesc) throws UserException {
+
+        List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
+        TBrokerFileStatus lzoFile = new TBrokerFileStatus();
+        lzoFile.path = "hdfs://localhost:8900/data.csv.lzo";
+        lzoFile.size = 100;
+        fileStatuses.add(lzoFile);
+
+        TBrokerFileStatus plainFile = new TBrokerFileStatus();
+        plainFile.path = "hdfs://localhost:8900/data.csv";
+        plainFile.size = 50;
+        fileStatuses.add(plainFile);
+
+        FileGroupInfo fileGroupInfo = Deencapsulation.newInstance(FileGroupInfo.class, 1L, 1L, Lists.newArrayList(fileGroup), brokerDesc, 1);
+        Deencapsulation.setField(fileGroupInfo, "fileStatuses", fileStatuses);
+        Deencapsulation.setField(fileGroupInfo, "numInstances", 1);
+
+        TFileScanRangeParams params = new TFileScanRangeParams();
+        context.params = params;
+        context.fileGroup = fileGroup;
+
+        new Expectations() {
+            {
+                fileGroup.getFileFormatProperties().getFormatName();
+                result = "csv";
+                fileGroup.getFileFormatProperties().getCompressionType();
+                result = null; // infer from path
+                fileGroup.getColumnNamesFromPath();
+                result = new ArrayList<String>();
+            }
+        };
+
+        List<TScanRangeLocations> scanRangeLocations = new ArrayList<>();
+        fileGroupInfo.createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
+
+        Assert.assertEquals(1, scanRangeLocations.size());
+        List<TFileRangeDesc> ranges = scanRangeLocations.get(0).getScanRange().getExtScanRange().getFileScanRange().getRanges();
+        Assert.assertEquals(2, ranges.size());
+
+        // Check the LZO file: format comes from "csv", compression is inferred from the .lzo suffix.
+        TFileRangeDesc lzoRange = ranges.get(0);
+        Assert.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, lzoRange.getFormatType());
+        Assert.assertEquals(TFileCompressType.LZO, lzoRange.getCompressType());
+
+        // Check the plain file.
+        TFileRangeDesc plainRange = ranges.get(1);
+        Assert.assertEquals(TFileFormatType.FORMAT_CSV_PLAIN, plainRange.getFormatType());
+        Assert.assertEquals(TFileCompressType.PLAIN, plainRange.getCompressType());
+
+        // The shared scan-range params must stay unset: format/compress type is now
+        // carried on each TFileRangeDesc, so one file's compression (e.g. LZO) can no
+        // longer overwrite the value used for every other file matched by the wildcard.
+        Assert.assertFalse(params.isSetFormatType());
+        Assert.assertFalse(params.isSetCompressType());
+    }
+}

fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java

Lines changed: 46 additions & 0 deletions
@@ -81,4 +81,50 @@ public void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus
         Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(aggKey));
         Assert.assertEquals(tBrokerFileStatus, brokerPendingTaskAttachment.getFileStatusByTable(aggKey).get(0).get(0));
     }
+
+    @Test
+    public void testFilterSuccessFile(@Injectable BrokerLoadJob brokerLoadJob,
+            @Injectable BrokerFileGroup brokerFileGroup,
+            @Injectable BrokerDesc brokerDesc,
+            @Mocked Env env) throws UserException {
+        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
+        brokerFileGroups.add(brokerFileGroup);
+        FileGroupAggKey aggKey = new FileGroupAggKey(1L, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
+
+        TBrokerFileStatus successFile = new TBrokerFileStatus();
+        successFile.path = "hdfs://localhost:8900/test_column/_SUCCESS";
+        successFile.size = 0;
+        successFile.isDir = false;
+
+        TBrokerFileStatus dataFile = new TBrokerFileStatus();
+        dataFile.path = "hdfs://localhost:8900/test_column/data.csv";
+        dataFile.size = 100;
+        dataFile.isDir = false;
+
+        new Expectations() {
+            {
+                env.getNextId();
+                result = 1L;
+                brokerFileGroup.getFilePaths();
+                result = "hdfs://localhost:8900/test_column/*";
+            }
+        };
+
+        new MockUp<BrokerUtil>() {
+            @Mock
+            public void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses) {
+                fileStatuses.add(successFile);
+                fileStatuses.add(dataFile);
+            }
+        };
+
+        BrokerLoadPendingTask brokerLoadPendingTask = new BrokerLoadPendingTask(brokerLoadJob, aggKeyToFileGroups, brokerDesc, LoadTask.Priority.NORMAL);
+        brokerLoadPendingTask.executeTask();
+        BrokerPendingTaskAttachment brokerPendingTaskAttachment = Deencapsulation.getField(brokerLoadPendingTask, "attachment");
+        // Only dataFile should be kept
+        Assert.assertEquals(1, brokerPendingTaskAttachment.getFileNumByTable(aggKey));
+        Assert.assertEquals(dataFile.path, brokerPendingTaskAttachment.getFileStatusByTable(aggKey).get(0).get(0).path);
+    }
 }
