diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index df16b8f1be205c..cb01ff90339c12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; @@ -294,7 +295,7 @@ public void recordRunningTableOrException(DatabaseIf db, TableIf table) { // If executed in parallel, it may cause problems such as not being able to find temporary partitions. // But in terms of external table, we don't care the internal logic of execution, // so there's no need to keep records - if (!(table instanceof OlapTable)) { + if (!(table instanceof MTMV)) { return; } long dbId = db.getId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index 49e3e091d577fd..35bcaa0001ebef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -103,9 +103,16 @@ public static void replacePartition(TableIf olapTable, List partitionNam * @return */ public static List generateTempPartitionNames(List partitionNames) { + long threadId = Thread.currentThread().getId(); + // Adding thread ID as a prefix is to avoid mutual interference + // when different threads perform insert overwrite on the same partition simultaneously. + // Even if the insert overwrite execution fails/cancels, + // the generated temporary partition will be deleted, + // so there will be no problem generating temporary partitions with the same name in a single thread + String prefix = "iot_temp_" + threadId + "_"; List tempPartitionNames = new ArrayList(partitionNames.size()); for (String partitionName : partitionNames) { - String tempPartitionName = "iot_temp_" + partitionName; + String tempPartitionName = prefix + partitionName; if (tempPartitionName.length() > 50) { tempPartitionName = tempPartitionName.substring(0, 30) + Math.abs(Objects.hash(tempPartitionName)) + "_" + System.currentTimeMillis(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java index 026f821352246e..607d79b38caeac 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java @@ -18,6 +18,7 @@ package org.apache.doris.insertoverwrite; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; @@ -40,6 +41,9 @@ public class InsertOverwriteManagerTest { @Mocked private HMSExternalTable hmsExternalTable; + @Mocked + private MTMV mtmv; + @Before public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException { @@ -69,18 +73,26 @@ public void setUp() hmsExternalTable.getName(); minTimes = 0; result = "hmsTable"; + + mtmv.getId(); + minTimes = 0; + result = 4L; + + mtmv.getName(); + minTimes = 0; + result = "mtmv1"; } }; } @Test - public void testParallel() { + public void testMTMVParallel() { InsertOverwriteManager manager = new InsertOverwriteManager(); - manager.recordRunningTableOrException(db, table); + manager.recordRunningTableOrException(db, mtmv); Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class, - () -> manager.recordRunningTableOrException(db, table)); - manager.dropRunningRecord(db.getId(), table.getId()); - Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, table)); + () -> manager.recordRunningTableOrException(db, mtmv)); + manager.dropRunningRecord(db.getId(), mtmv.getId()); + Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, mtmv)); } @Test @@ -90,4 +102,12 @@ public void testHmsTableParallel() { Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, hmsExternalTable)); manager.dropRunningRecord(db.getId(), hmsExternalTable.getId()); } + + @Test + public void testOlapTableParallel() { + InsertOverwriteManager manager = new InsertOverwriteManager(); + manager.recordRunningTableOrException(db, table); + Assertions.assertDoesNotThrow(() -> manager.recordRunningTableOrException(db, table)); + manager.dropRunningRecord(db.getId(), table.getId()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java new file mode 100644 index 00000000000000..947e876c53c8b5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.insertoverwrite; + +import com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.List; + +public class InsertOverwriteUtilTest { + + @Test + public void testGenerateTempPartitionNames() { + String regex = "^iot_temp_[0-9]+_p1$"; + List res = InsertOverwriteUtil.generateTempPartitionNames(Lists.newArrayList("p1")); + String tempP1Name = res.get(0); + Assertions.assertTrue(tempP1Name.matches(regex)); + } +} diff --git a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy index 0ce026fb99b184..e796edfe5bb1d2 100644 --- a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy +++ b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy @@ -26,7 +26,6 @@ suite("test_iot_auto_detect_concurrent") { sql new File("""${context.file.parent}/ddl/test_iot_auto_detect_concurrent.sql""").text def success_status = true - def err_msg = "" def load_data = { range, offset, expect_success -> try { sql " use test_iot_auto_detect_concurrent; " @@ -38,7 +37,6 @@ suite("test_iot_auto_detect_concurrent") { success_status = false log.info("fails one") } - err_msg = e.getMessage() log.info("successfully catch the failed insert") return } @@ -100,14 +98,10 @@ suite("test_iot_auto_detect_concurrent") { thread6.join() thread7.join() // suppose result: Success to overwrite with a multiple of ten values - if (!success_status) { - // Not allowed running Insert Overwrite on same table - assertTrue(err_msg.contains('same table')) - } else { - // The execution was fast, resulting in no concurrent execution - qt_sql3 " select count(k0) from test_concurrent_write; " - qt_sql4 " select count(distinct k0) from test_concurrent_write; " - } + assertTrue(success_status) + qt_sql3 " select count(k0) from test_concurrent_write; " + qt_sql4 " select count(distinct k0) from test_concurrent_write; " + /// with drop partition concurrently success_status = true