diff --git a/dm/ctl/master/operate_schema.go b/dm/ctl/master/operate_schema.go index 71b15fd1b3..ce6813588e 100644 --- a/dm/ctl/master/operate_schema.go +++ b/dm/ctl/master/operate_schema.go @@ -33,7 +33,7 @@ func NewOperateSchemaCmd() *cobra.Command { } cmd.Flags().StringP("database", "d", "", "database name of the table") cmd.Flags().StringP("table", "t", "", "table name") - cmd.Flags().Bool("flush", false, "flush the table info and checkpoint immediately") + cmd.Flags().Bool("flush", true, "flush the table info and checkpoint immediately") cmd.Flags().Bool("sync", false, "sync the table info to master to resolve shard ddl lock, only for optimistic mode now") return cmd } @@ -110,9 +110,6 @@ func operateSchemaCmd(cmd *cobra.Command, _ []string) error { if err != nil { return err } - if flush && op != pb.SchemaOp_SetSchema { - return errors.New("--flush flag is only used to set schema") - } sync, err := cmd.Flags().GetBool("sync") if err != nil { return err diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 33529cc22a..f6f5656a9b 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -562,8 +562,13 @@ func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, sour sqls := make([]string, 0, 1) args := make([][]interface{}, 0, 10) + point := newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID) - point := cp.points[sourceSchema][sourceTable] + if tablePoints, ok := cp.points[sourceSchema]; ok { + if p, ok2 := tablePoints[sourceTable]; ok2 { + point = p + } + } tiBytes, err := json.Marshal(ti) if err != nil { diff --git a/tests/_utils/env_variables b/tests/_utils/env_variables index 255b76917f..4b9844f3e5 100755 --- a/tests/_utils/env_variables +++ b/tests/_utils/env_variables @@ -1,5 +1,6 @@ MYSQL_HOST1=${MYSQL_HOST1:-127.0.0.1} MYSQL_HOST2=${MYSQL_HOST2:-127.0.0.1} +TIDB_HOST=${TIDB_HOST:-127.0.0.1} MYSQL_PORT1=${MYSQL_PORT1:-3306} MYSQL_PORT2=${MYSQL_PORT2:-3307} MYSQL_PASSWORD1=${MYSQL_PASSWORD1:-123456} diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 8f7aae98ef..73eaef1545 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -276,4 +276,9 @@ function init_cluster(){ sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 -} \ No newline at end of file +} + +function get_master_status() { + arr=$(echo "show master status;" | MYSQL_PWD=123456 mysql -uroot -h127.0.0.1 -P3306 | awk 'NR==2') + echo $arr +} diff --git a/tests/downstream_more_column/conf/dm-master.toml b/tests/downstream_more_column/conf/dm-master.toml new file mode 100644 index 0000000000..7cecf59ad8 --- /dev/null +++ b/tests/downstream_more_column/conf/dm-master.toml @@ -0,0 +1,4 @@ +# Master Configuration. +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" +auto-compaction-retention = "3s" diff --git a/tests/downstream_more_column/conf/dm-task-incremental.yaml b/tests/downstream_more_column/conf/dm-task-incremental.yaml new file mode 100644 index 0000000000..977dcd671b --- /dev/null +++ b/tests/downstream_more_column/conf/dm-task-incremental.yaml @@ -0,0 +1,45 @@ +--- +name: test +task-mode: incremental +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +ignore-checking-items: ["table_schema"] + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" # compatible with deprecated config + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + meta: + binlog-gtid: binlog-gtid-placeholder + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["downstream_more_column"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/tests/downstream_more_column/conf/dm-task.yaml b/tests/downstream_more_column/conf/dm-task.yaml new file mode 100644 index 0000000000..71f636cd5b --- /dev/null +++ b/tests/downstream_more_column/conf/dm-task.yaml @@ -0,0 +1,42 @@ +--- +name: test +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" # compatible with deprecated config + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["downstream_more_column"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/tests/downstream_more_column/conf/dm-worker1.toml b/tests/downstream_more_column/conf/dm-worker1.toml new file mode 100644 index 0000000000..7a72ea72bf --- /dev/null +++ b/tests/downstream_more_column/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/tests/downstream_more_column/conf/source1.yaml b/tests/downstream_more_column/conf/source1.yaml new file mode 100644 index 0000000000..c3b5818014 --- /dev/null +++ b/tests/downstream_more_column/conf/source1.yaml @@ -0,0 +1,10 @@ +source-id: mysql-replica-01 +flavor: '' +enable-gtid: true +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: false diff --git a/tests/downstream_more_column/data/db1.increment.sql b/tests/downstream_more_column/data/db1.increment.sql new file mode 100644 index 0000000000..af35b48ee2 --- /dev/null +++ b/tests/downstream_more_column/data/db1.increment.sql @@ -0,0 +1,3 @@ +use downstream_more_column; +insert into t1 values(111, 111, 111); +insert into t1 values(222, 222, 222); diff --git a/tests/downstream_more_column/data/db1.increment2.sql b/tests/downstream_more_column/data/db1.increment2.sql new file mode 100644 index 0000000000..01e7460f4b --- /dev/null +++ b/tests/downstream_more_column/data/db1.increment2.sql @@ -0,0 +1,3 @@ +use downstream_more_column; +insert into t1 values(1111, 1111, 1111); +insert into t1 values(2222, 2222, 2222); diff --git a/tests/downstream_more_column/data/db1.prepare.sql b/tests/downstream_more_column/data/db1.prepare.sql new file mode 100644 index 0000000000..5d3fa25ed7 --- /dev/null +++ b/tests/downstream_more_column/data/db1.prepare.sql @@ -0,0 +1,6 @@ +drop database if exists `downstream_more_column`; +create database `downstream_more_column`; +use `downstream_more_column`; +create table t1 (c1 int, c2 int, c3 int, primary key(c1)); +insert into t1 values(1, 1, 1); +insert into t1 values(2, 2, 2); diff --git a/tests/downstream_more_column/data/schema.sql b/tests/downstream_more_column/data/schema.sql new file mode 100644 index 0000000000..13c94c5715 --- /dev/null +++ b/tests/downstream_more_column/data/schema.sql @@ -0,0 +1 @@ +CREATE TABLE `t1` ( `c1` int(11) DEFAULT NULL, `c2` int(11) DEFAULT NULL, `c3` int(11) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/tests/downstream_more_column/data/tidb.prepare.sql b/tests/downstream_more_column/data/tidb.prepare.sql new file mode 100644 index 0000000000..b06d7eea64 --- /dev/null +++ b/tests/downstream_more_column/data/tidb.prepare.sql @@ -0,0 +1,4 @@ +drop database if exists `downstream_more_column`; +create database `downstream_more_column`; +use `downstream_more_column`; +create table t1 (a bigint PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int); diff --git a/tests/downstream_more_column/run.sh b/tests/downstream_more_column/run.sh new file mode 100755 index 0000000000..39a98ee501 --- /dev/null +++ b/tests/downstream_more_column/run.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +db="downstream_more_column" +tb="t1" + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + # create table in tidb with AUTO_INCREMENT + run_sql_file $cur/data/tidb.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + # start DM task in all mode + # schemaTracker create table from dump data + dmctl_start_task_standalone "$cur/conf/dm-task.yaml" "--remove-meta" + # check full load data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1<100;" "count(1): 2" + + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + # check incremental data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1>100 and c1<1000;" "count(1): 2" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 2 + + # start DM task in incremental mode + # schemaTracker create table from downstream + master_status=($(get_master_status)) + cp $cur/conf/dm-task-incremental.yaml $WORK_DIR/dm-task-incremental.yaml + sed -i "s/binlog-gtid-placeholder/${master_status[2]}/g" $WORK_DIR/dm-task-incremental.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task-incremental.yaml --remove-meta" \ + "\"result\": true" 2 + + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + # coulmn count doesn't match + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "Column count doesn't match value count" 1 + + # operate-schema: flush checkpoint default + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-schema set -s mysql-replica-01 test -d ${db} -t ${tb} $cur/data/schema.sql" \ + "\"result\": true" 2 + check_log_contain_with_retry 'flush table info' $WORK_DIR/worker1/log/dm-worker.log + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task test" \ + "\"result\": true" 2 + + # check incremental data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1>1000 and c1<10000;" "count(1): 2" +} + +cleanup_data downstream_more_column +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 4de516b833..f3633b173f 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -6,4 +6,5 @@ drop_column_with_index gtid only_dml adjust_gtid -load_task \ No newline at end of file +load_task +downstream_more_column \ No newline at end of file