diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index 32d52d7e7af100..e0c46dde920b6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -56,8 +56,10 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
     private Map> taskGroups = Maps.newConcurrentMap();
     // for one task group, there may be different requests about changing a partition to new.
     // but we only change one time and save the relations in partitionPairs. they're protected by taskLocks
+    @SerializedName(value = "taskLocks")
     private Map taskLocks = Maps.newConcurrentMap();
     // >
+    @SerializedName(value = "partitionPairs")
     private Map> partitionPairs = Maps.newConcurrentMap();

     public InsertOverwriteManager() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 992c512e4026b0..f3858e18d37447 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3597,7 +3597,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t

     @Override
     public TReplacePartitionResult replacePartition(TReplacePartitionRequest request) throws TException {
-        LOG.info("Receive create partition request: {}", request);
+        LOG.info("Receive replace partition request: {}", request);
         long dbId = request.getDbId();
         long tableId = request.getTableId();
         List partitionIds = request.getPartitionIds();
diff --git a/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out
index 3cde86880473dc..51e06f3f7bfe85 100644
--- a/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out
+++ b/regression-test/data/insert_overwrite_p0/insert_overwrite_auto_detect.out
@@ -75,6 +75,19 @@ LIST
 SHANGHAI
 XXX

+-- !sql --
+7654321
+BEIJING
+LIST
+SHANGHAI
+XXX
+
+-- !sql --
+Beijing
+
+-- !sql --
+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+
 -- !sql --
 2008-01-01
 2008-02-02
diff --git a/regression-test/data/insert_overwrite_p1/test_iot_auto_detect_concurrent.out b/regression-test/data/insert_overwrite_p1/test_iot_auto_detect_concurrent.out
new file mode 100644
index 00000000000000..3c8a25336daf5d
--- /dev/null
+++ b/regression-test/data/insert_overwrite_p1/test_iot_auto_detect_concurrent.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql3 --
+100
+
+-- !sql4 --
+100
+
+-- !sql5 --
+0
+
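The two Java hunks at the top of this patch are small: the replacePartition log line now says "replace partition" instead of the copy-pasted "create partition", and the taskLocks / partitionPairs maps gain Gson @SerializedName annotations so they are emitted under fixed key names when the manager is serialized. A minimal, self-contained sketch of what the annotation does with plain Gson follows; the class and data below are illustrative only, not Doris code.

```java
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SerializedNameDemo {
    static class OverwriteState {
        // The annotation pins the JSON key to "partitionPairs" even if the Java
        // field is later renamed, which keeps serialized output stable.
        @SerializedName(value = "partitionPairs")
        private Map<Long, Map<Long, Long>> partitionPairs = new ConcurrentHashMap<>();
    }

    public static void main(String[] args) {
        OverwriteState state = new OverwriteState();
        state.partitionPairs.put(1L, Map.of(10L, 20L));
        // Prints: {"partitionPairs":{"1":{"10":20}}}
        System.out.println(new Gson().toJson(state));
    }
}
```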
suite("test_iot_auto_detect") { + // only nereids now sql """set enable_nereids_planner = true""" sql """set enable_fallback_to_original_planner = false""" sql """set enable_nereids_dml = true""" + // range sql " drop table if exists range1; " sql """ create table range1( @@ -46,6 +48,7 @@ suite("test_iot_auto_detect") { sql " insert overwrite table range1 partition(*) values (-100), (-100), (333), (444), (555); " qt_sql " select * from range1 order by k0; " + // list sql " drop table if exists list1; " sql """ create table list1( @@ -72,6 +75,50 @@ suite("test_iot_auto_detect") { qt_sql " select * from list1 order by k0; " sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("SHANGHAI"), ("XXX"), ("LIST"), ("7654321"); """ qt_sql " select * from list1 order by k0; " + + // with label - transactions + sql """ insert overwrite table list1 partition(*) with label `txn1` values ("BEIJING"), ("7654321"); """ + sql """ insert overwrite table list1 partition(*) with label `txn2` values ("SHANGHAI"), ("LIST"); """ + sql """ insert overwrite table list1 partition(*) with label `txn3` values ("XXX"); """ + + def max_try_milli_secs = 10000 + while(max_try_milli_secs) { + def result = sql " show load where label like 'txn_' " + if(result[0][2] == "FINISHED" && result[1][2] == "FINISHED" && result[2][2] == "FINISHED" ) { + break + } else { + sleep(1000) // wait 1 second every time + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + log.info("result: ${result[0][2]}, ${result[1][2]}, ${result[2][2]}") + fail() + } + } + } + + qt_sql " select * from list1 order by k0; " + + // long partition value + sql " drop table if exists list_long; " + sql """ + create table list_long( + k0 varchar null + ) + partition by list (k0) + ( + PARTITION p1 values in (("Beijing"), ("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")), + PARTITION p2 values in (("nonono")) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql """ insert into list_long values ("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); """ + sql """ insert overwrite table list_long partition(*) values ("Beijing"); """ + qt_sql " select * from list_long order by k0; " + sql """ insert overwrite table list_long partition(*) values ("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); """ + qt_sql " select * from list_long order by k0; " + + // miss partitions try { sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("invalid"); """ } catch (Exception e) { diff --git a/regression-test/suites/insert_overwrite_p1/ddl/test_iot_auto_detect_concurrent.sql b/regression-test/suites/insert_overwrite_p1/ddl/test_iot_auto_detect_concurrent.sql new file mode 100644 index 00000000000000..02c3b9037b401e --- /dev/null +++ 
diff --git a/regression-test/suites/insert_overwrite_p1/ddl/test_iot_auto_detect_concurrent.sql b/regression-test/suites/insert_overwrite_p1/ddl/test_iot_auto_detect_concurrent.sql
new file mode 100644
index 00000000000000..02c3b9037b401e
--- /dev/null
+++ b/regression-test/suites/insert_overwrite_p1/ddl/test_iot_auto_detect_concurrent.sql
@@ -0,0 +1,108 @@
+create table test_concurrent_write(
+    k0 int null
+)
+partition by range (k0)
+(
+    PARTITION p10 values less than (10),
+    PARTITION p20 values less than (20),
+    PARTITION p30 values less than (30),
+    PARTITION p40 values less than (40),
+    PARTITION p50 values less than (50),
+    PARTITION p60 values less than (60),
+    PARTITION p70 values less than (70),
+    PARTITION p80 values less than (80),
+    PARTITION p90 values less than (90),
+    PARTITION p100 values less than (100),
+    PARTITION p110 values less than (110),
+    PARTITION p120 values less than (120),
+    PARTITION p130 values less than (130),
+    PARTITION p140 values less than (140),
+    PARTITION p150 values less than (150),
+    PARTITION p160 values less than (160),
+    PARTITION p170 values less than (170),
+    PARTITION p180 values less than (180),
+    PARTITION p190 values less than (190),
+    PARTITION p200 values less than (200),
+    PARTITION p210 values less than (210),
+    PARTITION p220 values less than (220),
+    PARTITION p230 values less than (230),
+    PARTITION p240 values less than (240),
+    PARTITION p250 values less than (250),
+    PARTITION p260 values less than (260),
+    PARTITION p270 values less than (270),
+    PARTITION p280 values less than (280),
+    PARTITION p290 values less than (290),
+    PARTITION p300 values less than (300),
+    PARTITION p310 values less than (310),
+    PARTITION p320 values less than (320),
+    PARTITION p330 values less than (330),
+    PARTITION p340 values less than (340),
+    PARTITION p350 values less than (350),
+    PARTITION p360 values less than (360),
+    PARTITION p370 values less than (370),
+    PARTITION p380 values less than (380),
+    PARTITION p390 values less than (390),
+    PARTITION p400 values less than (400),
+    PARTITION p410 values less than (410),
+    PARTITION p420 values less than (420),
+    PARTITION p430 values less than (430),
+    PARTITION p440 values less than (440),
+    PARTITION p450 values less than (450),
+    PARTITION p460 values less than (460),
+    PARTITION p470 values less than (470),
+    PARTITION p480 values less than (480),
+    PARTITION p490 values less than (490),
+    PARTITION p500 values less than (500),
+    PARTITION p510 values less than (510),
+    PARTITION p520 values less than (520),
+    PARTITION p530 values less than (530),
+    PARTITION p540 values less than (540),
+    PARTITION p550 values less than (550),
+    PARTITION p560 values less than (560),
+    PARTITION p570 values less than (570),
+    PARTITION p580 values less than (580),
+    PARTITION p590 values less than (590),
+    PARTITION p600 values less than (600),
+    PARTITION p610 values less than (610),
+    PARTITION p620 values less than (620),
+    PARTITION p630 values less than (630),
+    PARTITION p640 values less than (640),
+    PARTITION p650 values less than (650),
+    PARTITION p660 values less than (660),
+    PARTITION p670 values less than (670),
+    PARTITION p680 values less than (680),
+    PARTITION p690 values less than (690),
+    PARTITION p700 values less than (700),
+    PARTITION p710 values less than (710),
+    PARTITION p720 values less than (720),
+    PARTITION p730 values less than (730),
+    PARTITION p740 values less than (740),
+    PARTITION p750 values less than (750),
+    PARTITION p760 values less than (760),
+    PARTITION p770 values less than (770),
+    PARTITION p780 values less than (780),
+    PARTITION p790 values less than (790),
+    PARTITION p800 values less than (800),
+    PARTITION p810 values less than (810),
+    PARTITION p820 values less than (820),
+    PARTITION p830 values less than (830),
+    PARTITION p840 values less than (840),
+    PARTITION p850 values less than (850),
+    PARTITION p860 values less than (860),
+    PARTITION p870 values less than (870),
+    PARTITION p880 values less than (880),
+    PARTITION p890 values less than (890),
+    PARTITION p900 values less than (900),
+    PARTITION p910 values less than (910),
+    PARTITION p920 values less than (920),
+    PARTITION p930 values less than (930),
+    PARTITION p940 values less than (940),
+    PARTITION p950 values less than (950),
+    PARTITION p960 values less than (960),
+    PARTITION p970 values less than (970),
+    PARTITION p980 values less than (980),
+    PARTITION p990 values less than (990),
+    PARTITION p1000 values less than (1000)
+)
+DISTRIBUTED BY HASH(`k0`) BUCKETS 1
+properties("replication_num" = "1");
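The DDL above spells out one hundred step-10 range partitions by hand. A tiny generator like the following (illustrative only, not part of the patch) prints the same statement and makes the repetitive pattern explicit:

```java
public class PartitionDdlGenerator {
    public static void main(String[] args) {
        StringBuilder ddl = new StringBuilder();
        ddl.append("create table test_concurrent_write(\n")
           .append("    k0 int null\n")
           .append(")\n")
           .append("partition by range (k0)\n")
           .append("(\n");
        // One partition per 10-unit bucket: p10 holds [min, 10), p20 holds [10, 20), ...
        for (int upper = 10; upper <= 1000; upper += 10) {
            ddl.append("    PARTITION p").append(upper)
               .append(" values less than (").append(upper).append(")")
               .append(upper < 1000 ? ",\n" : "\n");
        }
        ddl.append(")\n")
           .append("DISTRIBUTED BY HASH(`k0`) BUCKETS 1\n")
           .append("properties(\"replication_num\" = \"1\");");
        System.out.println(ddl);
    }
}
```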
diff --git a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
new file mode 100644
index 00000000000000..200dd874df9540
--- /dev/null
+++ b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iot_auto_detect_concurrent") {
+    // only nereids now
+    sql """set enable_nereids_planner = true"""
+    sql """set enable_fallback_to_original_planner = false"""
+    sql """set enable_nereids_dml = true"""
+
+    def db_name = "test_iot_auto_detect_concurrent"
+    def table_name = "test_concurrent_write"
+
+    sql " create database if not exists test_iot_auto_detect_concurrent; "
+    sql " use test_iot_auto_detect_concurrent; "
+    sql " drop table if exists test_concurrent_write; "
+    sql new File("""${context.file.parent}/ddl/test_iot_auto_detect_concurrent.sql""").text
+
+    def success_status = true
+    def load_data = { range, offset, expect_success ->
+        try {
+            sql " use test_iot_auto_detect_concurrent; "
+            sql """set enable_nereids_planner = true"""
+            sql """set enable_fallback_to_original_planner = false"""
+            sql """set enable_nereids_dml = true"""
+            sql """ insert overwrite table test_concurrent_write partition(*)
+                select number*10+${offset} from numbers("number" = "${range}");
+            """
+        } catch (Exception e) {
+            if (expect_success) {
+                success_status = false
+                log.info("an insert that was expected to succeed failed")
+            }
+            log.info("successfully caught the failed insert")
+            return
+        }
+        if (!expect_success) {
+            success_status = false
+        }
+    }
+
+    def dropping = true
+    def drop_partition = {
+        sql " use test_iot_auto_detect_concurrent; "
+        while (dropping) {
+            try {
+                sql """ alter table test_concurrent_write
+                    drop partition p10, drop partition p20, drop partition p30, drop partition p40, drop partition p50,
+                    drop partition p60, drop partition p70, drop partition p80, drop partition p90, drop partition p100;
+                """
+            } catch (Exception e) {}
+        }
+    }
+
+    def result
+
+
+    /// same data and partitions
+    success_status = true
+    sql """ insert into test_concurrent_write select * from numbers("number" = "1000"); """
+    def thread1 = Thread.start { load_data(100, 0, false) }
+    def thread2 = Thread.start { load_data(100, 0, false) }
+    def thread3 = Thread.start { load_data(100, 0, false) }
+    def thread4 = Thread.start { load_data(100, 0, false) }
+    def thread5 = Thread.start { load_data(100, 0, false) }
+    thread1.join()
+    thread2.join()
+    thread3.join()
+    thread4.join()
+    thread5.join()
+    // expected result: at most one of the concurrent overwrites succeeds
+    if (success_status) { // none succeeded, seed data is untouched
+        result = sql " select count(k0) from test_concurrent_write; "
+        assertEquals(result[0][0], 1000)
+        result = sql " select count(distinct k0) from test_concurrent_write; "
+        assertEquals(result[0][0], 1000)
+    } else { // exactly one succeeded
+        result = sql " select count(k0) from test_concurrent_write; "
+        assertEquals(result[0][0], 100)
+        result = sql " select count(distinct k0) from test_concurrent_write; "
+        assertEquals(result[0][0], 100)
+    }
+
+
+    /// not same data/partitions
+    success_status = true
+    sql """ insert overwrite table test_concurrent_write select * from numbers("number" = "1000"); """
+    def thread6 = Thread.start { load_data(50, 0, true) } // 0, 10 ... 490
+    def thread7 = Thread.start { load_data(50, 500, true) } // 500, 510 ... 990
+    thread6.join()
+    thread7.join()
+    // expected result: both overwrites succeed, leaving only the multiples of ten
+    assertTrue(success_status)
+    qt_sql3 " select count(k0) from test_concurrent_write; "
+    qt_sql4 " select count(distinct k0) from test_concurrent_write; "
+
+
+    /// with drop partition concurrently
+    success_status = true
+    sql """ truncate table test_concurrent_write; """
+    def thread10 = Thread.start { drop_partition() }
+    def thread8 = Thread.start { load_data(100, 0, false) }
+    def thread9 = Thread.start { load_data(100, 0, false) }
+    thread8.join()
+    thread9.join()
+    dropping = false // stop dropping
+    thread10.join()
+    // no insert should have succeeded while partitions were being dropped
+    assertTrue(success_status) // the key assertion of this scenario
+    qt_sql5 " select count(k0) from test_concurrent_write; "
+}
\ No newline at end of file
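The suite above expresses the concurrency contract it checks: identical concurrent `insert overwrite ... partition(*)` jobs succeed at most once, jobs touching disjoint partitions can all succeed, and jobs racing with `drop partition` are expected to fail. A standalone sketch of the first race, driving the same SQL over JDBC from Java, is shown below; the URL, credentials, and thread pool setup are assumptions for illustration, while the statement shape matches load_data in the suite.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentOverwriteDemo {
    // Placeholder connection details for illustration.
    private static final String URL =
            "jdbc:mysql://127.0.0.1:9030/test_iot_auto_detect_concurrent";

    public static void main(String[] args) throws Exception {
        AtomicInteger successes = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try (Connection conn = DriverManager.getConnection(URL, "root", "");
                     Statement stmt = conn.createStatement()) {
                    // Same shape as load_data: overwrite whichever partitions the rows hit.
                    stmt.execute("insert overwrite table test_concurrent_write partition(*) "
                            + "select number * 10 from numbers(\"number\" = \"100\")");
                    successes.incrementAndGet();
                } catch (Exception e) {
                    // A conflicting overwrite is expected to fail; swallow it like load_data does.
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
        // Per the suite's first scenario, this should print 0 or 1.
        System.out.println("successful overwrites: " + successes.get());
    }
}
```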