From 43956d333d1a77b78d77e318f1663ba2926075f1 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 20 Aug 2019 19:11:37 +0800 Subject: [PATCH 1/4] change default cache count to workcount*batchsize --- drainer/config.go | 33 ++++++++++++++++----------------- drainer/syncer.go | 2 +- drainer/syncer_test.go | 3 ++- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/drainer/config.go b/drainer/config.go index b90251381..43f0ad215 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -51,28 +51,27 @@ const ( ) var ( - maxBinlogItemCount int - defaultBinlogItemCount = 16 << 12 supportedCompressors = [...]string{"gzip"} newZKFromConnectionString = zk.NewFromConnectionString ) // SyncerConfig is the Syncer's configuration. type SyncerConfig struct { - StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` - SQLMode mysql.SQLMode `toml:"-" json:"-"` - IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` - IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` - IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` - TxnBatch int `toml:"txn-batch" json:"txn-batch"` - WorkerCount int `toml:"worker-count" json:"worker-count"` - To *dsync.DBConfig `toml:"to" json:"to"` - DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` - DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` - DestDBType string `toml:"db-type" json:"db-type"` - DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` - SafeMode bool `toml:"safe-mode" json:"safe-mode"` - DisableCausality bool `toml:"disable-detect" json:"disable-detect"` + StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` + IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` + IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` + IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` + TxnBatch int `toml:"txn-batch" json:"txn-batch"` + MaxCacheBinlogCount int `toml:"cache-binlog-count" json:"cache-binlog-count"` + WorkerCount int `toml:"worker-count" json:"worker-count"` + To *dsync.DBConfig `toml:"to" json:"to"` + DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` + DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` + DestDBType string `toml:"db-type" json:"db-type"` + DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` + SafeMode bool `toml:"safe-mode" json:"safe-mode"` + DisableCausality bool `toml:"disable-detect" json:"disable-detect"` } // Config holds the configuration of drainer @@ -132,7 +131,7 @@ func NewConfig() *Config { fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disable detect causality") - fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size") + fs.IntVar(&cfg.SyncerCfg.MaxCacheBinlogCount, "cache-binlog-count", cfg.SyncerCfg.WorkerCount*cfg.SyncerCfg.TxnBatch, "blurry count of binlogs in cache, limit cache size (default workerCount * batchSize)") fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced") fs.StringVar(new(string), "log-rotate", "", "DEPRECATED") diff --git a/drainer/syncer.go b/drainer/syncer.go index fe744b985..b00b51857 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -60,7 +60,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) ( syncer := new(Syncer) syncer.cfg = cfg syncer.cp = cp - syncer.input = make(chan *binlogItem, maxBinlogItemCount) + syncer.input = make(chan *binlogItem, cfg.MaxCacheBinlogCount) syncer.lastSyncTime = time.Now() syncer.shutdown = make(chan struct{}) syncer.closed = make(chan struct{}) diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index e9557dfc6..ebeaf05c1 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -60,7 +60,8 @@ func (s *syncerSuite) TestFilterTable(c *check.C) { func (s *syncerSuite) TestNewSyncer(c *check.C) { cfg := &SyncerConfig{ - DestDBType: "_intercept", + DestDBType: "_intercept", + MaxCacheBinlogCount: 320, } cpFile := c.MkDir() + "/checkpoint" From 2d93f29210b6d4d54084a1b14b1e679487fa0fad Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 23 Aug 2019 10:18:10 +0800 Subject: [PATCH 2/4] Revert "change default cache count to workcount*batchsize" This reverts commit 43956d333d1a77b78d77e318f1663ba2926075f1. --- drainer/config.go | 33 +++++++++++++++++---------------- drainer/syncer.go | 2 +- drainer/syncer_test.go | 3 +-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/drainer/config.go b/drainer/config.go index 43f0ad215..b90251381 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -51,27 +51,28 @@ const ( ) var ( + maxBinlogItemCount int + defaultBinlogItemCount = 16 << 12 supportedCompressors = [...]string{"gzip"} newZKFromConnectionString = zk.NewFromConnectionString ) // SyncerConfig is the Syncer's configuration. type SyncerConfig struct { - StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` - SQLMode mysql.SQLMode `toml:"-" json:"-"` - IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` - IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` - IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` - TxnBatch int `toml:"txn-batch" json:"txn-batch"` - MaxCacheBinlogCount int `toml:"cache-binlog-count" json:"cache-binlog-count"` - WorkerCount int `toml:"worker-count" json:"worker-count"` - To *dsync.DBConfig `toml:"to" json:"to"` - DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` - DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` - DestDBType string `toml:"db-type" json:"db-type"` - DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` - SafeMode bool `toml:"safe-mode" json:"safe-mode"` - DisableCausality bool `toml:"disable-detect" json:"disable-detect"` + StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` + IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` + IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` + IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` + TxnBatch int `toml:"txn-batch" json:"txn-batch"` + WorkerCount int `toml:"worker-count" json:"worker-count"` + To *dsync.DBConfig `toml:"to" json:"to"` + DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` + DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` + DestDBType string `toml:"db-type" json:"db-type"` + DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` + SafeMode bool `toml:"safe-mode" json:"safe-mode"` + DisableCausality bool `toml:"disable-detect" json:"disable-detect"` } // Config holds the configuration of drainer @@ -131,7 +132,7 @@ func NewConfig() *Config { fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disable detect causality") - fs.IntVar(&cfg.SyncerCfg.MaxCacheBinlogCount, "cache-binlog-count", cfg.SyncerCfg.WorkerCount*cfg.SyncerCfg.TxnBatch, "blurry count of binlogs in cache, limit cache size (default workerCount * batchSize)") + fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size") fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced") fs.StringVar(new(string), "log-rotate", "", "DEPRECATED") diff --git a/drainer/syncer.go b/drainer/syncer.go index b00b51857..fe744b985 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -60,7 +60,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) ( syncer := new(Syncer) syncer.cfg = cfg syncer.cp = cp - syncer.input = make(chan *binlogItem, cfg.MaxCacheBinlogCount) + syncer.input = make(chan *binlogItem, maxBinlogItemCount) syncer.lastSyncTime = time.Now() syncer.shutdown = make(chan struct{}) syncer.closed = make(chan struct{}) diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index ebeaf05c1..e9557dfc6 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -60,8 +60,7 @@ func (s *syncerSuite) TestFilterTable(c *check.C) { func (s *syncerSuite) TestNewSyncer(c *check.C) { cfg := &SyncerConfig{ - DestDBType: "_intercept", - MaxCacheBinlogCount: 320, + DestDBType: "_intercept", } cpFile := c.MkDir() + "/checkpoint" From bd5a18e5b45f2ba7bf6d5a8a10d1674b21fbcf0c Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 23 Aug 2019 10:21:40 +0800 Subject: [PATCH 3/4] modify defaultBinlogItemCount from 65536 to 512 --- drainer/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drainer/config.go b/drainer/config.go index b90251381..e57f212d2 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -52,7 +52,7 @@ const ( var ( maxBinlogItemCount int - defaultBinlogItemCount = 16 << 12 + defaultBinlogItemCount = 1 << 9 supportedCompressors = [...]string{"gzip"} newZKFromConnectionString = zk.NewFromConnectionString ) From 7bf56763a25cfc9e7df588a51cb3c33cf691279b Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 23 Aug 2019 10:59:22 +0800 Subject: [PATCH 4/4] change 1<<9 to 512 --- drainer/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drainer/config.go b/drainer/config.go index e57f212d2..4a3ea29da 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -52,7 +52,7 @@ const ( var ( maxBinlogItemCount int - defaultBinlogItemCount = 1 << 9 + defaultBinlogItemCount = 512 supportedCompressors = [...]string{"gzip"} newZKFromConnectionString = zk.NewFromConnectionString )