Skip to content

Commit

Permalink
Merge branch 'master' into fixSchemaFlush
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Jun 28, 2021
2 parents 7c7239a + c2a7514 commit e7a66cd
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 13 deletions.
15 changes: 4 additions & 11 deletions syncer/sharding-meta/shardmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (meta *ShardingMeta) checkItemExists(item *DDLItem) (int, bool) {
// AddItem adds a new coming DDLItem into ShardingMeta
// 1. if DDLItem already exists in source sequence, check whether it is active DDL only
// 2. add the DDLItem into its related source sequence
// 3. if it is a new DDL in global sequence, add it into global sequence
// 3. if it is a new DDL in global sequence, which means len(source.Items) > len(global.Items), add it into global sequence
// 4. check the source sequence is the prefix-sequence of global sequence, if not, return an error
// returns:
// active: whether the DDL will be processed in this round
Expand All @@ -188,18 +188,11 @@ func (meta *ShardingMeta) AddItem(item *DDLItem) (active bool, err error) {
source.Items = append(source.Items, item)
}

found := false
for _, globalItem := range meta.global.Items {
if utils.CompareShardingDDLs(item.DDLs, globalItem.DDLs) {
found = true
break
}
}
if !found {
meta.global.Items = append(meta.global.Items, item)
global, source := meta.global, meta.sources[item.Source]
if len(source.Items) > len(global.Items) {
global.Items = append(global.Items, item)
}

global, source := meta.global, meta.sources[item.Source]
if !source.IsPrefixSequence(global) {
return false, terror.ErrSyncUnitDDLWrongSequence.Generate(source.Items, global.Items)
}
Expand Down
92 changes: 90 additions & 2 deletions tests/shardddl1_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ function DM_035_CASE() {
run_sql_source2 "alter table ${shardddl1}.${tb1} add new_col2 int;"
run_sql_source2 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(4,4,4);"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(5,5,5);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5,5);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(6);"
run_sql_source2 "alter table ${shardddl1}.${tb2} add new_col2 int;"
run_sql_source2 "alter table ${shardddl1}.${tb2} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(7,7,7);"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(8,8,8);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(8,8,8);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(9,9,9);"
run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb};" "count(1): 9"
}
Expand All @@ -405,9 +405,97 @@ function DM_035() {
run_case 035 "double-source-optimistic" "init_table 111 211 212" "clean_table" "optimistic"
}

function DM_SAME_DDL_TWICE_CASE() {
# source1.tb1 add column
run_sql_source1 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(2);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(3);"

# source2.tb1 add column
run_sql_source2 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(4,4);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(6);"
# source2.tb1 drop column
run_sql_source2 "alter table ${shardddl1}.${tb1} drop new_col1;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(7,7);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(8);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(9);"
# source2.tb1 add column back
run_sql_source2 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(10,10);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(11,11);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(12);"
# source2.tb1 drop column again
run_sql_source2 "alter table ${shardddl1}.${tb1} drop new_col1;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(13,13);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(14);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(15);"
# source2.tb1 add column back again
run_sql_source2 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(16,16);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(17,17);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(18);"

# source2.tb2 add column
run_sql_source2 "alter table ${shardddl1}.${tb2} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(19,19);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(20,20);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(21,21);"
# source2.tb2 drop column
run_sql_source2 "alter table ${shardddl1}.${tb2} drop new_col1;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(22,22);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(23,23);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(24);"
# source2.tb2 add column back
run_sql_source2 "alter table ${shardddl1}.${tb2} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(25,25);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(26,26);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(27,27);"
# source2.tb2 drop column again
run_sql_source2 "alter table ${shardddl1}.${tb2} drop new_col1;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(28,28);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(29,29);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(30);"
# source2.tb2 add column back again
run_sql_source2 "alter table ${shardddl1}.${tb2} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(31,31);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(32,32);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(33,33);"

# source1.tb1 drop column
run_sql_source1 "alter table ${shardddl1}.${tb1} drop new_col1;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(34);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(35,35);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(36,36);"
# source1.tb1 add column back
run_sql_source1 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(37,37);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(38,38);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(39,39);"
# source1.tb1 drop column again
run_sql_source1 "alter table ${shardddl1}.${tb1} drop new_col1;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(40);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(41,41);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(42,42);"
# source1.tb1 add column back again
run_sql_source1 "alter table ${shardddl1}.${tb1} add new_col1 int;"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(43,43);"
run_sql_source2 "insert into ${shardddl1}.${tb1} values(44,44);"
run_sql_source2 "insert into ${shardddl1}.${tb2} values(45,45);"

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_SAME_DDL_TWICE() {
run_case SAME_DDL_TWICE "double-source-pessimistic" "init_table 111 211 212" "clean_table" "pessimistic"
}

function run() {
init_cluster
init_database
DM_SAME_DDL_TWICE
start=6
end=35
except=(024 025 029)
Expand Down

0 comments on commit e7a66cd

Please sign in to comment.