From 06322507540c64d11483c70cdff952144161b849 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 30 Oct 2019 13:44:10 +0800 Subject: [PATCH 1/4] drainer/*: get latest timestamp from pd when initial-commit-ts is -1 --- drainer/config.go | 2 +- drainer/server.go | 5 +++++ tests/kafka/run.sh | 6 +----- tests/reparo/run.sh | 4 +--- tests/restart/run.sh | 4 +--- tests/status/run.sh | 4 +--- 6 files changed, 10 insertions(+), 15 deletions(-) diff --git a/drainer/config.go b/drainer/config.go index c35a40a64..3054b530e 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -123,7 +123,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") - fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint") + fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1") fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)") fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") diff --git a/drainer/server.go b/drainer/server.go index 581273d1c..d98160bbd 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -116,6 +116,11 @@ func NewServer(cfg *Config) (*Server, error) { } latestTime := time.Now() + if cfg.InitialCommitTS == -1 { + log.Info("set InitialCommitTS", zap.Int64("ts", latestTS)) + cfg.InitialCommitTS = latestTS + } + cfg.SyncerCfg.To.ClusterID = clusterID pdCli.Close() diff --git a/tests/kafka/run.sh b/tests/kafka/run.sh index 40d8795c7..e95a77b68 100755 --- a/tests/kafka/run.sh +++ b/tests/kafka/run.sh @@ -2,13 +2,9 @@ set -e -# use latest ts as initial-commit-ts, so we can skip binlog by previous test case -ms=$(date +'%s') -ts=$(($ms*1000<<18)) - cd "$(dirname "$0")" -args="-initial-commit-ts=$ts" +args="-initial-commit-ts=-1" kafka_addr=${KAFKA_ADDRS-127.0.0.1:9092} diff --git a/tests/reparo/run.sh b/tests/reparo/run.sh index ec72adc21..661646a40 100755 --- a/tests/reparo/run.sh +++ b/tests/reparo/run.sh @@ -5,9 +5,7 @@ set -e cd "$(dirname "$0")" # use latest ts as initial-commit-ts, so we can skip binlog by previous test case -ms=$(date +'%s') -ts=$(($ms*1000<<18)) -args="-initial-commit-ts=$ts" +args="-initial-commit-ts=-1" down_run_sql "DROP DATABASE IF EXISTS tidb_binlog" run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`" diff --git a/tests/restart/run.sh b/tests/restart/run.sh index 158d5b2b0..cb76558f8 100755 --- a/tests/restart/run.sh +++ b/tests/restart/run.sh @@ -9,9 +9,7 @@ STATUS_LOG="${OUT_DIR}/status.log" # run drainer, and drainer's status should be online # use latest ts as initial-commit-ts, so we can skip binlog by previous test case -ms=$(date +'%s') -ts=$(($ms*1000<<18)) -args="-initial-commit-ts=$ts" +args="-initial-commit-ts=-1" down_run_sql "DROP DATABASE IF EXISTS tidb_binlog" rm -rf /tmp/tidb_binlog_test/data.drainer diff --git a/tests/status/run.sh b/tests/status/run.sh index 48f8fa94f..8203e47d8 100755 --- a/tests/status/run.sh +++ b/tests/status/run.sh @@ -9,9 +9,7 @@ OUT_DIR=/tmp/tidb_binlog_test STATUS_LOG="${OUT_DIR}/status.log" # use latest ts as initial-commit-ts, so we can skip binlog by previous test case -ms=$(date +'%s') -ts=$(($ms*1000<<18)) -args="-initial-commit-ts=$ts" +args="-initial-commit-ts=-1" down_run_sql "DROP DATABASE IF EXISTS tidb_binlog" rm -rf /tmp/tidb_binlog_test/data.drainer From aa57853cd1d081e3b86d42f50d56a3dde84b6bd1 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 30 Oct 2019 15:30:45 +0800 Subject: [PATCH 2/4] Update drainer/config.go Co-Authored-By: Eric Shen --- drainer/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drainer/config.go b/drainer/config.go index 3054b530e..b5aaa2c06 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -123,7 +123,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") - fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1") + fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1") fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)") fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") From ea3968b0cc0d73f6b05b1f719a0431aa4ceb55c1 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sun, 3 Nov 2019 13:30:51 +0800 Subject: [PATCH 3/4] corect create db order don't rely on privier drainer instance --- tests/reparo/run.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/reparo/run.sh b/tests/reparo/run.sh index 661646a40..eb4a00e31 100755 --- a/tests/reparo/run.sh +++ b/tests/reparo/run.sh @@ -7,7 +7,6 @@ cd "$(dirname "$0")" # use latest ts as initial-commit-ts, so we can skip binlog by previous test case args="-initial-commit-ts=-1" down_run_sql "DROP DATABASE IF EXISTS tidb_binlog" -run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`" rm -rf /tmp/tidb_binlog_test/data.drainer @@ -15,6 +14,8 @@ run_drainer "$args" & GO111MODULE=on go build -o out +run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`" + ./out -config ./config.toml > ${OUT_DIR-/tmp}/$TEST_NAME.out 2>&1 sleep 5 From 2f3f40f8587e3b85aec107e6dbe5ec851a4ac839 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sun, 3 Nov 2019 14:46:04 +0800 Subject: [PATCH 4/4] Fix err check --- tests/dailytest/case.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 8ab931ae8..9d961172f 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -211,7 +211,8 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { // run casePKAddDuplicateUK tr.run(func(src *sql.DB) { err := execSQLs(src, casePKAddDuplicateUK) - if err != nil && !strings.Contains(err.Error(), "Duplicate for key") { + // the add unique index will failed by duplicate entry + if err != nil && !strings.Contains(err.Error(), "Duplicate") { log.S().Fatal(err) } })