Optimistic: support start task with inconsistent upstream table schema #3903

Merged · 56 commits · Feb 7, 2022

Changes shown below are from 54 of the 56 commits.

Commits (all by GMHDBJD):
cf3881c  init logic (Dec 15, 2021)
3022cc5  remove old code (Dec 16, 2021)
d509b0c  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 17, 2021)
25c1d59  remove meta when stop task (Dec 17, 2021)
7b0ad14  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 19, 2021)
296fca9  fix sequence_sharding_optimistic (Dec 19, 2021)
f9a691f  remove old code (Dec 19, 2021)
48b58b7  fix lint (Dec 19, 2021)
462cc78  fix update ba rule test (Dec 20, 2021)
7034319  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 20, 2021)
d61d3fe  fix ut (Dec 20, 2021)
fabbfd5  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 20, 2021)
fdfe741  code review (Dec 20, 2021)
a7cce9b  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 20, 2021)
6327cbc  fix lint (Dec 20, 2021)
b2e89e3  add DIFFERENT_SCHEMA_FULL_CASE (Dec 20, 2021)
c31815d  fix lint (Dec 20, 2021)
11a6908  add test (Dec 20, 2021)
d6988fd  add DIFFERENT_SCHEMA_INCREMENTAL_CASE (Dec 20, 2021)
9d1921b  add more integration test (Dec 22, 2021)
ff2fa16  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 22, 2021)
ac35de9  Merge branch 'master' into supportInconsistentSchema (Dec 22, 2021)
0af4f18  add unit test (Dec 22, 2021)
7c449ac  update fmt (Dec 22, 2021)
ee3284f  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 23, 2021)
84d9aeb  code review (Dec 23, 2021)
9fb1d3a  remove downstream when stop task (Dec 23, 2021)
00171da  remove metadata after stop task succeed (Dec 23, 2021)
b05c11b  Merge branch 'master' into supportInconsistentSchema (Dec 23, 2021)
4c3a20e  Merge branch 'master' into supportInconsistentSchema (Dec 23, 2021)
fd2ab4f  fix fmt (Dec 23, 2021)
d179ffb  fix test (Dec 24, 2021)
3e9f372  Merge remote-tracking branch 'upstream/master' into supportInconsiste… (Dec 24, 2021)
54279fc  pre join table for new lock (Dec 24, 2021)
642cf8f  address comment (Dec 24, 2021)
6c8d3ca  Merge branch 'master' into supportInconsistentSchema (Dec 24, 2021)
c013b51  Merge branch 'master' into supportInconsistentSchema (Dec 27, 2021)
8c0443e  Merge branch 'master' into supportInconsistentSchema (Dec 28, 2021)
89964f2  resolve conflict (Jan 12, 2022)
0c6d1bd  Merge branch 'master' into supportInconsistentSchema (Jan 18, 2022)
8e2520e  address comment (Jan 18, 2022)
57cf50f  Merge remote-tracking branch 'upstream/master' into pr/GMHDBJD/3903 (Jan 18, 2022)
9c7ba68  Merge branch 'master' into supportInconsistentSchema (Jan 18, 2022)
818a5ed  Merge branch 'master' into supportInconsistentSchema (Jan 19, 2022)
ece233f  address comment (Jan 19, 2022)
b04db38  Merge branch 'master' into supportInconsistentSchema (Jan 19, 2022)
58f25f8  address comment (Jan 20, 2022)
1a8d472  address comment (Jan 25, 2022)
2b09cce  Merge branch 'master' into supportInconsistentSchema (Jan 25, 2022)
7f296c3  Merge branch 'master' into supportInconsistentSchema (Jan 26, 2022)
c9a821c  Update dm/pkg/shardddl/optimism/keeper.go (Jan 26, 2022)
5afac75  address comment (Jan 26, 2022)
8564fdf  Merge branch 'master' into supportInconsistentSchema (Jan 26, 2022)
762fb91  Merge branch 'master' into supportInconsistentSchema (Feb 7, 2022)
96aee90  Merge branch 'master' into supportInconsistentSchema (Feb 7, 2022)
41cf696  address comment (Feb 7, 2022)
Makefile (13 changes: 11 additions & 2 deletions)

@@ -277,13 +277,22 @@ dm_generate_openapi: tools/bin/oapi-codegen
 	cd dm && ../tools/bin/oapi-codegen --config=openapi/spec/types-gen-cfg.yaml openapi/spec/dm.yaml
 	cd dm && ../tools/bin/oapi-codegen --config=openapi/spec/client-gen-cfg.yaml openapi/spec/dm.yaml
 
-dm_unit_test: check_failpoint_ctl
+define run_dm_unit_test
+	@echo "running unit test for packages:" $(1)
 	mkdir -p $(DM_TEST_DIR)
 	$(FAILPOINT_ENABLE)
 	@export log_level=error; \
-	$(GOTEST) -timeout 5m -covermode=atomic -coverprofile="$(DM_TEST_DIR)/cov.unit_test.out" $(DM_PACKAGES) \
+	$(GOTEST) -timeout 5m -covermode=atomic -coverprofile="$(DM_TEST_DIR)/cov.unit_test.out" $(1) \
 	|| { $(FAILPOINT_DISABLE); exit 1; }
 	$(FAILPOINT_DISABLE)
+endef
+
+dm_unit_test: check_failpoint_ctl
+	$(call run_dm_unit_test,$(DM_PACKAGES))
+
+# run unit test for the specified pkg only, like `make dm_unit_test_pkg PKG=github.com/pingcap/tiflow/dm/dm/master`
+dm_unit_test_pkg: check_failpoint_ctl
+	$(call run_dm_unit_test,$(PKG))
+
 dm_unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
 	mkdir -p $(DM_TEST_DIR)
dm/_utils/terror_gen/errors_release.txt (1 change: 1 addition & 0 deletions)

@@ -395,6 +395,7 @@ ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "M
 ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
 ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
 ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
+ErrMasterOptimisticDownstreamMetaNotFound,[code=38056:class=dm-master:scope=internal:level=high], "Message: downstream database config and meta for task %s not found"
 ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
 ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
 ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
dm/dm/common/common.go (5 changes: 2 additions & 3 deletions)

@@ -78,7 +78,8 @@ var (
 	ShardDDLOptimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/operation/")
 	// ShardDDLOptimismInitSchemaKeyAdapter is used to store the initial schema (before constructed the lock) of merged tables.
 	// k/v: Encode(task-name, downstream-schema-name, downstream-table-name) -> table schema.
-	ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
+	// TODO: prune in etcd when upgrade
+	// ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
 	// ShardDDLOptimismDroppedColumnsKeyAdapter is used to store the columns that are not fully dropped
 	// k/v: Encode(lock-id, column-name, source-id, upstream-schema-name, upstream-table-name) -> int
 	// If we don't identify different upstream tables, we may report an error for tb2 in the following case.

@@ -112,8 +113,6 @@ func keyAdapterKeysLen(s KeyAdapter) int {
 		ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
 		ShardDDLOptimismSourceTablesKeyAdapter, LoadTaskKeyAdapter, TaskCliArgsKeyAdapter:
 		return 2
-	case ShardDDLOptimismInitSchemaKeyAdapter:
-		return 3
 	case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
 		return 4
 	case ShardDDLOptimismDroppedColumnsKeyAdapter:
dm/dm/common/common_test.go (23 changes: 0 additions & 23 deletions)

@@ -16,7 +16,6 @@ package common
 import (
 	"net"
 	"path"
-	"strings"
 	"testing"
 
 	. "github.com/pingcap/check"

@@ -61,11 +60,6 @@ func (t *testCommon) TestKeyAdapter(c *C) {
 			adapter: UpstreamSubTaskKeyAdapter,
 			want:    "/dm-master/upstream/subtask/6d7973716c31/74657374",
 		},
-		{
-			keys:    []string{"test", "target_db", "target_table"},
-			adapter: ShardDDLOptimismInitSchemaKeyAdapter,
-			want:    "/dm-master/shardddl-optimism/init-schema/74657374/7461726765745f6462/7461726765745f7461626c65",
-		},
 		{
 			keys:    []string{"test", "mysql_replica_01", "target_db", "target_table"},
 			adapter: ShardDDLOptimismInfoKeyAdapter,

@@ -108,11 +102,6 @@ func (t *testCommon) TestEncodeAsPrefix(c *C) {
 			adapter: UpstreamSubTaskKeyAdapter,
 			want:    "/dm-master/upstream/subtask/6d7973716c31/",
 		},
-		{
-			keys:    []string{"test", "target_db"},
-			adapter: ShardDDLOptimismInitSchemaKeyAdapter,
-			want:    "/dm-master/shardddl-optimism/init-schema/74657374/7461726765745f6462/",
-		},
 	}
 
 	for _, ca := range testCases {

@@ -121,18 +110,6 @@ func (t *testCommon) TestEncodeAsPrefix(c *C) {
 		_, err := ca.adapter.Decode(encKey)
 		c.Assert(err, NotNil)
 	}
-
-	keys := []string{"test", "target_db", "target_table"}
-	fullEncodedKey := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys...)
-	prefixEncodedKey := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys[:len(keys)-1]...)
-	c.Assert(strings.HasPrefix(fullEncodedKey, prefixEncodedKey), IsTrue)
-
-	keys2 := []string{"test", "target_db_2", "target_table_2"}
-	fullEncodedKey2 := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys2...)
-	prefixEncodedKey2 := ShardDDLOptimismInitSchemaKeyAdapter.Encode(keys2[:len(keys2)-1]...)
-
-	c.Assert(strings.HasPrefix(fullEncodedKey, prefixEncodedKey2), IsFalse)
-	c.Assert(strings.HasPrefix(fullEncodedKey2, prefixEncodedKey), IsFalse)
 }
 
 func (t *testCommon) TestIsErrNetClosing(c *C) {
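
The `want` values in these tests come from the keyHexEncoderDecoder convention: each key segment is hex-encoded and joined under a fixed prefix, so segment values containing `/` or arbitrary bytes cannot corrupt the etcd key space. A self-contained sketch of that scheme (the `encodeKey` helper is an assumption for illustration, not DM's exact API):

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// encodeKey mimics DM's keyHexEncoderDecoder convention: hex-encode each
// segment, then join the segments with "/" under a fixed prefix.
func encodeKey(prefix string, keys ...string) string {
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, hex.EncodeToString([]byte(k)))
	}
	return prefix + strings.Join(parts, "/")
}

func main() {
	// Matches the expected value in TestKeyAdapter above:
	// "mysql1" -> 6d7973716c31, "test" -> 74657374.
	fmt.Println(encodeKey("/dm-master/upstream/subtask/", "mysql1", "test"))
	// Output: /dm-master/upstream/subtask/6d7973716c31/74657374
}
```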
dm/dm/master/scheduler/scheduler.go (13 changes: 13 additions & 0 deletions)

@@ -1020,6 +1020,19 @@ func (s *Scheduler) getSubTaskCfgByTaskSource(task, source string) *config.SubTa
 	return &clone
 }
 
+// GetDownstreamMetaByTask gets downstream db config and meta config by task name.
+func (s *Scheduler) GetDownstreamMetaByTask(task string) (*config.DBConfig, string) {
+	v, ok := s.subTaskCfgs.Load(task)
+	if !ok {
+		return nil, ""
+	}
+	cfgM := v.(map[string]config.SubTaskConfig)
+	for _, cfg := range cfgM {
+		return cfg.To.Clone(), cfg.MetaSchema
+	}
+	return nil, ""
+}
+
 // GetSubTaskCfgsByTask gets subtask configs' map by task name.
 func (s *Scheduler) GetSubTaskCfgsByTask(task string) map[string]*config.SubTaskConfig {
 	v, ok := s.subTaskCfgs.Load(task)
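
GetDownstreamMetaByTask returns the downstream config of an arbitrary subtask: every subtask of one task shares the same downstream (`To`) and meta schema, so Go's random map iteration order is harmless here. A standalone sketch of the pattern, with stand-in types in place of config.DBConfig and config.SubTaskConfig:

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for config.DBConfig / config.SubTaskConfig.
type dbConfig struct{ Host string }
type subTaskConfig struct {
	To         dbConfig
	MetaSchema string
}

// downstreamMetaByTask mirrors Scheduler.GetDownstreamMetaByTask: look up
// the task's subtask map and return the downstream meta of any one entry,
// since all entries share the same downstream.
func downstreamMetaByTask(cfgs *sync.Map, task string) (*dbConfig, string) {
	v, ok := cfgs.Load(task)
	if !ok {
		return nil, ""
	}
	for _, cfg := range v.(map[string]subTaskConfig) {
		to := cfg.To // copy, standing in for cfg.To.Clone()
		return &to, cfg.MetaSchema
	}
	return nil, ""
}

func main() {
	var cfgs sync.Map
	cfgs.Store("task1", map[string]subTaskConfig{
		"source1": {To: dbConfig{Host: "tidb:4000"}, MetaSchema: "dm_meta"},
		"source2": {To: dbConfig{Host: "tidb:4000"}, MetaSchema: "dm_meta"},
	})
	db, meta := downstreamMetaByTask(&cfgs, "task1")
	fmt.Println(db.Host, meta) // tidb:4000 dm_meta
}
```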
dm/dm/master/scheduler/scheduler_test.go (16 changes: 16 additions & 0 deletions)

@@ -254,12 +254,15 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
 	c.Assert(s.AddSubTasks(false), IsNil) // can call without configs, return without error, but take no effect.
 	t.subTaskCfgNotExist(c, s, taskName1, sourceID1)
 	t.subTaskStageMatch(c, s, taskName1, sourceID1, pb.Stage_InvalidStage)
+	t.downstreamMetaNotExist(c, s, taskName1)
 	// start the task.
 	c.Assert(s.AddSubTasks(false, subtaskCfg1), IsNil)
 	c.Assert(terror.ErrSchedulerSubTaskExist.Equal(s.AddSubTasks(false, subtaskCfg1)), IsTrue) // add again.
 	// subtask config and stage exist.
 	t.subTaskCfgExist(c, s, subtaskCfg1)
 	t.subTaskStageMatch(c, s, taskName1, sourceID1, pb.Stage_Running)
+	t.downstreamMetaExist(c, s, taskName1, subtaskCfg1.To, subtaskCfg1.MetaSchema)
+	t.downstreamMetaNotExist(c, s, taskName2)
 
 	// update source config when task already started will failed
 	c.Assert(terror.ErrSchedulerSourceOpTaskExist.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)

@@ -629,6 +632,19 @@ func (t *testScheduler) subTaskCfgExist(c *C, s *Scheduler, expectCfg config.Sub
 	c.Assert(cfgM[expectCfg.Name], DeepEquals, expectCfg)
 }
 
+func (t *testScheduler) downstreamMetaNotExist(c *C, s *Scheduler, task string) {
+	dbConfig, metaConfig := s.GetDownstreamMetaByTask(task)
+	c.Assert(dbConfig, IsNil)
+	c.Assert(metaConfig, Equals, "")
+}
+
+func (t *testScheduler) downstreamMetaExist(c *C, s *Scheduler, task string, expectDBCfg config.DBConfig, expectMetaConfig string) {
+	dbConfig, metaConfig := s.GetDownstreamMetaByTask(task)
+	c.Assert(dbConfig, NotNil)
+	c.Assert(dbConfig, DeepEquals, &expectDBCfg)
+	c.Assert(metaConfig, Equals, expectMetaConfig)
+}
+
 func (t *testScheduler) workerNotExist(c *C, s *Scheduler, worker string) {
 	c.Assert(s.GetWorkerByName(worker), IsNil)
 	wm, _, err := ha.GetAllWorkerInfo(etcdTestCli)
dm/dm/master/server.go (16 changes: 14 additions & 2 deletions)

@@ -132,7 +132,7 @@ func NewServer(cfg *Config) *Server {
 		ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}),
 	}
 	server.pessimist = shardddl.NewPessimist(&logger, server.getTaskResources)
-	server.optimist = shardddl.NewOptimist(&logger)
+	server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask)
 	server.closed.Store(true)
 	setUseTLS(&cfg.Security)
 
@@ -590,6 +590,18 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
 
 	resp.Result = true
 	resp.Sources = s.getSourceRespsAfterOperation(ctx, req.Name, sources, []string{}, req)
+
+	if expect == pb.Stage_Stopped {
+		// delete meta data for optimist
+		if len(req.Sources) == 0 {
+			err2 = s.optimist.RemoveMetaDataWithTask(req.Name)
+		} else {
+			err2 = s.optimist.RemoveMetaDataWithTaskAndSources(req.Name, sources...)
+		}
+		if err2 != nil {
+			log.L().Error("failed to delete metadata for task", zap.String("task name", req.Name), log.ShortError(err2))
+		}
+	}
 	return resp, nil
 }
Review thread on lines +593 to 606:

GMHDBJD (Contributor, Author): fix #3629

Contributor: seems that for a task of (source1, source2), if source1 drops a column/adds an index and we then run stop-task -s source2, there's no chance to drop the column/add the index. We can open another known issue or treat it as a limitation.
@@ -1558,7 +1570,7 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
 	if err != nil {
 		return err
 	}
-	err = s.optimist.RemoveMetaData(taskName)
+	err = s.optimist.RemoveMetaDataWithTask(taskName)
 	if err != nil {
 		return err
 	}
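
The stop-task path thus distinguishes a full-task stop from a partial, per-source stop. A minimal sketch of that decision; the method names mirror the diff, while the surrounding types are stand-ins rather than DM's actual optimist API:

```go
package main

import "fmt"

// metaCleaner stands in for the shard-DDL optimist's metadata cleanup API.
type metaCleaner struct{}

func (m *metaCleaner) removeMetaDataWithTask(task string) error {
	fmt.Printf("remove all optimism metadata of task %q\n", task)
	return nil
}

func (m *metaCleaner) removeMetaDataWithTaskAndSources(task string, sources ...string) error {
	fmt.Printf("remove optimism metadata of task %q for sources %v only\n", task, sources)
	return nil
}

// cleanupOnStop mirrors the branch added to OperateTask: stopping the whole
// task (no -s sources in the request) clears everything, while
// `stop-task -s ...` clears only the affected sources' metadata.
func cleanupOnStop(m *metaCleaner, task string, reqSources, boundSources []string) error {
	if len(reqSources) == 0 {
		return m.removeMetaDataWithTask(task)
	}
	return m.removeMetaDataWithTaskAndSources(task, boundSources...)
}

func main() {
	m := &metaCleaner{}
	_ = cleanupOnStop(m, "task1", nil, []string{"source1", "source2"})
	_ = cleanupOnStop(m, "task1", []string{"source2"}, []string{"source2"})
}
```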
dm/dm/master/server_test.go (4 changes: 2 additions & 2 deletions)

@@ -960,7 +960,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
 	server.scheduler, _ = t.testMockScheduler(ctx, &wg, c, sources, workers, "",
 		makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))
 	server.pessimist = shardddl.NewPessimist(&logger, func(task string) []string { return sources })
-	server.optimist = shardddl.NewOptimist(&logger)
+	server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask)
 
 	var (
 		DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}

@@ -1045,7 +1045,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
 	server.scheduler, _ = t.testMockScheduler(ctx, &wg, c, sources, workers, "",
 		makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))
 	server.pessimist = shardddl.NewPessimist(&logger, func(task string) []string { return sources })
-	server.optimist = shardddl.NewOptimist(&logger)
+	server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask)
 
 	var (
 		p = parser.New()
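
In both the server and its tests, the optimist now receives `server.scheduler.GetDownstreamMetaByTask` as a Go method value, so it can look up downstream meta on demand without holding a reference to the whole scheduler. A minimal sketch of that injection pattern, with simplified stand-ins rather than DM's actual signatures:

```go
package main

import "fmt"

type dbConfig struct{ Host string }

// optimist depends only on a lookup function, not on the scheduler type.
type optimist struct {
	downstreamMeta func(task string) (*dbConfig, string)
}

func newOptimist(getMeta func(task string) (*dbConfig, string)) *optimist {
	return &optimist{downstreamMeta: getMeta}
}

type scheduler struct{}

func (s *scheduler) getDownstreamMetaByTask(task string) (*dbConfig, string) {
	return &dbConfig{Host: "tidb:4000"}, "dm_meta"
}

func main() {
	s := &scheduler{}
	// Passing a method value captures s, mirroring
	// shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask).
	o := newOptimist(s.getDownstreamMetaByTask)
	db, meta := o.downstreamMeta("task1")
	fmt.Println(db.Host, meta) // tidb:4000 dm_meta
}
```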