[Improve][Connector][File] Optimize files commit order #5045

Merged · 1 commit · Jul 24, 2023
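
This PR replaces HashMap with LinkedHashMap throughout the file sink's commit metadata so that pending files are moved to their target paths in the order they were written. The sketch below is not part of the PR (the transaction IDs and paths are hypothetical); it only illustrates the difference: HashMap gives no iteration-order guarantee, while LinkedHashMap iterates in insertion order.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class CommitOrderDemo {
    public static void main(String[] args) {
        Map<String, String> hash = new HashMap<>();
        Map<String, String> linked = new LinkedHashMap<>();
        // Hypothetical transaction IDs registered in write order.
        String[] transactionIds = {"T_745109_3", "T_745109_1", "T_745109_2"};
        for (String txn : transactionIds) {
            hash.put(txn, "/tmp/seatunnel/" + txn);
            linked.put(txn, "/tmp/seatunnel/" + txn);
        }
        // HashMap: iteration order depends on hashing, not on insertion order.
        System.out.println(hash.keySet());
        // LinkedHashMap: always [T_745109_3, T_745109_1, T_745109_2], the write order.
        System.out.println(linked.keySet());
    }
}

Declaring the fields as LinkedHashMap instead of Map also makes the ordering guarantee explicit in the type, so callers cannot pass an unordered map by accident.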

File: BaseFileSinkWriter.java
@@ -34,14 +34,14 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
private final WriteStrategy writeStrategy;
protected final WriteStrategy writeStrategy;
private final FileSystemUtils fileSystemUtils;

@SuppressWarnings("checkstyle:MagicNumber")
@@ -67,7 +67,7 @@ public BaseFileSinkWriter(
List<String> transactions = findTransactionList(jobId, uuidPrefix);
FileSinkAggregatedCommitter fileSinkAggregatedCommitter =
new FileSinkAggregatedCommitter(fileSystemUtils);
HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
LinkedHashMap<String, FileSinkState> fileStatesMap = new LinkedHashMap<>();
fileSinkStates.forEach(
fileSinkState ->
fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));

File: FileAggregatedCommitInfo.java
@@ -21,8 +21,8 @@
import lombok.Data;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Data
@AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileAggregatedCommitInfo implements Serializable {
*
* <p>V is the target file path of the data file.
*/
private final Map<String, Map<String, String>> transactionMap;
Member commented: Why change it?
private final LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap;

/**
* Storage the partition information in map.
@@ -43,5 +43,5 @@ public class FileAggregatedCommitInfo implements Serializable {
*
* <p>V is the list of partition column's values.
*/
private final Map<String, List<String>> partitionDirAndValuesMap;
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
}

File: FileCommitInfo.java
@@ -21,8 +21,8 @@
import lombok.Data;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Data
@AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileCommitInfo implements Serializable {
*
* <p>V is the target file path of the data file.
*/
private final Map<String, String> needMoveFiles;
Member commented: The same as above.
private final LinkedHashMap<String, String> needMoveFiles;

/**
* Storage the partition information in map.
@@ -43,7 +43,7 @@ public class FileCommitInfo implements Serializable {
*
* <p>V is the list of partition column's values.
*/
private final Map<String, List<String>> partitionDirAndValuesMap;
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;

/** Storage the transaction directory */
private final String transactionDir;

File: FileSinkAggregatedCommitter.java
@@ -24,7 +24,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -44,7 +44,7 @@ public List<FileAggregatedCommitInfo> commit(
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
for (Map.Entry<String, Map<String, String>> entry :
Member commented: The same as above.
for (Map.Entry<String, LinkedHashMap<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
@@ -77,13 +77,14 @@ public FileAggregatedCommitInfo combine(List<FileCommitInfo> commitInfos) {
if (commitInfos == null || commitInfos.size() == 0) {
return null;
}
Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>();
LinkedHashMap<String, LinkedHashMap<String, String>> aggregateCommitInfo =
new LinkedHashMap<>();
LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new LinkedHashMap<>();
commitInfos.forEach(
commitInfo -> {
Map<String, String> needMoveFileMap =
LinkedHashMap<String, String> needMoveFileMap =
aggregateCommitInfo.computeIfAbsent(
commitInfo.getTransactionDir(), k -> new HashMap<>());
commitInfo.getTransactionDir(), k -> new LinkedHashMap<>());
needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
if (commitInfo.getPartitionDirAndValuesMap() != null
&& !commitInfo.getPartitionDirAndValuesMap().isEmpty()) {
@@ -109,7 +110,7 @@ public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws E
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
for (Map.Entry<String, Map<String, String>> entry :
for (Map.Entry<String, LinkedHashMap<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
// rollback the file
for (Map.Entry<String, String> mvFileEntry :
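
For reference, a standalone sketch (not from the PR; the directories and file names are made up) of why the combine step above now preserves order: computeIfAbsent on a LinkedHashMap keeps transaction directories in the order their commit infos arrive, so the commit and abort loops later move or roll back files in that same order.

import java.util.LinkedHashMap;
import java.util.Map;

public class CombineOrderSketch {
    public static void main(String[] args) {
        // Outer key: transaction directory; inner map: temporary file -> target file.
        LinkedHashMap<String, LinkedHashMap<String, String>> aggregateCommitInfo =
                new LinkedHashMap<>();
        // Hypothetical commit infos, listed in the order their checkpoints completed.
        String[][] commitInfos = {
            {"/tmp/seatunnel/txn_2", "/tmp/seatunnel/txn_2/part-0", "/data/sink/part-0"},
            {"/tmp/seatunnel/txn_1", "/tmp/seatunnel/txn_1/part-1", "/data/sink/part-1"},
        };
        for (String[] info : commitInfos) {
            aggregateCommitInfo
                    .computeIfAbsent(info[0], k -> new LinkedHashMap<>())
                    .put(info[1], info[2]);
        }
        // Iteration follows arrival order: txn_2 first, then txn_1,
        // so files are committed in the same order they were reported.
        for (Map.Entry<String, LinkedHashMap<String, String>> entry :
                aggregateCommitInfo.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}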

This file was deleted.

File: FileSinkState.java
@@ -21,16 +21,16 @@
import lombok.Data;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Data
@AllArgsConstructor
public class FileSinkState implements Serializable {
private final String transactionId;
private final String uuidPrefix;
private final Long checkpointId;
private final Map<String, String> needMoveFiles;
Member commented: The same as above.
private final Map<String, List<String>> partitionDirAndValuesMap;
private final LinkedHashMap<String, String> needMoveFiles;
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
private final String transactionDir;
}

File: AbstractWriteStrategy.java
@@ -50,6 +50,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -76,9 +77,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
protected String uuidPrefix;

protected String transactionDirectory;
protected Map<String, String> needMoveFiles;
protected Map<String, String> beingWrittenFile = new HashMap<>();
private Map<String, List<String>> partitionDirAndValuesMap;
protected LinkedHashMap<String, String> needMoveFiles;
protected LinkedHashMap<String, String> beingWrittenFile = new LinkedHashMap<>();
private LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
protected SeaTunnelRowType seaTunnelRowType;

// Checkpoint id from engine is start with 1
@@ -111,13 +112,18 @@ public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIn
@Override
public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
if (currentBatchSize >= batchSize) {
this.partId++;
newFilePart();
currentBatchSize = 0;
beingWrittenFile.clear();
}
currentBatchSize++;
}

public synchronized void newFilePart() {
this.partId++;
beingWrittenFile.clear();
log.debug("new file part: {}", partId);
}

protected SeaTunnelRowType buildSchemaWithRowType(
SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
@@ -177,9 +183,9 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
* @return the map of partition directory
*/
@Override
public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
public LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
List<Integer> partitionFieldsIndexInRow = fileSinkConfig.getPartitionFieldsIndexInRow();
Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new LinkedHashMap<>(1);
if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null);
return partitionDirAndValuesMap;
@@ -258,12 +264,15 @@ public String generateFileName(String transactionId) {
@Override
public Optional<FileCommitInfo> prepareCommit() {
this.finishAndCloseFile();
Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
Map<String, List<String>> copyMap =
LinkedHashMap<String, String> commitMap = new LinkedHashMap<>(this.needMoveFiles);
LinkedHashMap<String, List<String>> copyMap =
this.partitionDirAndValuesMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
Map.Entry::getKey,
e -> new ArrayList<>(e.getValue()),
(e1, e2) -> e1,
LinkedHashMap::new));
return Optional.of(new FileCommitInfo(commitMap, copyMap, transactionDirectory));
}

@@ -301,8 +310,8 @@ public void beginTransaction(Long checkpointId) {
this.checkpointId = checkpointId;
this.transactionId = getTransactionId(checkpointId);
this.transactionDirectory = getTransactionDir(this.transactionId);
this.needMoveFiles = new HashMap<>();
this.partitionDirAndValuesMap = new HashMap<>();
this.needMoveFiles = new LinkedHashMap<>();
this.partitionDirAndValuesMap = new LinkedHashMap<>();
}

private String getTransactionId(Long checkpointId) {
@@ -325,18 +334,21 @@ private String getTransactionId(Long checkpointId) {
*/
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
Map<String, List<String>> commitMap =
LinkedHashMap<String, List<String>> commitMap =
this.partitionDirAndValuesMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
Map.Entry::getKey,
e -> new ArrayList<>(e.getValue()),
(e1, e2) -> e1,
LinkedHashMap::new));
ArrayList<FileSinkState> fileState =
Lists.newArrayList(
new FileSinkState(
this.transactionId,
this.uuidPrefix,
this.checkpointId,
new HashMap<>(this.needMoveFiles),
new LinkedHashMap<>(this.needMoveFiles),
commitMap,
this.getTransactionDir(transactionId)));
this.beingWrittenFile.clear();
Expand All @@ -363,7 +375,7 @@ public static String getTransactionDirPrefix(String tmpPath, String jobId, Strin
}

public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) {
Map<String, List<String>> dataPartitionDirAndValuesMap =
LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
generatorPartitionDir(seaTunnelRow);
String beingWrittenFileKey = dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
// get filePath from beingWrittenFile
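
The prepareCommit and snapshotState hunks above also switch the stream collector from the two-argument Collectors.toMap, whose returned map type and iteration order are unspecified, to the four-argument overload that takes a map supplier, so the deep copy stays a LinkedHashMap in insertion order. A minimal sketch of the pattern follows; the partition keys below are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderedCopySketch {
    public static void main(String[] args) {
        LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new LinkedHashMap<>();
        partitionDirAndValuesMap.put("dt=2023-07-24", Arrays.asList("2023-07-24"));
        partitionDirAndValuesMap.put("dt=2023-07-23", Arrays.asList("2023-07-23"));

        // The four-argument toMap supplies a merge function and a map factory,
        // so the copy keeps insertion order and the value lists are deep-copied.
        LinkedHashMap<String, List<String>> copy =
                partitionDirAndValuesMap.entrySet().stream()
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        e -> new ArrayList<>(e.getValue()),
                                        (e1, e2) -> e1,
                                        LinkedHashMap::new));
        System.out.println(copy.keySet()); // [dt=2023-07-24, dt=2023-07-23]
    }
}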

File: ExcelWriteStrategy.java
@@ -28,15 +28,14 @@
import lombok.NonNull;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedHashMap;

public class ExcelWriteStrategy extends AbstractWriteStrategy {
private final Map<String, ExcelGenerator> beingWrittenWriter;
private final LinkedHashMap<String, ExcelGenerator> beingWrittenWriter;

public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
this.beingWrittenWriter = new LinkedHashMap<>();
}

@Override

File: JsonWriteStrategy.java
@@ -33,17 +33,18 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonWriteStrategy extends AbstractWriteStrategy {
private final byte[] rowDelimiter;
private SerializationSchema serializationSchema;
private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
private final LinkedHashMap<String, FSDataOutputStream> beingWrittenOutputStream;
private final Map<String, Boolean> isFirstWrite;

public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
this.beingWrittenOutputStream = new HashMap<>();
this.beingWrittenOutputStream = new LinkedHashMap<>();
this.isFirstWrite = new HashMap<>();
this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
}

File: OrcWriteStrategy.java
@@ -55,16 +55,16 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrcWriteStrategy extends AbstractWriteStrategy {
private final Map<String, Writer> beingWrittenWriter;
private final LinkedHashMap<String, Writer> beingWrittenWriter;

public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
this.beingWrittenWriter = new LinkedHashMap<>();
}

@Override