Skip to content

Commit

Permalink
drainer/*: get latest timestamp from pd when initial-commit-ts is -1 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Nov 5, 2019
1 parent 5e079f8 commit b764f6b
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 17 deletions.
2 changes: 1 addition & 1 deletion drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", 0, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint")
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1")
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
Expand Down
5 changes: 5 additions & 0 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func NewServer(cfg *Config) (*Server, error) {
}
latestTime := time.Now()

if cfg.InitialCommitTS == -1 {
log.Info("set InitialCommitTS", zap.Int64("ts", latestTS))
cfg.InitialCommitTS = latestTS
}

cfg.SyncerCfg.To.ClusterID = clusterID
pdCli.Close()

Expand Down
3 changes: 2 additions & 1 deletion tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
// run casePKAddDuplicateUK
tr.run(func(src *sql.DB) {
err := execSQLs(src, casePKAddDuplicateUK)
if err != nil && !strings.Contains(err.Error(), "Duplicate for key") {
// the add unique index will failed by duplicate entry
if err != nil && !strings.Contains(err.Error(), "Duplicate") {
log.S().Fatal(err)
}
})
Expand Down
6 changes: 1 addition & 5 deletions tests/kafka/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

set -e

# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
ms=$(date +'%s')
ts=$(($ms*1000<<18))

cd "$(dirname "$0")"

args="-initial-commit-ts=$ts"
args="-initial-commit-ts=-1"

kafka_addr=${KAFKA_ADDRS-127.0.0.1:9092}

Expand Down
7 changes: 3 additions & 4 deletions tests/reparo/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ set -e
cd "$(dirname "$0")"

# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
ms=$(date +'%s')
ts=$(($ms*1000<<18))
args="-initial-commit-ts=$ts"
args="-initial-commit-ts=-1"
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`"

rm -rf /tmp/tidb_binlog_test/data.drainer

run_drainer "$args" &

GO111MODULE=on go build -o out

run_sql "CREATE DATABASE IF NOT EXISTS \`reparo_test\`"

./out -config ./config.toml > ${OUT_DIR-/tmp}/$TEST_NAME.out 2>&1

sleep 5
Expand Down
4 changes: 1 addition & 3 deletions tests/restart/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ STATUS_LOG="${OUT_DIR}/status.log"

# run drainer, and drainer's status should be online
# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
ms=$(date +'%s')
ts=$(($ms*1000<<18))
args="-initial-commit-ts=$ts"
args="-initial-commit-ts=-1"
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
rm -rf /tmp/tidb_binlog_test/data.drainer

Expand Down
4 changes: 1 addition & 3 deletions tests/status/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ OUT_DIR=/tmp/tidb_binlog_test
STATUS_LOG="${OUT_DIR}/status.log"

# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
ms=$(date +'%s')
ts=$(($ms*1000<<18))
args="-initial-commit-ts=$ts"
args="-initial-commit-ts=-1"
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
rm -rf /tmp/tidb_binlog_test/data.drainer

Expand Down

0 comments on commit b764f6b

Please sign in to comment.