From eccd29a031f1ee32219e671ebf3081fc2466d4dc Mon Sep 17 00:00:00 2001
From: gmhdbjd
Date: Wed, 16 Nov 2022 17:23:51 +0800
Subject: [PATCH 1/6] binlog: do not decode rows for block table

---
 dm/relay/local_reader.go | 8 ++-
 dm/syncer/binlogstream/streamer_controller.go | 2 +-
 dm/syncer/data_validator.go | 2 +-
 dm/syncer/syncer.go | 39 ++---------
 dm/syncer/util.go | 53 +++++++++++++--
 dm/tests/binlog_parse/conf/diff_config.toml | 29 ++++
 dm/tests/binlog_parse/conf/dm-master.toml | 4 ++
 dm/tests/binlog_parse/conf/dm-task.yaml | 42 ++++++++++++
 dm/tests/binlog_parse/conf/dm-worker1.toml | 2 +
 dm/tests/binlog_parse/conf/source1.yaml | 12 ++++
 dm/tests/binlog_parse/data/db1.increment.sql | 3 +
 dm/tests/binlog_parse/data/db1.increment1.sql | 3 +
 dm/tests/binlog_parse/data/db1.prepare.sql | 7 ++
 dm/tests/binlog_parse/run.sh | 67 +++++++++++++++++++
 dm/worker/subtask.go | 17 -----
 dm/worker/subtask_test.go | 11 ---
 go.mod | 2 +-
 go.sum | 4 +-
 18 files changed, 232 insertions(+), 75 deletions(-)
 create mode 100644 dm/tests/binlog_parse/conf/diff_config.toml
 create mode 100644 dm/tests/binlog_parse/conf/dm-master.toml
 create mode 100644 dm/tests/binlog_parse/conf/dm-task.yaml
 create mode 100644 dm/tests/binlog_parse/conf/dm-worker1.toml
 create mode 100644 dm/tests/binlog_parse/conf/source1.yaml
 create mode 100644 dm/tests/binlog_parse/data/db1.increment.sql
 create mode 100644 dm/tests/binlog_parse/data/db1.increment1.sql
 create mode 100644 dm/tests/binlog_parse/data/db1.prepare.sql
 create mode 100755 dm/tests/binlog_parse/run.sh

diff --git a/dm/relay/local_reader.go b/dm/relay/local_reader.go
index b2f3f248df7..2c8524fb244 100644
--- a/dm/relay/local_reader.go
+++ b/dm/relay/local_reader.go
@@ -43,9 +43,10 @@ var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may
 
 // BinlogReaderConfig is the configuration for BinlogReader.
 type BinlogReaderConfig struct {
-	RelayDir string
-	Timezone *time.Location
-	Flavor   string
+	RelayDir            string
+	Timezone            *time.Location
+	Flavor              string
+	RowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
 }
 
 // BinlogReader is a binlog reader.
@@ -82,6 +83,7 @@ func newBinlogReader(logger log.Logger, cfg *BinlogReaderConfig, relay Process) parser.SetVerifyChecksum(true) // use string representation of decimal, to replicate the exact value parser.SetUseDecimal(false) + parser.SetRowsEventDecodeFunc(cfg.RowsEventDecodeFunc) if cfg.Timezone != nil { parser.SetTimestampStringLocation(cfg.Timezone) } diff --git a/dm/syncer/binlogstream/streamer_controller.go b/dm/syncer/binlogstream/streamer_controller.go index 289ddd9e3c9..54641bb0eac 100644 --- a/dm/syncer/binlogstream/streamer_controller.go +++ b/dm/syncer/binlogstream/streamer_controller.go @@ -288,7 +288,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.currentBinlogType == RemoteBinlog { c.streamProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID} } else { - c.streamProducer = &localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} + c.streamProducer = &localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor, RowsEventDecodeFunc: c.syncCfg.RowsEventDecodeFunc}), c.enableGTID} } c.upstream, err = newLocationStream(c.streamProducer, location) diff --git a/dm/syncer/data_validator.go b/dm/syncer/data_validator.go index 5b7dd187d70..b6208bf4f29 100644 --- a/dm/syncer/data_validator.go +++ b/dm/syncer/data_validator.go @@ -304,7 +304,7 @@ func (v *DataValidator) initialize() error { return err } - v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone) + v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList) if err != nil { return err } diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index b0c3d358e6d..f53c09a0a3e 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -364,7 +364,12 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return } - s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone) + s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList) + if err != nil { + return terror.ErrSyncerUnitGenBAList.Delegate(err) + } + + s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList) if err != nil { return err } @@ -392,11 +397,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { s.tctx.L(), ) - s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList) - if err != nil { - return terror.ErrSyncerUnitGenBAList.Delegate(err) - } - s.binlogFilter, err = bf.NewBinlogEvent(s.cfg.CaseSensitive, s.cfg.FilterRules) if err != nil { return terror.ErrSyncerUnitGenBinlogEventFilter.Delegate(err) @@ -3330,33 +3330,6 @@ func (s *Syncer) checkpointID() string { return strconv.FormatUint(uint64(s.cfg.ServerID), 10) } -// UpdateFromConfig updates config for `From`. 
-func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error { - s.Lock() - defer s.Unlock() - s.fromDB.BaseDB.Close() - - s.cfg.From = cfg.From - - var err error - s.cfg.From.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout) - s.fromDB, err = dbconn.NewUpStreamConn(&s.cfg.From) - if err != nil { - s.tctx.L().Error("fail to create baseConn connection", log.ShortError(err)) - return err - } - - s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone) - if err != nil { - return err - } - - if s.streamerController != nil { - s.streamerController.UpdateSyncCfg(s.syncCfg, s.fromDB) - } - return nil -} - // ShardDDLOperation returns the current pending to handle shard DDL lock operation. func (s *Syncer) ShardDDLOperation() *pessimism.Operation { return s.pessimist.PendingOperation() diff --git a/dm/syncer/util.go b/dm/syncer/util.go index 1cc0eb32fb1..18c92e662dd 100644 --- a/dm/syncer/util.go +++ b/dm/syncer/util.go @@ -37,11 +37,14 @@ import ( "go.uber.org/zap" ) -// the time layout for TiDB SHOW DDL statements. -const timeLayout = "2006-01-02 15:04:05" - -// everytime retrieve 10 new rows from TiDB history jobs. -const linesOfRows = 10 +const ( + // the time layout for TiDB SHOW DDL statements. + timeLayout = "2006-01-02 15:04:05" + // everytime retrieve 10 new rows from TiDB history jobs. + linesOfRows = 10 + // max capacity of the block/allow list. + maxCapacity = 100000 +) // getTableByDML gets table from INSERT/UPDATE/DELETE statement. func getTableByDML(dml ast.DMLNode) (*filter.Table, error) { @@ -136,7 +139,7 @@ func str2TimezoneOrFromDB(tctx *tcontext.Context, tzStr string, dbCfg *config.DB return loc, tzStr, nil } -func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location) (replication.BinlogSyncerConfig, error) { +func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error) { var tlsConfig *tls.Config var err error if cfg.From.Security != nil { @@ -153,6 +156,43 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati } } + var rowsEventDecodeFunc func(*replication.RowsEvent, []byte) error + if baList != nil { + // we don't track delete table events, so simply reset the cache if it's full + allowListCache := make(map[uint64]struct{}, maxCapacity) + blockListCache := make(map[uint64]struct{}, maxCapacity) + + rowsEventDecodeFunc = func(re *replication.RowsEvent, data []byte) error { + pos, err := re.DecodeHeader(data) + if err != nil { + return err + } + if _, ok := blockListCache[re.TableID]; ok { + return nil + } else if _, ok := allowListCache[re.TableID]; ok { + return re.DecodeData(pos, data) + } + + tb := &filter.Table{ + Schema: string(re.Table.Schema), + Name: string(re.Table.Table), + } + if skipByTable(baList, tb) { + if len(blockListCache) >= maxCapacity { + blockListCache = make(map[uint64]struct{}, maxCapacity) + } + blockListCache[re.TableID] = struct{}{} + return nil + } + + if len(allowListCache) >= maxCapacity { + allowListCache = make(map[uint64]struct{}, maxCapacity) + } + allowListCache[re.TableID] = struct{}{} + return re.DecodeData(pos, data) + } + } + syncCfg := replication.BinlogSyncerConfig{ ServerID: cfg.ServerID, Flavor: cfg.Flavor, @@ -162,6 +202,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati Password: cfg.From.Password, TimestampStringLocation: timezone, TLSConfig: tlsConfig, + RowsEventDecodeFunc: rowsEventDecodeFunc, } // when retry 
count > 1, go-mysql will retry sync from the previous GTID set in GTID mode, // which may get duplicate binlog event after retry success. so just set retry count = 1, and task diff --git a/dm/tests/binlog_parse/conf/diff_config.toml b/dm/tests/binlog_parse/conf/diff_config.toml new file mode 100644 index 00000000000..9424d91bc53 --- /dev/null +++ b/dm/tests/binlog_parse/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/ticdc_dm_test/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["binlog_parse.t?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" diff --git a/dm/tests/binlog_parse/conf/dm-master.toml b/dm/tests/binlog_parse/conf/dm-master.toml new file mode 100644 index 00000000000..7cecf59ad86 --- /dev/null +++ b/dm/tests/binlog_parse/conf/dm-master.toml @@ -0,0 +1,4 @@ +# Master Configuration. +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" +auto-compaction-retention = "3s" diff --git a/dm/tests/binlog_parse/conf/dm-task.yaml b/dm/tests/binlog_parse/conf/dm-task.yaml new file mode 100644 index 00000000000..80c47c51797 --- /dev/null +++ b/dm/tests/binlog_parse/conf/dm-task.yaml @@ -0,0 +1,42 @@ +--- +name: test +task-mode: all +is-sharding: false +meta-schema: "dm_meta" + +target-database: + host: "127.0.0.1" + port: 4000 + user: "test" + password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["binlog_parse"] + do-tables: + - db-name: "binlog_parse" + tbl-name: "t1" + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/binlog_parse/conf/dm-worker1.toml b/dm/tests/binlog_parse/conf/dm-worker1.toml new file mode 100644 index 00000000000..7a72ea72bf8 --- /dev/null +++ b/dm/tests/binlog_parse/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/dm/tests/binlog_parse/conf/source1.yaml b/dm/tests/binlog_parse/conf/source1.yaml new file mode 100644 index 00000000000..3c2f9a0a36e --- /dev/null +++ b/dm/tests/binlog_parse/conf/source1.yaml @@ -0,0 +1,12 @@ +source-id: mysql-replica-01 +enable-gtid: true +relay-binlog-name: '' +relay-binlog-gtid: '' +enable-relay: false +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: false diff --git a/dm/tests/binlog_parse/data/db1.increment.sql b/dm/tests/binlog_parse/data/db1.increment.sql new file mode 100644 index 00000000000..ec7256257b5 --- /dev/null +++ b/dm/tests/binlog_parse/data/db1.increment.sql @@ -0,0 +1,3 @@ +use binlog_parse; +insert into t1 (id, created_time) values (3, '2022-01-03 00:00:01'), (4, '2022-01-04 00:00:01'); +insert into t2 (id, created_time) values (3, '2022-01-03 00:00:01'), (4, '2022-01-04 00:00:01'); \ No newline at end of file diff --git a/dm/tests/binlog_parse/data/db1.increment1.sql b/dm/tests/binlog_parse/data/db1.increment1.sql new file mode 100644 index 00000000000..e14570d5cbc --- /dev/null +++ 
b/dm/tests/binlog_parse/data/db1.increment1.sql @@ -0,0 +1,3 @@ +use binlog_parse; +insert into t1 (id, created_time) values (5, '2022-01-05 00:00:01'), (6, '2022-01-06 00:00:01'); +insert into t2 (id, created_time) values (5, '2022-01-05 00:00:01'), (6, '2022-01-06 00:00:01'); \ No newline at end of file diff --git a/dm/tests/binlog_parse/data/db1.prepare.sql b/dm/tests/binlog_parse/data/db1.prepare.sql new file mode 100644 index 00000000000..6d614ccfd68 --- /dev/null +++ b/dm/tests/binlog_parse/data/db1.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `binlog_parse`; +create database `binlog_parse` character set utf8; +use `binlog_parse`; +create table t1 (id int, created_time timestamp, primary key(`id`)) character set utf8; +create table t2 (id int, created_time timestamp(3), primary key(`id`)) character set utf8; +insert into t1 (id, created_time) values (1, '2022-01-01 00:00:01'), (2, '2022-01-02 00:00:01'); +insert into t2 (id, created_time) values (1, '2022-01-01 00:00:01'), (2, '2022-01-02 00:00:01'); \ No newline at end of file diff --git a/dm/tests/binlog_parse/run.sh b/dm/tests/binlog_parse/run.sh new file mode 100755 index 00000000000..64451c18099 --- /dev/null +++ b/dm/tests/binlog_parse/run.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME + +function run() { + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + echo "prepare data" + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + + echo "start task" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task.yaml --remove-meta" + + echo "check full phase" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "validation start test" \ + "\"result\": true" 1 + + echo "prepare incremental data" + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + + echo "check incremental phase" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_sql_tidb_with_retry "select count(1) from binlog_parse.t1;" "count(1): 4" + + # relay error in mariadb:10.0, success in mysql + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 2 \ + "\"source\": \"$SOURCE_ID1\"" 1 \ + "\"worker\": \"worker1\"" 1 + # "TCPReader get relay event with error" 1 + + echo "prepare incremental data 2" + run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "validation start test" \ + "\"result\": true" 1 + + echo "check incremental phase 2" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + read v1 +} + +cleanup_data $TEST_NAME +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! 
>>>>>>" diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 99d75a1a836..cb886c7d6fc 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -678,23 +678,6 @@ func (st *SubTask) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchem return syncUnit.OperateSchema(ctx, req) } -// UpdateFromConfig updates config for `From`. -func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error { - st.Lock() - defer st.Unlock() - - if sync, ok := st.currUnit.(*syncer.Syncer); ok { - err := sync.UpdateFromConfig(cfg) - if err != nil { - return err - } - } - - st.cfg.From = cfg.From - - return nil -} - // CheckUnit checks whether current unit is sync unit. func (st *SubTask) CheckUnit() bool { st.RLock() diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index 6b1eef9f693..ce3803460bb 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -313,17 +313,6 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { c.Assert(st.Result(), IsNil) c.Assert(st.CheckUnit(), IsFalse) - cfg1 := &config.SubTaskConfig{ - Name: "xxx", - Mode: config.ModeFull, - From: config.DBConfig{ - Host: "127.0.0.1", - }, - } - c.Assert(st.UpdateFromConfig(cfg1), IsNil) - c.Assert(st.cfg.From, DeepEquals, cfg1.From) - c.Assert(st.cfg.Name, Equals, "testSubtaskScene") - // pause twice c.Assert(st.Pause(), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Paused) diff --git a/go.mod b/go.mod index d352ed1e3fc..692ed1ee44d 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/gin-gonic/gin v1.7.4 github.com/glebarez/go-sqlite v1.17.3 github.com/glebarez/sqlite v1.4.6 - github.com/go-mysql-org/go-mysql v1.6.1-0.20220718092400-c855c26b37bd + github.com/go-mysql-org/go-mysql v1.6.1-0.20221116091419-49d58c4c3e4c github.com/go-ozzo/ozzo-validation/v4 v4.3.0 github.com/go-sql-driver/mysql v1.6.0 github.com/goccy/go-json v0.9.11 diff --git a/go.sum b/go.sum index 87c60524846..6819777e935 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNV github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= -github.com/go-mysql-org/go-mysql v1.6.1-0.20220718092400-c855c26b37bd h1:kgQrwjBbmoOZzzbF1CDDIYeuBfLA+wA10DgQl6J5qZ8= -github.com/go-mysql-org/go-mysql v1.6.1-0.20220718092400-c855c26b37bd/go.mod h1:GX0clmylJLdZEYAojPCDTCvwZxbTBrke93dV55715u0= +github.com/go-mysql-org/go-mysql v1.6.1-0.20221116091419-49d58c4c3e4c h1:jtEp5qZ5vqi/o+qjb7lmHrP4AgPba+tPhPnBdb3f8Fo= +github.com/go-mysql-org/go-mysql v1.6.1-0.20221116091419-49d58c4c3e4c/go.mod h1:EiZjua0ULRd4UsnLg3+x++T7aROM4Wo8IDWKars/QiI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= From c5511e714c3b1724f4c106c3d0b63f8a027866b1 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 17 Nov 2022 20:23:35 +0800 Subject: [PATCH 2/6] address comment --- dm/syncer/util.go | 1 + dm/tests/binlog_parse/run.sh | 4 ++-- dm/tests/others_integration_1.txt | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dm/syncer/util.go b/dm/syncer/util.go index 18c92e662dd..21e1c906aa5 100644 --- a/dm/syncer/util.go 
+++ b/dm/syncer/util.go
@@ -159,6 +159,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
 	var rowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
 	if baList != nil {
 		// we don't track delete table events, so simply reset the cache if it's full
+		// TODO: use LRU or CLOCK cache if needed.
 		allowListCache := make(map[uint64]struct{}, maxCapacity)
 		blockListCache := make(map[uint64]struct{}, maxCapacity)
 
diff --git a/dm/tests/binlog_parse/run.sh b/dm/tests/binlog_parse/run.sh
index 64451c18099..567b759b927 100755
--- a/dm/tests/binlog_parse/run.sh
+++ b/dm/tests/binlog_parse/run.sh
@@ -6,6 +6,8 @@ cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 source $cur/../_utils/test_prepare
 WORK_DIR=$TEST_DIR/$TEST_NAME
 
+# skip one table, sync another table
+# mariadb 10.0 timestamp(3) will panic before dm v6.4.0
 function run() {
 	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
 	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
@@ -54,8 +56,6 @@ function run() {
 		"\"result\": true" 1
 
 	echo "check incremental phase 2"
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
-
-	read v1
 }
 
 cleanup_data $TEST_NAME
diff --git a/dm/tests/others_integration_1.txt b/dm/tests/others_integration_1.txt
index 011a45247f6..9960dc9b814 100644
--- a/dm/tests/others_integration_1.txt
+++ b/dm/tests/others_integration_1.txt
@@ -9,3 +9,5 @@ sequence_sharding_optimistic
 sequence_sharding_removemeta
 gtid
 only_dml
+adjust_gtid
+binlog_parse

From 8d98b89b15668ab0f81ba823f6f0828f412d40cc Mon Sep 17 00:00:00 2001
From: gmhdbjd
Date: Mon, 21 Nov 2022 14:10:38 +0800
Subject: [PATCH 3/6] fix it

---
 dm/tests/binlog_parse/run.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dm/tests/binlog_parse/run.sh b/dm/tests/binlog_parse/run.sh
index 567b759b927..1da8497fe8c 100755
--- a/dm/tests/binlog_parse/run.sh
+++ b/dm/tests/binlog_parse/run.sh
@@ -26,7 +26,7 @@ function run() {
 		"start-task $cur/conf/dm-task.yaml --remove-meta"
 
 	echo "check full phase"
-	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
 
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"validation start test" \
 		"\"result\": true" 1
@@ -36,7 +36,7 @@ function run() {
 	run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
 
 	echo "check incremental phase"
-	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
 
 	run_sql_tidb_with_retry "select count(1) from binlog_parse.t1;" "count(1): 4"
@@ -55,7 +55,7 @@ function run() {
 		"\"result\": true" 1
 
 	echo "check incremental phase 2"
-	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
 }
 
 cleanup_data $TEST_NAME

From 71ce943ba571a79c37c3c0c6aef0f56f7a494eba Mon Sep 17 00:00:00 2001
From: gmhdbjd
Date: Tue, 22 Nov 2022 14:07:30 +0800
Subject: [PATCH 4/6] fix it

---
 dm/tests/binlog_parse/conf/diff_config.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/tests/binlog_parse/conf/diff_config.toml b/dm/tests/binlog_parse/conf/diff_config.toml
index 9424d91bc53..fb53c61d347 100644
--- a/dm/tests/binlog_parse/conf/diff_config.toml
+++ b/dm/tests/binlog_parse/conf/diff_config.toml
@@ -13,7 +13,7 @@ check-struct-only = false
 
    target-instance = "tidb0"
 
-    target-check-tables = ["binlog_parse.t?*"]
+    target-check-tables = ["binlog_parse.t1"]
 
 [data-sources]
 [data-sources.mysql1]
 host = "127.0.0.1"
 port = 3306
 user = "root"
 password = "123456"

From aed1214d666f519cea313e8487b88f996607b3f3 Mon Sep 17 00:00:00 2001
From: gmhdbjd
Date:
Wed, 23 Nov 2022 17:37:14 +0800 Subject: [PATCH 5/6] address comment --- dm/tests/others_integration_1.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/dm/tests/others_integration_1.txt b/dm/tests/others_integration_1.txt index 9960dc9b814..18e2f414004 100644 --- a/dm/tests/others_integration_1.txt +++ b/dm/tests/others_integration_1.txt @@ -9,5 +9,4 @@ sequence_sharding_optimistic sequence_sharding_removemeta gtid only_dml -adjust_gtid binlog_parse From 9a3e5483bcd230703702fdeb70748711393ef382 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 24 Nov 2022 10:47:33 +0800 Subject: [PATCH 6/6] address comment --- dm/tests/others_integration_1.txt | 1 - dm/tests/others_integration_2.txt | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/tests/others_integration_1.txt b/dm/tests/others_integration_1.txt index 18e2f414004..011a45247f6 100644 --- a/dm/tests/others_integration_1.txt +++ b/dm/tests/others_integration_1.txt @@ -9,4 +9,3 @@ sequence_sharding_optimistic sequence_sharding_removemeta gtid only_dml -binlog_parse diff --git a/dm/tests/others_integration_2.txt b/dm/tests/others_integration_2.txt index ad88593600f..1a008acbfe6 100644 --- a/dm/tests/others_integration_2.txt +++ b/dm/tests/others_integration_2.txt @@ -8,3 +8,4 @@ sql_mode http_proxies openapi duplicate_event +binlog_parse
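
Note: the hook wired up in dm/syncer/util.go above relies on the RowsEventDecodeFunc callback exposed by the pinned go-mysql revision: the rows-event header is always decoded so the table ID and table metadata are available, while decoding of the row data is skipped for tables rejected by the block/allow list. Below is a minimal standalone sketch of the same idea, not DM's actual code; the isFiltered predicate is a placeholder for DM's skipByTable check, and the unbounded maps stand in for the capacity-capped caches used in the patch.

package rowsfilter

import "github.com/go-mysql-org/go-mysql/replication"

// NewRowsEventDecodeFunc returns a decode hook that always parses the event
// header but skips decoding the row data for tables rejected by isFiltered.
// Decisions are cached per upstream table ID.
func NewRowsEventDecodeFunc(isFiltered func(schema, table string) bool) func(*replication.RowsEvent, []byte) error {
	allowed := make(map[uint64]struct{})
	blocked := make(map[uint64]struct{})

	return func(re *replication.RowsEvent, data []byte) error {
		// The header must always be decoded so re.TableID and re.Table are set.
		pos, err := re.DecodeHeader(data)
		if err != nil {
			return err
		}
		if _, ok := blocked[re.TableID]; ok {
			return nil // filtered table: leave the rows undecoded
		}
		if _, ok := allowed[re.TableID]; ok {
			return re.DecodeData(pos, data)
		}
		if isFiltered(string(re.Table.Schema), string(re.Table.Table)) {
			blocked[re.TableID] = struct{}{}
			return nil
		}
		allowed[re.TableID] = struct{}{}
		return re.DecodeData(pos, data)
	}
}

A function built this way can be assigned to replication.BinlogSyncerConfig.RowsEventDecodeFunc for the remote reader or passed to parser.SetRowsEventDecodeFunc for the relay reader, which is how the patch plumbs it through subtaskCfg2BinlogSyncerCfg and BinlogReaderConfig.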