-
Notifications
You must be signed in to change notification settings - Fork 130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support two clusters of bidirectional replication. #864
Conversation
1.surport ddl one-way synchronization 2.surport dml Mutual synchronization 3.surport cluster A Mutual synchronization B,cluster B Mutual synchronization C,cluster C binlog can sync to A or not sync to A by set channel_id
modify mark table name
1.add create mark table logic
1.fix bug
Cool!Great Work, thanks @freemindLi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks,
pls address the comment about static lint.
you can run make check
local.
pls sign the CLA.
pkg/loader/load.go
Outdated
} | ||
} | ||
return dmls | ||
} | ||
func (s *loaderImpl) execDMLs(dmls []*DML) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note this dmls
IS NOT the origin txn.
loaderImpl will split the origin txn of TiDB and exec them concurrently.
after splitting some part of dmls will not contains the mark table.
we can check and drop txn before
Line 366 in 0dd2e28
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite}) |
pkg/loader/load.go
Outdated
@@ -595,6 +668,11 @@ func (b *batchManager) execAccumulatedDMLs() (err error) { | |||
} | |||
|
|||
func (b *batchManager) execDDL(txn *Txn) error { | |||
|
|||
if !b.ddlSync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we handle sync or not sync outside loader in draienr/syncer.go
directly?
pkg/loader/executor.go
Outdated
return sync | ||
} | ||
|
||
func (e *executor) setsyncInfo(sync *syncInfo) *executor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems that don't need to return executor
drainer/config.go
Outdated
@@ -129,6 +132,9 @@ func NewConfig() *Config { | |||
fs.Int64Var(&cfg.InitialCommitTS, "initial-commit-ts", -1, "if drainer donesn't have checkpoint, use initial commitTS to initial checkpoint, will get a latest timestamp from pd if setting to be -1") | |||
fs.StringVar(&cfg.Compressor, "compressor", "", "use the specified compressor to compress payload between pump and drainer, only 'gzip' is supported now (default \"\", ie. compression disabled.)") | |||
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") | |||
fs.BoolVar(&cfg.SyncerCfg.MarkStatus, "mark-status", false, "set mark or not ") | |||
fs.BoolVar(&cfg.SyncerCfg.DdlSync, "ddl-sync", false, "sync ddl or not") | |||
fs.Int64Var(&cfg.SyncerCfg.ChannelId, "channel-id", 0, "sync channel id ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about adding more details about these three config
pkg/loader/model.go
Outdated
@@ -184,6 +184,23 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) { | |||
return | |||
} | |||
|
|||
func updateMarkSQL(columns []string, Values map[string]interface{}) (string, []interface{}) { | |||
|
|||
//sql := fmt.Sprintf("REPLACE INTO %s(%s) VALUES(%s)",MarkTableName, buildColumnList(columns),holderString(len(columns))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can delete this line
@@ -65,6 +65,9 @@ type SyncerConfig struct { | |||
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` | |||
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` | |||
TxnBatch int `toml:"txn-batch" json:"txn-batch"` | |||
MarkStatus bool `toml:"mark-status" json:"mark-status"` | |||
DdlSync bool `toml:"ddl-sync" json:"ddl-sync"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about use https://github.com/pingcap/tidb-tools/blob/06b27bff7c2e29aac542b2e7197a52ae96221142/pkg/binlog-filter/filter.go#L85 to support more function? it is not necessary, we can do this later
@freemindLi #867 will allow when the column number of downstream table mismatch with current schema. you can help take a look too. |
What problem does this PR solve?
Support two clusters of bidirectional replication.
A <-> B
What is changed and how it works?
Check List
Tests
Code changes
Side effects
Related changes