This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: support multiple ddls in single sharding part #177

Merged
merged 40 commits into from Jun 28, 2019

Commits (40 commits)
1f54bf8
syncer: support multiple ddls in single sharding part
amyangfei Jun 11, 2019
f1f3cac
add unit test to shard meta
amyangfei Jun 18, 2019
277fb02
refine test cases
amyangfei Jun 18, 2019
49ce3d6
support interrupt in sequence sharding
amyangfei Jun 18, 2019
c4a4075
refine check_safe_mode in integration test
amyangfei Jun 18, 2019
49675e9
add more unit test
amyangfei Jun 19, 2019
f24226e
remove some unused code
amyangfei Jun 19, 2019
90a87ce
Merge branch 'master' into sharding-ddl-refactor
amyangfei Jun 20, 2019
d3f6a2c
Merge branch 'master' into sharding-ddl-refactor
amyangfei Jun 21, 2019
aebc41d
Update syncer/checkpoint.go
amyangfei Jun 22, 2019
58595a4
address comment
amyangfei Jun 22, 2019
930977c
typo fix
amyangfei Jun 22, 2019
d1329c4
remove resync streamer
amyangfei Jun 22, 2019
60442fb
check shardingReSync before fetch from channel
amyangfei Jun 22, 2019
e02b5f5
address comment
amyangfei Jun 24, 2019
c8948af
address comment
amyangfei Jun 24, 2019
790059a
add test case with multiple ddls in a single QueryEvent
amyangfei Jun 24, 2019
fce6b42
refine some variable names
amyangfei Jun 24, 2019
d96d55b
address comment, refine some variable name
amyangfei Jun 24, 2019
ff08e6c
fix unit test
amyangfei Jun 24, 2019
151af21
address comment, remove sub task config from ShardingGroup
amyangfei Jun 24, 2019
865053c
fix ci
amyangfei Jun 24, 2019
860c5e0
address comment
amyangfei Jun 24, 2019
430d917
address comment
amyangfei Jun 24, 2019
7ea6d2c
add create/drop table restriction during sequence sharding
amyangfei Jun 24, 2019
cbdaff5
address comment
amyangfei Jun 25, 2019
1e2e959
refine error handling in closeShardingResync
amyangfei Jun 25, 2019
698f370
Merge branch 'master' into sharding-ddl-refactor
amyangfei Jun 26, 2019
f174066
Merge branch 'master' into sharding-ddl-refactor
amyangfei Jun 27, 2019
2ff99b5
address comment, re-construct code
amyangfei Jun 27, 2019
a00df3f
remove useless script
amyangfei Jun 27, 2019
7166f23
Merge branch 'master' into sharding-ddl-refactor
amyangfei Jun 28, 2019
89de140
address comments
amyangfei Jun 28, 2019
4bcaa5e
address comment
amyangfei Jun 28, 2019
369283b
refine sharding syncing check for rows event
amyangfei Jun 28, 2019
28d78b0
fix go check
amyangfei Jun 28, 2019
452be69
fix insync check
amyangfei Jun 28, 2019
6cad815
address comment
amyangfei Jun 28, 2019
70b942e
refine checkpoint save time
amyangfei Jun 28, 2019
eda8730
make a copy from binlog position
amyangfei Jun 28, 2019
15 changes: 10 additions & 5 deletions syncer/checkpoint.go
@@ -145,10 +145,12 @@ type CheckPoint interface {
// corresponding to Meta.Save
SaveGlobalPoint(pos mysql.Position)

// FlushGlobalPointsExcept flushes the global checkpoint and tables' checkpoints except exceptTables
// FlushGlobalPointsExcept flushes the global checkpoint and tables'
// checkpoints except exceptTables; it also flushes the SQLs with args provided
// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
// @exceptTables: [[schema, table]... ]
// corresponding to Meta.Flush
FlushPointsExcept(exceptTables [][]string) error
FlushPointsExcept(exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error

// GlobalPoint returns the global binlog stream's checkpoint
// corresponding to Meta.Pos
@@ -180,7 +182,7 @@ type RemoteCheckPoint struct {
db *Conn
schema string // schema name, set through task config
table string // table name, now it's task name
id string // checkpoint ID, now it is `server-id` used as MySQL slave
id string // checkpoint ID, now it is `source-id`

// source-schema -> source-table -> checkpoint
// used to filter the synced binlog when re-syncing for sharding group
@@ -330,7 +332,7 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position) {
}

// FlushPointsExcept implements CheckPoint.FlushPointsExcept
func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error {
func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error {
cp.RLock()
defer cp.RUnlock()

@@ -373,9 +375,12 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error {

points = append(points, point)
}

}
}
for i := range extraSQLs {
sqls = append(sqls, extraSQLs[i])
args = append(args, extraArgs[i])
}

err := cp.db.executeSQL(sqls, args, maxRetryCount)
if err != nil {
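
For orientation, a minimal sketch of how the extended FlushPointsExcept signature is meant to be used, written for this review rather than taken from the PR: the SQLs and args produced by ShardingMeta.FlushData (added below in syncer/sharding-meta/shardmeta.go) are passed through extraSQLs/extraArgs so they are executed in the same executeSQL batch as the checkpoint updates. The helper name flushWithShardMeta and its parameters are illustrative only.

package syncer

import (
	shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
)

// flushWithShardMeta is a hypothetical helper (not part of this PR) that shows
// the intended wiring: shard meta rows produced by ShardingMeta.FlushData are
// flushed together with the checkpoints via the extraSQLs/extraArgs arguments.
func flushWithShardMeta(cp CheckPoint, meta *shardmeta.ShardingMeta,
	sourceID, targetTableID string, exceptTables [][]string) error {
	extraSQLs, extraArgs := meta.FlushData(sourceID, targetTableID)
	return cp.FlushPointsExcept(exceptTables, extraSQLs, extraArgs)
}
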
8 changes: 4 additions & 4 deletions syncer/checkpoint_test.go
@@ -145,7 +145,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectBegin()
s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, true, pos1.Name, pos1.Pos).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(nil)
err = cp.FlushPointsExcept(nil, nil, nil)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint(), Equals, pos1)
c.Assert(cp.FlushedGlobalPoint(), Equals, pos1)
@@ -185,7 +185,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectBegin()
s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, true, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(nil)
err = cp.FlushPointsExcept(nil, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
c.Assert(cp.GlobalPoint(), Equals, pos2)
@@ -267,7 +267,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectBegin()
s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, schema, table, pos2.Name, pos2.Pos, false, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(nil)
err = cp.FlushPointsExcept(nil, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
@@ -303,7 +303,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectBegin()
s.mock.ExpectExec(flushCheckPointSQL).WithArgs(cpid, "", "", pos2.Name, pos2.Pos, true, pos2.Name, pos2.Pos).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept([][]string{{schema, table}})
err = cp.FlushPointsExcept([][]string{{schema, table}}, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
256 changes: 256 additions & 0 deletions syncer/sharding-meta/shardmeta.go
@@ -0,0 +1,256 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package shardmeta

import (
"encoding/json"
"fmt"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

const (
// MetaTableFormat is used in meta table name constructor
MetaTableFormat = "%s_syncer_sharding_meta"
)

// DDLItem records ddl information used in sharding sequence organization
type DDLItem struct {
FirstPos mysql.Position `json:"first-pos"` // first DDL's binlog Pos, not the End_log_pos of the event
DDLs []string `json:"ddls"` // DDLs, these ddls are in the same QueryEvent
Source string `json:"source"` // source table ID
}

// NewDDLItem creates a new DDLItem
func NewDDLItem(pos mysql.Position, ddls []string, source string) *DDLItem {
return &DDLItem{
FirstPos: pos,
DDLs: ddls,
Source: source,
}
}

// String returns the item's format string value
func (item *DDLItem) String() string {
return fmt.Sprintf("first-pos: %s ddls: %+v source: %s", item.FirstPos, item.DDLs, item.Source)
}

// ShardingSequence records a list of DDLItem
type ShardingSequence struct {
Items []*DDLItem `json:"items"`
}

// IsPrefixSequence checks whether a ShardingSequence is the prefix sequence of other.
func (seq *ShardingSequence) IsPrefixSequence(other *ShardingSequence) bool {
if len(seq.Items) > len(other.Items) {
return false
}
for idx := range seq.Items {
if !utils.CompareShardingDDLs(seq.Items[idx].DDLs, other.Items[idx].DDLs) {
return false
}
}
return true
}

// String returns the ShardingSequence's json string
func (seq *ShardingSequence) String() string {
jsonSeq, err := json.Marshal(seq.Items)
if err != nil {
log.Errorf("marshal ShardingSequence to json error %v", err)
}
return string(jsonSeq)
}

// ShardingMeta stores sharding ddl sequence
// including global sequence and each source's own sequence
// NOTE: sharding meta is not thread safe, it must be used in thread safe context
type ShardingMeta struct {
activeIdx int // the first unsynced DDL index
global *ShardingSequence // merged sharding sequence of all source tables
sources map[string]*ShardingSequence // source table ID -> its sharding sequence
schema string // schema name in downstream meta db
table string // table name used in downstream meta db
}

// NewShardingMeta creates a new ShardingMeta
func NewShardingMeta(schema, table string) *ShardingMeta {
return &ShardingMeta{
schema: schema,
table: table,
global: &ShardingSequence{Items: make([]*DDLItem, 0)},
sources: make(map[string]*ShardingSequence),
}
}

// RestoreFromData restores ShardingMeta from given data
func (meta *ShardingMeta) RestoreFromData(sourceTableID string, activeIdx int, isGlobal bool, data []byte) error {
items := make([]*DDLItem, 0)
err := json.Unmarshal(data, &items)
if err != nil {
return errors.Trace(err)
}
if isGlobal {
meta.global = &ShardingSequence{Items: items}
} else {
meta.sources[sourceTableID] = &ShardingSequence{Items: items}
}
meta.activeIdx = activeIdx
return nil
}

// ActiveIdx returns the activeIdx of sharding meta
func (meta *ShardingMeta) ActiveIdx() int {
return meta.activeIdx
}

func (meta *ShardingMeta) reinitialize() {
meta.activeIdx = 0
meta.global = &ShardingSequence{make([]*DDLItem, 0)}
meta.sources = make(map[string]*ShardingSequence)
}

// checkItemExists checks whether a DDLItem exists in its source sequence
// if it exists, return the index of the DDLItem in the source sequence.
// if not, return the next index in the source sequence.
func (meta *ShardingMeta) checkItemExists(item *DDLItem) (int, bool) {
source, ok := meta.sources[item.Source]
if !ok {
return 0, false
}
for idx, ddlItem := range source.Items {
if item.FirstPos.Compare(ddlItem.FirstPos) == 0 {
return idx, true
}
}
return len(source.Items), false
}

// AddItem adds a newly coming DDLItem into ShardingMeta
// 1. if the DDLItem already exists in its source sequence, only check whether it is the active DDL
// 2. add the DDLItem into its related source sequence
// 3. if it is a new DDL in the global sequence, add it into the global sequence
// 4. check that the source sequence is a prefix of the global sequence; if not, return an error
// returns:
// active: whether the DDL will be processed in this round
func (meta *ShardingMeta) AddItem(item *DDLItem) (active bool, err error) {
index, exists := meta.checkItemExists(item)
if exists {
return index == meta.activeIdx, nil
}

if source, ok := meta.sources[item.Source]; !ok {
meta.sources[item.Source] = &ShardingSequence{Items: []*DDLItem{item}}
} else {
source.Items = append(source.Items, item)
}

found := false
for _, globalItem := range meta.global.Items {
if utils.CompareShardingDDLs(item.DDLs, globalItem.DDLs) {
found = true
break
}
}
if !found {
meta.global.Items = append(meta.global.Items, item)
}

global, source := meta.global, meta.sources[item.Source]
if !source.IsPrefixSequence(global) {
return false, errors.Errorf("detect inconsistent DDL sequence from source %+v, right DDL sequence should be %+v", source.Items, global.Items)
}

return index == meta.activeIdx, nil
}

// GetGlobalActiveDDL returns activeDDL in global sequence
func (meta *ShardingMeta) GetGlobalActiveDDL() *DDLItem {
if meta.activeIdx < len(meta.global.Items) {
return meta.global.Items[meta.activeIdx]
}
return nil
}

// GetGlobalItems returns global DDLItems
func (meta *ShardingMeta) GetGlobalItems() []*DDLItem {
return meta.global.Items
}

// GetActiveDDLItem returns the source table's active DDLItem
// in the DDL unsynced stage, the active DDLItem is the DDL currently being synced
// in the re-sync stage, the active DDLItem is the next DDL to sync in the DDL syncing sequence, and may be nil
func (meta *ShardingMeta) GetActiveDDLItem(tableSource string) *DDLItem {
source, ok := meta.sources[tableSource]
if !ok {
return nil
}
if meta.activeIdx < len(source.Items) {
return source.Items[meta.activeIdx]
}
return nil
}

// InSequenceSharding returns whether in sequence sharding
func (meta *ShardingMeta) InSequenceSharding() bool {
globalItemCount := len(meta.global.Items)
return globalItemCount > 0 && meta.activeIdx < globalItemCount
}

// ResolveShardingDDL resolves one sharding DDL and increases activeIdx.
// If activeIdx equals the length of the global sharding sequence, all sharding
// DDLs in this ShardingMeta are resolved and the ShardingMeta is reinitialized;
// it returns true if all DDLs are resolved.
func (meta *ShardingMeta) ResolveShardingDDL() bool {
meta.activeIdx++
if meta.activeIdx == len(meta.global.Items) {
meta.reinitialize()
return true
}
return false
}

// ActiveDDLFirstPos returns the first binlog position of active DDL
func (meta *ShardingMeta) ActiveDDLFirstPos() (mysql.Position, error) {
if meta.activeIdx >= len(meta.global.Items) {
return mysql.Position{}, errors.Errorf("activeIdx %d larger than length of global DDLItems: %v", meta.activeIdx, meta.global.Items)
}
return meta.global.Items[meta.activeIdx].FirstPos, nil
}

// FlushData returns sharding meta flush SQL and args
func (meta *ShardingMeta) FlushData(sourceID, tableID string) ([]string, [][]interface{}) {
if len(meta.global.Items) == 0 {
sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` where source_id=? and target_table_id=?", meta.schema, meta.table)
args2 := []interface{}{sourceID, tableID}
return []string{sql2}, [][]interface{}{args2}
}
var (
sqls = make([]string, 1+len(meta.sources))
args = make([][]interface{}, 0, 1+len(meta.sources))
baseSQL = fmt.Sprintf("INSERT INTO `%s`.`%s` (`source_id`, `target_table_id`, `source_table_id`, `active_index`, `is_global`, `data`) VALUES(?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `data`=?, `active_index`=?", meta.schema, meta.table)
)
for i := range sqls {
sqls[i] = baseSQL
}
args = append(args, []interface{}{sourceID, tableID, "", meta.activeIdx, true, meta.global.String(), meta.global.String(), meta.activeIdx})
for source, seq := range meta.sources {
args = append(args, []interface{}{sourceID, tableID, source, meta.activeIdx, false, seq.String(), seq.String(), meta.activeIdx})
}
return sqls, args
}
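
To make the ShardingMeta bookkeeping above concrete, here is a small hypothetical walkthrough; the schema/table names, binlog positions, and DDL statements are invented for illustration, and errors are ignored for brevity. Two shard tables emit the same two DDLs: AddItem reports an item as active only when its index in the source sequence equals activeIdx, ResolveShardingDDL advances activeIdx once a sharding DDL has been handled, and FlushData returns the SQLs/args that can be handed to FlushPointsExcept.

package main

import (
	"fmt"

	"github.com/siddontang/go-mysql/mysql"

	shardmeta "github.com/pingcap/dm/syncer/sharding-meta"
)

func main() {
	meta := shardmeta.NewShardingMeta("dm_meta", "task_syncer_sharding_meta")

	ddl1 := []string{"ALTER TABLE tbl ADD COLUMN c1 INT"}
	ddl2 := []string{"ALTER TABLE tbl ADD COLUMN c2 INT"}

	// Shard table 1 hits both DDLs first; only the first one is active (activeIdx == 0).
	active, _ := meta.AddItem(shardmeta.NewDDLItem(mysql.Position{Name: "bin.000001", Pos: 100}, ddl1, "`db`.`tbl_1`"))
	fmt.Println(active) // true: item index 0 equals activeIdx 0
	active, _ = meta.AddItem(shardmeta.NewDDLItem(mysql.Position{Name: "bin.000001", Pos: 200}, ddl2, "`db`.`tbl_1`"))
	fmt.Println(active) // false: item index 1, the first sharding DDL is still unresolved

	// Shard table 2 catches up on the first DDL, which remains the active one.
	active, _ = meta.AddItem(shardmeta.NewDDLItem(mysql.Position{Name: "bin.000001", Pos: 300}, ddl1, "`db`.`tbl_2`"))
	fmt.Println(active) // true

	// Resolve the first sharding DDL; activeIdx moves to 1 but the sequence is not finished.
	fmt.Println(meta.ResolveShardingDDL()) // false: the second DDL is still pending

	// SQLs/args persisting the meta: one global row plus one row per source table,
	// ready to be passed to FlushPointsExcept as extraSQLs/extraArgs.
	sqls, args := meta.FlushData("mysql-replica-01", "`db`.`tbl`")
	fmt.Println(len(sqls), len(args)) // 3 3
}
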