syncer: support multiple ddls in single sharding part #177
Conversation
Codecov Report

@@            Coverage Diff             @@
##            master      #177      +/-   ##
===============================================
- Coverage   55.9038%   55.439%   -0.4648%
===============================================
  Files           122       122
  Lines         14516     13826       -690
===============================================
- Hits           8115      7665       -450
+ Misses         5591      5389       -202
+ Partials        810       772        -38
Force-pushed from a814184 to be7f9d3 (compare)
Force-pushed from be7f9d3 to 1f54bf8 (compare)
/run-all-tests
/run-all-tests
PTAL @GregoryIan @csuzhangxc
syncer/sharding-meta/shardmeta.go
Outdated
	return false
}

// NextShardingDDLFirstPos returns the first binlog position of next sharding DDL in sequence
current or next?
the next shard DDL, also known as the active DDL
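(For readers of this thread: a minimal sketch of the idea, with illustrative names and a simplified type; this is not the actual shardmeta code.)

```go
package shardmeta

import "github.com/siddontang/go-mysql/mysql"

// sequence is a simplified stand-in for a sharding DDL sequence: the first
// binlog positions of all DDLs in ascending order, plus the index of the
// active (first un-synced) DDL.
type sequence struct {
	firstPositions []mysql.Position // first binlog position of every DDL in the sequence
	activeIdx      int              // index of the active DDL
}

// nextShardingDDLFirstPos returns the first binlog position of the active DDL,
// i.e. the "next sharding DDL"; ok is false if the whole sequence is synced.
func (s *sequence) nextShardingDDLFirstPos() (pos mysql.Position, ok bool) {
	if s.activeIdx >= len(s.firstPositions) {
		return mysql.Position{}, false
	}
	return s.firstPositions[s.activeIdx], true
}
```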
syncer/sharding_group.go
Outdated
	IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later

	sourceID string // associate dm-worker source ID
	schema   string // schema name, set through task config
storageSchema and storageTable?
@@ -295,6 +303,8 @@ func (sg *ShardingGroup) UnresolvedTables() [][]string {
	sg.RLock()
	defer sg.RUnlock()

// TODO: if we have sharding ddl sequence, and partial ddls synced, we treat
Description is not accurate enough
Then, we can't forward the checkpoint of tables?
syncer/sharding_group.go
Outdated
@@ -360,18 +397,26 @@ func UnpackTableID(id string) (string, string) {
type ShardingGroupKeeper struct {
	sync.RWMutex
	groups map[string]*ShardingGroup // target table ID -> ShardingGroup
	cfg    *config.SubTaskConfig

	schema string
ditto
syncer/sharding_group.go
Outdated
@@ -412,7 +473,9 @@ func (k *ShardingGroupKeeper) ResetGroups() {
	k.RLock()
	defer k.RUnlock()
	for _, group := range k.groups {
		group.Lock()
put `group.Lock()` into a method of `group`?
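The suggestion would look roughly like the sketch below (simplified, self-contained types; the reset body is an assumption about what the group's state machine does, not the exact PR code):

```go
package syncer

import "sync"

// ShardingGroup is a pared-down stand-in for the real group type.
type ShardingGroup struct {
	sync.RWMutex
	sources map[string]bool // upstream table ID -> whether its shard DDL has arrived
	remain  int             // number of sources still expected to send the DDL
}

// Reset clears the in-syncing state and takes the group's own lock,
// so callers never touch the group's mutex directly.
func (sg *ShardingGroup) Reset() {
	sg.Lock()
	defer sg.Unlock()
	sg.remain = len(sg.sources)
	for source := range sg.sources {
		sg.sources[source] = false
	}
}

// ShardingGroupKeeper owns all groups, keyed by target table ID.
type ShardingGroupKeeper struct {
	sync.RWMutex
	groups map[string]*ShardingGroup
}

// ResetGroups only iterates; each group handles its own locking.
func (k *ShardingGroupKeeper) ResetGroups() {
	k.RLock()
	defer k.RUnlock()
	for _, group := range k.groups {
		group.Reset()
	}
}
```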
		sg.sources[source] = false
	}
}

-	return isResolving, sg.remain <= 0, sg.remain, nil
+	return false, sg.remain <= 0, sg.remain, nil
can we support replicating the CREATE TABLE statement for a fresh group now?
what is "fresh group" meant for?
the group that does not exist before the "create table" statement
do we support it before this PR?
the logic doesn't change; for a `create table` in a new sharding group, we just create a new one directly:
Lines 419 to 423 in 2ff99b5

if schemaGroup, ok := k.groups[schemaID]; !ok {
	k.groups[schemaID] = NewShardingGroup(k.cfg.SourceID, k.shardMetaSchema, k.shardMetaTable, sourceIDs, meta, true)
} else {
	schemaGroup.Merge(sourceIDs)
}
the false (needShardingHandle) in this return will ignore the statement? then no table will be created in the downstream, right?
not tested; if so, the old code has this bug too
it seems strange to have no tables in one shard. For example: there are two upstreams, each upstream has no shard tables yet, and no target table exists in the downstream either. Then if one of the upstreams creates one shard table and applies sharding handling, it sends DDLInfo to DM-master; but DM-master has to wait for the other DM-worker for this create table DDL. It seems we can't use the common shard way for `create table`.
en, seems a bug 😢
Force-pushed from 646c787 to 89de140 (compare)
Force-pushed from 58d8435 to 452be69 (compare)
/run-all-tests
LGTM
/run-all-tests
/run-all-tests
/run-all-tests
What problem does this PR solve?
support running sharding DDLs in sequence in each sharding part, such as the following
What is changed and how it works?
Firstly we introduce some new concepts:

- `sequence sharding`: in a shard DDL scenario, each sharding part has multiple DDLs in the same sequence, such as described in the section above.
- `active DDL`: in sequence sharding, we can separate the shard DDLs into two groups based on whether a DDL has been synchronized to the downstream. The DDL in the un-synced group with the smallest binlog position is the current `active DDL`.
- `active index`: we sort the shard DDLs by binlog position in ascending order; the array index of the `active DDL` is the `active index`.
- `ShardingMeta`: the storage for sequence sharding information. Each sharding group has one `ShardingMeta` and each `ShardingMeta` belongs to one sharding group (one-to-one correspondence). `ShardingMeta` stores the `active index`, the global sequence of shard DDLs, and the sequence of shard DDLs for each upstream source table (a rough sketch of such a structure follows this list).
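The data layout behind these concepts might look roughly like the following; field names and types here are illustrative assumptions, not the exact structs in this PR:

```go
package shardmeta

import "github.com/siddontang/go-mysql/mysql"

// DDLItem is one shard DDL together with the binlog position where it starts.
type DDLItem struct {
	FirstPos mysql.Position // first binlog position of the DDL
	DDLs     []string       // the DDL statements
	Source   string         // upstream source table ID
}

// ShardingMeta keeps the sequence-sharding state of one sharding group.
type ShardingMeta struct {
	activeIdx int                   // active index: offset of the active DDL in the global sequence
	global    []*DDLItem            // global sequence of shard DDLs, sorted by binlog position
	sources   map[string][]*DDLItem // sequence of shard DDLs per upstream source table
}

// NewShardingMeta returns an empty meta ready to accept DDL items.
func NewShardingMeta() *ShardingMeta {
	return &ShardingMeta{sources: make(map[string][]*DDLItem)}
}

// ActiveDDLItem returns the current active DDL, i.e. the first un-synced one.
func (m *ShardingMeta) ActiveDDLItem() *DDLItem {
	if m.activeIdx >= len(m.global) {
		return nil
	}
	return m.global[m.activeIdx]
}
```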
Secondly, let's see the synchronization strategy. We have two sharding stages, the same as the original design:

- the `global` stream, short for the `un-sync stage`;
- the `sharding re-sync` stream, short for the `re-sync stage`.
- for DML
- for DDL: the DDL is added to `ShardingMeta`, which checks whether it is the `active DDL` with the following steps: we sync the `active DDL` only; after adding the DDL to `ShardingMeta`, we process the `sharding group` in the old fashion (a sketch of the active-DDL check follows this list).
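For illustration, the active-DDL check could compare the incoming DDL's first binlog position with the item at the active index. This extends the simplified `ShardingMeta` sketch above and is again an assumption about the shape of the logic, not the exact code of this PR:

```go
// AddAndCheckActive records the incoming DDL of one upstream source table and
// reports whether it is the current active DDL (the only one synced right now).
func (m *ShardingMeta) AddAndCheckActive(source string, item *DDLItem) bool {
	m.sources[source] = append(m.sources[source], item)

	// the first source that reaches a new DDL extends the global sequence
	if len(m.sources[source]) > len(m.global) {
		m.global = append(m.global, item)
	}

	active := m.ActiveDDLItem()
	if active == nil {
		return false
	}
	// a DDL is active if its first binlog position equals the active DDL's
	return item.FirstPos.Name == active.FirstPos.Name &&
		item.FirstPos.Pos == active.FirstPos.Pos
}
```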
Next we will focus on error handling.

`ShardingMeta` is stored in memory and can be persisted into the downstream TiDB. The persistence is triggered at the same time as flushing the checkpoint; we put the `ShardingMeta` persistence SQLs and the checkpoint update into the same transaction (see the sketch after this list). With this mechanism, we can achieve the following goals:

- `ShardingMeta` is persisted, so after the DM-worker/sync-unit restarts, `ShardingMeta` will be re-constructed from the meta DB.
- the `ShardingMeta` stored in the downstream meta DB is the one updated in the last sharding round, and the active index remains the current `active index`.
- the `activeIdx` of `ShardingMeta` will move forward to the next one and be persisted to the downstream TiDB; after the sync-unit restarts, `ShardingMeta` will be re-constructed from the meta DB.
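The persistence described above boils down to executing the checkpoint update and the `ShardingMeta` rows inside one downstream transaction. A minimal sketch, assuming plain `database/sql` and pre-built SQL statements (not the real DM schema or helper names):

```go
package syncer

import "database/sql"

// flushCheckpointWithShardingMeta executes the checkpoint update and the
// ShardingMeta persistence statements in a single transaction, so after a
// crash or restart the checkpoint and the sharding meta stay consistent.
func flushCheckpointWithShardingMeta(db *sql.DB, checkpointSQLs, shardMetaSQLs []string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	for _, stmts := range [][]string{checkpointSQLs, shardMetaSQLs} {
		for _, q := range stmts {
			if _, err := tx.Exec(q); err != nil {
				tx.Rollback() // roll back both parts together
				return err
			}
		}
	}
	return tx.Commit()
}
```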
What is not supported
TODO:
sharding_group.go
Check List
Tests
Code changes
Side effects
Related changes