Skip to content

Commit

Permalink
Load: Parallelly load files into different target data partitions (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu authored Oct 24, 2024
1 parent 2eb060d commit dbb99bc
Showing 1 changed file with 40 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -452,27 +455,47 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
if (isClosed) {
throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}

for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
dataPartition2ModificationFile.entrySet()) {
entry.getValue().close();
}
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();

DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);

// Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
databaseName ->
updateWritePointCountMetrics(
dataRegion, databaseName, getTsFileWritePointCount(writer), false));

final List<Map.Entry<DataPartitionInfo, TsFileIOWriter>> dataPartition2WriterList =
new ArrayList<>(dataPartition2Writer.entrySet());
Collections.shuffle(dataPartition2WriterList);

final AtomicReference<Exception> exception = new AtomicReference<>();
dataPartition2WriterList.parallelStream()
.forEach(
entry -> {
try {
final TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();

final DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(
generateResource(writer, progressIndex), true, isGeneratedByPipe);

// Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
databaseName ->
updateWritePointCountMetrics(
dataRegion,
databaseName,
getTsFileWritePointCount(writer),
false));
} catch (final Exception e) {
exception.set(e);
}
});
if (exception.get() != null) {
throw new LoadFileException(exception.get());
}
}

Expand Down

0 comments on commit dbb99bc

Please sign in to comment.