Optimistic: support start task with inconsistent upstream table schema #3903
/hold waiting for @lichunzhu's review
Co-authored-by: lance6716 <lance6716@gmail.com>
I will review the tests later. Two questions for now:
- Should we delete the obsolete init-schema for DM clusters upgraded from lower versions?
- Should we flush table info for newly added tables?
		// TODO: handle drop table
		continue
	}
	if !o.tk.SourceTableExist(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable) {
Is this too arbitrary? If some new tables are added but the scheduler did not get them from optimism.GetAllSourceTables, will that cause a problem here?
The logic is the same as the original logic:
tiflow/dm/pkg/shardddl/optimism/keeper.go
Lines 62 to 69 in f917d36
	// filter info which doesn't have SourceTable
	// SourceTable will be changed after user update block-allow-list
	// But old infos still remain in etcd.
	// TODO: add a mechanism to remove all outdated infos in etcd.
	if !lock.TableExist(info.Source, info.UpSchema, info.UpTable) {
		delete(ifm[task][source][schema], table)
		continue
	}
There may be a problem when a table is created while the leader is being transferred. I will open a separate PR for create table with more tests.
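The filtering step quoted above can be illustrated with a minimal, self-contained sketch. It is not the code in keeper.go: `Info` is a trimmed-down stand-in, and `tableKey`, `filterOutdated`, and the `known` set are hypothetical names introduced only for this illustration. The sketch drops every info whose upstream table is not in the known set, which is also why a table created before the set is refreshed would be dropped, as raised in the question above.

```go
package main

import "fmt"

// Info is a hypothetical, trimmed-down stand-in for a shard DDL info kept in etcd.
type Info struct {
	Task, Source, UpSchema, UpTable string
}

// tableKey (hypothetical) identifies one upstream table of one source.
type tableKey struct {
	Source, Schema, Table string
}

// filterOutdated deletes every info whose upstream table is not in known
// and returns the number of deleted infos.
// ifm is indexed as task -> source -> schema -> table.
func filterOutdated(ifm map[string]map[string]map[string]map[string]Info, known map[tableKey]bool) int {
	removed := 0
	for _, sources := range ifm {
		for _, schemas := range sources {
			for _, tables := range schemas {
				for table, info := range tables {
					key := tableKey{Source: info.Source, Schema: info.UpSchema, Table: info.UpTable}
					if !known[key] {
						// Deleting the current key while ranging over a map is safe in Go.
						delete(tables, table)
						removed++
					}
				}
			}
		}
	}
	return removed
}

func main() {
	ifm := map[string]map[string]map[string]map[string]Info{
		"test": {"mysql-replica-01": {"shardddl1": {
			"tb1": {Task: "test", Source: "mysql-replica-01", UpSchema: "shardddl1", UpTable: "tb1"},
			"tb2": {Task: "test", Source: "mysql-replica-01", UpSchema: "shardddl1", UpTable: "tb2"},
		}}},
	}
	// Only tb1 is still a known source table; tb2 stands for an outdated info.
	known := map[tableKey]bool{
		{Source: "mysql-replica-01", Schema: "shardddl1", Table: "tb1"}: true,
	}
	removed := filterOutdated(ifm, known)
	fmt.Println(removed, len(ifm["test"]["mysql-replica-01"]["shardddl1"]))
}
```

In this sketch the filter cannot tell an outdated info from an info for a table that was created after the known set was built, so both are removed alike.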
dm/syncer/checkpoint.go
Outdated
	sql2, arg := cp.genUpdateSQL(sourceSchema, sourceTable, location, nil, tiBytes, false)
	sqls = append(sqls, sql2)
	args = append(args, arg)
	batch := 100
Make 100 a constant?
I left a TODO at Line 81 in c9a821c.
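As an illustration of the suggestion, here is a minimal sketch of batching statements with a named constant instead of the literal 100. The names `flushBatchSize` and `splitBatches` are hypothetical and do not come from checkpoint.go; the real code also carries a parallel `args` slice, which would have to be sliced with the same bounds.

```go
package main

import "fmt"

// flushBatchSize is a hypothetical name for the literal 100 discussed above.
const flushBatchSize = 100

// splitBatches cuts sqls into consecutive chunks of at most size statements.
// It returns nil when size is not positive.
func splitBatches(sqls []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(sqls); start += size {
		end := start + size
		if end > len(sqls) {
			end = len(sqls)
		}
		batches = append(batches, sqls[start:end])
	}
	return batches
}

func main() {
	// 250 placeholder statements stand in for the generated update SQLs.
	sqls := make([]string, 250)
	for i := range sqls {
		sqls[i] = fmt.Sprintf("stmt-%d", i)
	}
	for i, b := range splitBatches(sqls, flushBatchSize) {
		fmt.Printf("batch %d: %d statements\n", i, len(b))
	}
}
```

With 250 statements this yields two batches of 100 and a final batch of 50. Naming the constant keeps the batch size in one place if it ever needs tuning.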
Yes. We still have some problems with newly created or dropped tables (e.g. #3823). I will open a new PR to solve them.
Rest LGTM
dm/tests/shardddl_optimistic/run.sh
Outdated
function random_restart() {
	mod=$(($RANDOM % 4))
	if [[ "$mod" == "0" ]]; then
		restart_master
	elif [[ "$mod" == "1" ]]; then
		restart_worker1
	elif [[ "$mod" == "2" ]]; then
		restart_worker2
	else
		restart_task $cur/conf/double-source-optimistic.yaml
	fi
}
Actually, I think it's better to test all four cases for integration tests. It will be easier for us to trace which PR introduces the bug.
I want to test different combinations of them (e.g. restart the master and then restart a worker), but there are too many combinations, so I use random selection. I added some logs for tracing in 41cf696.
I see
dm/tests/shardddl_optimistic/run.sh
Outdated
	run_sql_source1 "alter table ${shardddl1}.${tb2} add column b varchar(10);"
	run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);"
	run_sql_source2 "alter table ${shardddl1}.${tb2} add column b varchar(10);"

	run_sql_source1 "insert into ${shardddl1}.${tb1} values(5,'aaa');"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(6,'bbb');"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(7,'ccc');"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(8,'ddd');"

	run_sql_source1 "alter table ${shardddl1}.${tb1} add column c text;"
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(9,'eee','eee');"
	run_sql_source1 "alter table ${shardddl1}.${tb2} drop column b;"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(10);"
	run_sql_source2 "alter table ${shardddl1}.${tb1} add column c text;"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(11,'fff','fff');"
	run_sql_source2 "alter table ${shardddl1}.${tb2} drop column b;"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(12);"

	run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb}" "count(1): 12"

	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"stop-task test -s mysql-replica-02" \
		"\"result\": true" 2

	run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(13,'ggg');"
	run_sql_source1 "alter table ${shardddl1}.${tb2} add column c text;"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(14,'hhh');"

	run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb}" "count(1): 14"
	run_sql_tidb_with_retry "select count(1) from INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA='${shardddl}' AND TABLE_NAME='${tb}';" \
		"count(1): 2"

	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"start-task $cur/conf/double-source-optimistic.yaml -s mysql-replica-02" \
		"\"result\": true" 2

	run_sql_source1 "insert into ${shardddl1}.${tb1} values(15,'iii');"
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(16,'jjj');"
	run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(17,'kkk');"
	run_sql_source2 "alter table ${shardddl1}.${tb2} add column c text;"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(18,'lll');"

	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

function DM_STOP_TASK_FOR_A_SOURCE_CASE() {
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1);"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(2);"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(3);"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(4);"

	run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);"
	run_sql_source1 "alter table ${shardddl1}.${tb2} add column b varchar(10);"
	run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);"
	run_sql_source2 "alter table ${shardddl1}.${tb2} add column b varchar(10);"

	run_sql_source1 "insert into ${shardddl1}.${tb1} values(5,'aaa');"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(6,'bbb');"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(7,'ccc');"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(8,'ddd');"

	run_sql_source1 "alter table ${shardddl1}.${tb1} add column c text;"
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(9,'eee','eee');"
	run_sql_source1 "alter table ${shardddl1}.${tb2} drop column b;"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(10);"
	run_sql_source2 "alter table ${shardddl1}.${tb1} add column c text;"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(11,'fff','fff');"
	run_sql_source2 "alter table ${shardddl1}.${tb2} drop column b;"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(12);"

	run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb}" "count(1): 12"

	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"stop-task test -s mysql-replica-02" \
		"\"result\": true" 2

	run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(13,'ggg');"
	run_sql_source1 "alter table ${shardddl1}.${tb2} add column c text;"
	run_sql_source1 "insert into ${shardddl1}.${tb2} values(14,'hhh');"

	run_sql_tidb_with_retry "select count(1) from ${shardddl}.${tb}" "count(1): 14"
	run_sql_tidb_with_retry "select count(1) from INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA='${shardddl}' AND TABLE_NAME='${tb}';" \
		"count(1): 2"

	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"start-task $cur/conf/double-source-optimistic.yaml -s mysql-replica-02" \
		"\"result\": true" 2

	run_sql_source1 "insert into ${shardddl1}.${tb1} values(15,'iii');"
	run_sql_source1 "insert into ${shardddl1}.${tb1} values(16,'jjj');"
	run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;"
	run_sql_source2 "insert into ${shardddl1}.${tb1} values(17,'kkk');"
	run_sql_source2 "alter table ${shardddl1}.${tb2} add column c text;"
	run_sql_source2 "insert into ${shardddl1}.${tb2} values(18,'lll');"

	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}
These two functions look exactly the same.
removed
LGTM
/unhold
/merge
This pull request has been accepted and is ready to merge. Commit hash: 41cf696
What problem does this PR solve?
Issue Number: close #3629, close #3786, close #3708
What is changed and how it works?
For DM-master:
joined as init table
For DM-worker:
Check List
Tests
Release note