Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
shardddl: support combine ddls in one sql for optimistic sharding mode (
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Mar 8, 2021
1 parent cd9e901 commit 864aaa4
Show file tree
Hide file tree
Showing 17 changed files with 572 additions and 215 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ ErrMasterAdvertisePeerURLsNotValid,[code=38050:class=dm-master:scope=internal:le
ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in master configuration file."
ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
tiBefore = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tiAfter1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
info1 = optimism.NewInfo(taskName, sources[0], "foo-1", "bar-1", schema, table, DDLs1, tiBefore, tiAfter1)
info1 = optimism.NewInfo(taskName, sources[0], "foo-1", "bar-1", schema, table, DDLs1, tiBefore, []*model.TableInfo{tiAfter1})
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false)
)

Expand Down
32 changes: 16 additions & 16 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`)
ti3 = ti1
i11 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, ti1)
i12 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs1, ti0, ti1)
i21 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs2, ti1, ti2)
i11 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i12 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2})
i23 = optimism.NewInfo(task, source2, "foo-2", "bar-3", downSchema, downTable,
[]string{`CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`}, ti2, ti2)
i31 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs3, ti2, ti3)
i33 = optimism.NewInfo(task, source2, "foo-2", "bar-3", downSchema, downTable, DDLs3, ti2, ti3)
[]string{`CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`}, ti2, []*model.TableInfo{ti2})
i31 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3})
i33 = optimism.NewInfo(task, source2, "foo-2", "bar-3", downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3})
)

st1.AddTable("foo", "bar-1", downSchema, downTable)
Expand Down Expand Up @@ -661,9 +661,9 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 DATETIME)`)
ti3 = ti0
i1 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, ti1)
i2 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs2, ti0, ti2)
i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs3, ti2, ti3)
i1 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i2 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2})
i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3})
)

st1.AddTable("foo", "bar-1", downSchema, downTable)
Expand Down Expand Up @@ -756,10 +756,10 @@ func (t *testOptimist) TestOptimistLockMultipleTarget(c *C) {
DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"}
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
i11 = optimism.NewInfo(task, source, upSchema, upTables[0], downSchema, downTable1, DDLs, ti0, ti1)
i12 = optimism.NewInfo(task, source, upSchema, upTables[1], downSchema, downTable1, DDLs, ti0, ti1)
i21 = optimism.NewInfo(task, source, upSchema, upTables[2], downSchema, downTable2, DDLs, ti0, ti1)
i22 = optimism.NewInfo(task, source, upSchema, upTables[3], downSchema, downTable2, DDLs, ti0, ti1)
i11 = optimism.NewInfo(task, source, upSchema, upTables[0], downSchema, downTable1, DDLs, ti0, []*model.TableInfo{ti1})
i12 = optimism.NewInfo(task, source, upSchema, upTables[1], downSchema, downTable1, DDLs, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, source, upSchema, upTables[2], downSchema, downTable2, DDLs, ti0, []*model.TableInfo{ti1})
i22 = optimism.NewInfo(task, source, upSchema, upTables[3], downSchema, downTable2, DDLs, ti0, []*model.TableInfo{ti1})
)

sts.AddTable(upSchema, upTables[0], downSchema, downTable1)
Expand Down Expand Up @@ -941,9 +941,9 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) {
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 INT)`)
i11 = optimism.NewInfo(task, source, upSchema, upTables[0], downSchema, downTable, DDLs1, ti0, ti1)
i12 = optimism.NewInfo(task, source, upSchema, upTables[1], downSchema, downTable, DDLs1, ti0, ti1)
i21 = optimism.NewInfo(task, source, upSchema, upTables[0], downSchema, downTable, DDLs2, ti1, ti2)
i11 = optimism.NewInfo(task, source, upSchema, upTables[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i12 = optimism.NewInfo(task, source, upSchema, upTables[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, source, upSchema, upTables[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2})
)

st.AddTable(upSchema, upTables[0], downSchema, downTable)
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,12 @@ description = ""
workaround = "Please confirm that you have not violated any restrictions in the upgrade documentation."
tags = ["internal", "high"]

[error.DM-dm-master-38054]
message = "inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
Expand Down
6 changes: 3 additions & 3 deletions pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, kee
grantAndPutKV := func(k, v string, ttl int64) (clientv3.LeaseID, error) {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
lease, err := cli.Grant(cliCtx, ttl)
if err != nil {
return 0, err
lease, err2 := cli.Grant(cliCtx, ttl)
if err2 != nil {
return 0, err2
}
_, err = cli.Put(cliCtx, k, v, clientv3.WithLease(lease.ID))
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type Info struct {
DownTable string `json:"down-table"` // downstream/target table name
DDLs []string `json:"ddls"` // DDL statements

TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs
TableInfoAfter *model.TableInfo `json:"table-info-after"` // the tracked table schema after applying the DDLs
TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs
TableInfosAfter []*model.TableInfo `json:"table-info-after"` // the tracked table schema after applying the DDLs

// only used to report to the caller of the watcher, do not marsh it.
// if it's true, it means the Info has been deleted in etcd.
Expand All @@ -57,7 +57,7 @@ type Info struct {

// NewInfo creates a new Info instance.
func NewInfo(task, source, upSchema, upTable, downSchema, downTable string,
DDLs []string, tableInfoBefore, tableInfoAfter *model.TableInfo) Info {
DDLs []string, tableInfoBefore *model.TableInfo, tableInfosAfter []*model.TableInfo) Info {
return Info{
Task: task,
Source: source,
Expand All @@ -67,7 +67,7 @@ func NewInfo(task, source, upSchema, upTable, downSchema, downTable string,
DownTable: downTable,
DDLs: DDLs,
TableInfoBefore: tableInfoBefore,
TableInfoAfter: tableInfoAfter,
TableInfosAfter: tableInfosAfter,
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/shardddl/optimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
tblI2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)
tblI3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`)
tblI4 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT, c3 INT)`)
i11 = NewInfo(task1, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblI1, tblI2)
i12 = NewInfo(task1, source2, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c2 INT"}, tblI2, tblI3)
i21 = NewInfo(task2, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c3 INT"}, tblI3, tblI4)
i11 = NewInfo(task1, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblI1, []*model.TableInfo{tblI2})
i12 = NewInfo(task1, source2, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c2 INT"}, tblI2, []*model.TableInfo{tblI3})
i21 = NewInfo(task2, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c3 INT"}, tblI3, []*model.TableInfo{tblI4})
)

// put the same key twice.
Expand Down
2 changes: 1 addition & 1 deletion pkg/shardddl/optimism/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e
l = lk.locks[lockID]
}

newDDLs, err := l.TrySync(info.Source, info.UpSchema, info.UpTable, info.DDLs, info.TableInfoAfter, tts, info.Version)
newDDLs, err := l.TrySync(info.Source, info.UpSchema, info.UpTable, info.DDLs, info.TableInfosAfter, tts, info.Version)
return lockID, newDDLs, err
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/shardddl/optimism/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package optimism
import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/util/mock"
)

Expand All @@ -42,9 +43,9 @@ func (t *testKeeper) TestLockKeeper(c *C) {
tiBefore = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tiAfter = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)

i11 = NewInfo(task1, source1, upSchema, upTable, downSchema, downTable, DDLs, tiBefore, tiAfter)
i12 = NewInfo(task1, source2, upSchema, upTable, downSchema, downTable, DDLs, tiBefore, tiAfter)
i21 = NewInfo(task2, source1, upSchema, upTable, downSchema, downTable, DDLs, tiBefore, tiAfter)
i11 = NewInfo(task1, source1, upSchema, upTable, downSchema, downTable, DDLs, tiBefore, []*model.TableInfo{tiAfter})
i12 = NewInfo(task1, source2, upSchema, upTable, downSchema, downTable, DDLs, tiBefore, []*model.TableInfo{tiAfter})
i21 = NewInfo(task2, source1, upSchema, upTable, downSchema, downTable, DDLs, tiBefore, []*model.TableInfo{tiAfter})

tts1 = []TargetTable{
newTargetTable(task1, source1, downSchema, downTable, map[string]map[string]struct{}{upSchema: {upTable: struct{}{}}}),
Expand Down Expand Up @@ -131,10 +132,10 @@ func (t *testKeeper) TestLockKeeperMultipleTarget(c *C) {
tiBefore = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tiAfter = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)

i11 = NewInfo(task, source, upSchema, upTables[0], downSchema, downTable1, DDLs, tiBefore, tiAfter)
i12 = NewInfo(task, source, upSchema, upTables[1], downSchema, downTable1, DDLs, tiBefore, tiAfter)
i21 = NewInfo(task, source, upSchema, upTables[0], downSchema, downTable2, DDLs, tiBefore, tiAfter)
i22 = NewInfo(task, source, upSchema, upTables[1], downSchema, downTable2, DDLs, tiBefore, tiAfter)
i11 = NewInfo(task, source, upSchema, upTables[0], downSchema, downTable1, DDLs, tiBefore, []*model.TableInfo{tiAfter})
i12 = NewInfo(task, source, upSchema, upTables[1], downSchema, downTable1, DDLs, tiBefore, []*model.TableInfo{tiAfter})
i21 = NewInfo(task, source, upSchema, upTables[0], downSchema, downTable2, DDLs, tiBefore, []*model.TableInfo{tiAfter})
i22 = NewInfo(task, source, upSchema, upTables[1], downSchema, downTable2, DDLs, tiBefore, []*model.TableInfo{tiAfter})

tts1 = []TargetTable{
newTargetTable(task, source, downSchema, downTable1, map[string]map[string]struct{}{
Expand Down
Loading

0 comments on commit 864aaa4

Please sign in to comment.