[bugfix](hive/iceberg) align with Hive insert overwrite table functionality (#39840) (#40724)

Backport of #39840.
wuwenchi authored Sep 12, 2024
1 parent 23b21fc commit 4b7b43b
Showing 9 changed files with 366 additions and 13 deletions.
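What the fix addresses: judging from the changes and the new tests below, an INSERT OVERWRITE whose SELECT returned no rows previously produced no partition updates, so the commit step was skipped and the old data survived in the target table. Hive semantics require an overwrite of an unpartitioned table to empty it even when nothing is written; a partitioned target is left untouched, since no partitions are selected for replacement. A toy sketch of the intended semantics (plain collections only, not the real HMS/Iceberg commit paths changed below):

import java.util.ArrayList;
import java.util.List;

// Toy model of Hive's INSERT OVERWRITE semantics for an unpartitioned table.
final class OverwriteSemanticsDemo {
    // Overwrite replaces ALL existing rows, even when the source yields none;
    // the old contents of `target` are always discarded.
    static <T> List<T> insertOverwrite(List<T> target, List<T> selected) {
        return new ArrayList<>(selected); // empty source => truncated table
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>(List.of("a", "b", "c"));
        table = insertOverwrite(table, List.of());
        System.out.println(table); // prints [] -- the target is emptied
    }
}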
=== HMSTransaction.java ===
@@ -35,6 +35,8 @@
import org.apache.doris.fs.remote.SwitchingFileSystem;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TS3MPUPendingUpload;
import org.apache.doris.thrift.TUpdateMode;
@@ -63,6 +65,7 @@
import software.amazon.awssdk.services.s3.model.CompletedPart;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -86,6 +89,8 @@ public class HMSTransaction implements Transaction {
private final FileSystem fs;
private Optional<SummaryProfile> summaryProfile = Optional.empty();
private String queryId;
private boolean isOverwrite = false;
TFileType fileType;

private final Map<SimpleTableInfo, Action<TableAndMore>> tableActions = new HashMap<>();
private final Map<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>>
@@ -96,6 +101,7 @@ public class HMSTransaction implements Transaction {
private HmsCommitter hmsCommitter;
private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
private String declaredIntentionsToWrite;
private boolean isMockedPartitionUpdate = false;

private static class UncompletedMpuPendingUpload {

@@ -173,9 +179,38 @@ public void rollback() {
public void beginInsertTable(HiveInsertCommandContext ctx) {
declaredIntentionsToWrite = ctx.getWritePath();
queryId = ctx.getQueryId();
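// new in this commit: remember the overwrite flag and file type;
// finishInsertTable() uses them to decide whether an empty overwrite
// update must be fabricated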
isOverwrite = ctx.isOverwrite();
fileType = ctx.getFileType();
}

public void finishInsertTable(SimpleTableInfo tableInfo) {
Table table = getTable(tableInfo);
if (hivePartitionUpdates.isEmpty() && isOverwrite && table.getPartitionKeysSize() == 0) {
// overwrite with an empty source: use an empty THivePartitionUpdate so the commit still runs and clears the (unpartitioned) target table
isMockedPartitionUpdate = true;
THivePartitionUpdate emptyUpdate = new THivePartitionUpdate() {{
setUpdateMode(TUpdateMode.OVERWRITE);
setFileSize(0);
setRowCount(0);
setFileNames(Collections.emptyList());
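// S3 writes need a placeholder MPU entry plus the table location as the
// write path; other filesystems just need the staging directory created
// so the commit has a valid path to work with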
if (fileType == TFileType.FILE_S3) {
setS3MpuPendingUploads(Lists.newArrayList(new TS3MPUPendingUpload()));
setLocation(new THiveLocationParams() {{
setWritePath(table.getSd().getLocation());
}});
} else {
fs.makeDir(declaredIntentionsToWrite);
setLocation(new THiveLocationParams() {{
setWritePath(declaredIntentionsToWrite);
}});
}
}};
hivePartitionUpdates = Lists.newArrayList(emptyUpdate);
}

List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
for (THivePartitionUpdate pu : mergedPUs) {
if (pu.getS3MpuPendingUploads() != null) {
@@ -185,7 +220,6 @@ public void finishInsertTable(SimpleTableInfo tableInfo) {
}
}
}
-Table table = getTable(tableInfo);
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : mergedPUs) {
TUpdateMode updateMode = pu.getUpdateMode();
@@ -1534,6 +1568,12 @@ public void wrapperAsyncRenameDirWithProfileSummary(Executor executor,

private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) {

List<TS3MPUPendingUpload> s3MpuPendingUploads = hivePartitionUpdate.getS3MpuPendingUploads();
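// the mocked empty update (see finishInsertTable) carries only a placeholder
// TS3MPUPendingUpload with no real multipart upload behind it, so there is
// nothing to complete on S3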
if (isMockedPartitionUpdate) {
return;
}

S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
S3Client s3Client;
try {
@@ -1542,7 +1582,7 @@ private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
throw new RuntimeException(e);
}

-for (TS3MPUPendingUpload s3MPUPendingUpload : hivePartitionUpdate.getS3MpuPendingUploads()) {
+for (TS3MPUPendingUpload s3MPUPendingUpload : s3MpuPendingUploads) {
asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
if (fileSystemTaskCancelled.get()) {
return;
=== IcebergTransaction.java ===
@@ -33,14 +33,19 @@
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.WriteResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -91,10 +96,15 @@ private void updateManifestAfterInsert(TUpdateMode updateMode) {
PartitionSpec spec = table.spec();
FileFormat fileFormat = IcebergUtils.getFileFormat(table);

-//convert commitDataList to writeResult
-WriteResult writeResult = IcebergWriterHelper
-        .convertToWriterResult(fileFormat, spec, commitDataList);
-List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+List<WriteResult> pendingResults;
+if (commitDataList.isEmpty()) {
+    pendingResults = Collections.emptyList();
+} else {
+    // convert commitDataList to writeResult
+    WriteResult writeResult = IcebergWriterHelper
+            .convertToWriterResult(fileFormat, spec, commitDataList);
+    pendingResults = Lists.newArrayList(writeResult);
+}

if (updateMode == TUpdateMode.APPEND) {
commitAppendTxn(table, pendingResults);
@@ -138,6 +148,22 @@ private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {


private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
if (pendingResults.isEmpty()) {
// e.g. insert overwrite table `dst_tb` select * from `empty_tb`
// 1. if dst_tb is a partitioned table, return directly: no partitions are selected for replacement.
// 2. if dst_tb is an unpartitioned table, `dst_tb` must be emptied to match Hive semantics.
if (!table.spec().isPartitioned()) {
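// deleting every currently-live data file and committing is, in effect,
// an Iceberg-native TRUNCATE of the unpartitioned table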
OverwriteFiles overwriteFiles = table.newOverwrite();
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
} catch (IOException e) {
throw new RuntimeException(e);
}
overwriteFiles.commit();
}
return;
}

// commit replace partitions
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
for (WriteResult result : pendingResults) {
=== HiveInsertCommandContext.java ===
@@ -17,12 +17,15 @@

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.thrift.TFileType;

/**
* For Hive Table
*/
public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext {
private String writePath;
private String queryId;
private TFileType fileType;

public String getWritePath() {
return writePath;
@@ -39,4 +42,12 @@ public String getQueryId() {
public void setQueryId(String queryId) {
this.queryId = queryId;
}

public TFileType getFileType() {
return fileType;
}

public void setFileType(TFileType fileType) {
this.fileType = fileType;
}
}
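For orientation, a minimal sketch of how a caller populates this context -- a hypothetical driver only, since in this commit the real producer is the table sink below and the consumer is HMSTransaction.beginInsertTable():

import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.thrift.TFileType;

final class ContextUsageSketch {
    static HiveInsertCommandContext newOverwriteContext(String stagingPath, String queryId) {
        HiveInsertCommandContext ctx = new HiveInsertCommandContext();
        ctx.setOverwrite(true);             // inherited setter: this is an INSERT OVERWRITE
        ctx.setWritePath(stagingPath);
        ctx.setQueryId(queryId);
        ctx.setFileType(TFileType.FILE_S3); // new field added by this commit
        return ctx;
    }
}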
=== Hive table sink (bindDataSink) ===
@@ -129,6 +129,7 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
context.setWritePath(storageLocation);
context.setFileType(fileType);
}
} else {
String writeTempPath = createTempPath(location);
@@ -139,6 +140,7 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
context.setWritePath(writeTempPath);
context.setFileType(fileType);
}
}
locationParams.setFileType(fileType);
=== IcebergTransaction unit tests ===
@@ -20,6 +20,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
import org.apache.doris.thrift.TFileContent;
import org.apache.doris.thrift.TIcebergCommitData;

@@ -199,7 +200,7 @@ public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo)
txn.finishInsert(tableInfo, Optional.empty());
txn.commit();

-checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+checkSnapshotAddProperties(table.currentSnapshot().summary(), "6", "2", "6");
checkPushDownByPartitionForTs(table, "ts1");
checkPushDownByPartitionForTs(table, "ts2");
checkPushDownByPartitionForTs(table, "ts3");
@@ -287,7 +288,7 @@ public void testUnPartitionedTable() throws UserException {
ctd1.setFileSize(2);

TIcebergCommitData ctd2 = new TIcebergCommitData();
ctd2.setFilePath("f1.parquet");
ctd2.setFilePath("f2.parquet");
ctd2.setFileContent(TFileContent.DATA);
ctd2.setRowCount(4);
ctd2.setFileSize(4);
@@ -310,22 +311,31 @@ public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo)
txn.finishInsert(tableInfo, Optional.empty());
txn.commit();

-checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+checkSnapshotAddProperties(table.currentSnapshot().summary(), "6", "2", "6");
}

private IcebergTransaction getTxn() {
return new IcebergTransaction(ops);
}

-private void checkSnapshotProperties(Map<String, String> props,
-        String addRecords,
-        String addFileCnt,
-        String addFileSize) {
+private void checkSnapshotAddProperties(Map<String, String> props,
+        String addRecords,
+        String addFileCnt,
+        String addFileSize) {
Assert.assertEquals(addRecords, props.get("added-records"));
Assert.assertEquals(addFileCnt, props.get("added-data-files"));
Assert.assertEquals(addFileSize, props.get("added-files-size"));
}

private void checkSnapshotTotalProperties(Map<String, String> props,
String totalRecords,
String totalFileCnt,
String totalFileSize) {
Assert.assertEquals(totalRecords, props.get("total-records"));
Assert.assertEquals(totalFileCnt, props.get("total-data-files"));
Assert.assertEquals(totalFileSize, props.get("total-files-size"));
}

private String numToYear(Integer num) {
Transform<Object, Integer> year = Transforms.year();
return year.toHumanString(Types.IntegerType.get(), num);
@@ -368,4 +378,75 @@ public void testTransform() {
Assert.assertEquals("2024-12-11", numToDay(dt));
}

@Test
public void testUnPartitionedTableOverwriteWithData() throws UserException {

testUnPartitionedTable();

ArrayList<TIcebergCommitData> ctdList = new ArrayList<>();
TIcebergCommitData ctd1 = new TIcebergCommitData();
ctd1.setFilePath("f3.parquet");
ctd1.setFileContent(TFileContent.DATA);
ctd1.setRowCount(6);
ctd1.setFileSize(6);

TIcebergCommitData ctd2 = new TIcebergCommitData();
ctd2.setFilePath("f4.parquet");
ctd2.setFileContent(TFileContent.DATA);
ctd2.setRowCount(8);
ctd2.setFileSize(8);

TIcebergCommitData ctd3 = new TIcebergCommitData();
ctd3.setFilePath("f5.parquet");
ctd3.setFileContent(TFileContent.DATA);
ctd3.setRowCount(10);
ctd3.setFileSize(10);

ctdList.add(ctd1);
ctdList.add(ctd2);
ctdList.add(ctd3);

Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
new MockUp<IcebergUtils>() {
@Mock
public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
return table;
}
};

IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
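// note: getRemoteTable is mocked above, so the table name given here is never actually resolved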
txn.beginInsert(tableInfo);
IcebergInsertCommandContext ctx = new IcebergInsertCommandContext();
ctx.setOverwrite(true);
txn.finishInsert(tableInfo, Optional.of(ctx));
txn.commit();

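// the replace commit discards f1/f2 from testUnPartitionedTable; the totals now
// reflect only f3 + f4 + f5: 6 + 8 + 10 = 24 rows in 3 files totaling 24 bytes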
checkSnapshotTotalProperties(table.currentSnapshot().summary(), "24", "3", "24");
}

@Test
public void testUnpartitionedTableOverwriteWithoutData() throws UserException {

testUnPartitionedTableOverwriteWithData();

Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
new MockUp<IcebergUtils>() {
@Mock
public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
return table;
}
};

IcebergTransaction txn = getTxn();
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
txn.beginInsert(tableInfo);
IcebergInsertCommandContext ctx = new IcebergInsertCommandContext();
ctx.setOverwrite(true);
txn.finishInsert(tableInfo, Optional.of(ctx));
txn.commit();

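// the data-less overwrite committed a snapshot that deleted every live data
// file, so all totals drop to zero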
checkSnapshotTotalProperties(table.currentSnapshot().summary(), "0", "0", "0");
}
}
=== regression test expected output (.out, new file) ===
@@ -0,0 +1,18 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q0 --
1 1
1 2
1 3

-- !q1 --
1 1
1 2
1 3

-- !q2 --
1 1
1 2
1 3

-- !q3 --

=== regression test expected output (.out, new file) ===
@@ -0,0 +1,18 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q0 --
1 1
1 2
1 3

-- !q1 --
1 1
1 2
1 3

-- !q2 --
1 1
1 2
1 3

-- !q3 --

(2 more changed files not shown)
