diff --git a/dm/syncer/opt_sharding_group.go b/dm/syncer/opt_sharding_group.go
new file mode 100644
index 00000000000..ce4060de642
--- /dev/null
+++ b/dm/syncer/opt_sharding_group.go
@@ -0,0 +1,243 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncer
+
+import (
+    "sync"
+
+    "github.com/pingcap/tidb/util/filter"
+    "github.com/pingcap/tiflow/dm/config"
+    "github.com/pingcap/tiflow/dm/pkg/binlog"
+    tcontext "github.com/pingcap/tiflow/dm/pkg/context"
+    "github.com/pingcap/tiflow/dm/pkg/utils"
+    "go.uber.org/zap"
+)
+
+// OptShardingGroup represents an optimistic sharding DDL sync group.
+type OptShardingGroup struct {
+    sync.RWMutex
+
+    // conflictTables is a hash set of the source tables currently in the
+    // conflict stage, used to quickly check whether a table is conflicting.
+    // sourceTableID -> the startLocation of that table's conflicting DDL
+    conflictTables map[string]binlog.Location
+
+    firstConflictLocation binlog.Location
+    flavor                string
+    enableGTID            bool
+}
+
+func NewOptShardingGroup(firstConflictLocation binlog.Location, flavor string, enableGTID bool) *OptShardingGroup {
+    return &OptShardingGroup{
+        firstConflictLocation: firstConflictLocation,
+        flavor:                flavor,
+        enableGTID:            enableGTID,
+        conflictTables:        make(map[string]binlog.Location, 1),
+    }
+}
+
+func (s *OptShardingGroup) appendConflictTable(table *filter.Table, location binlog.Location) {
+    s.Lock()
+    defer s.Unlock()
+    s.conflictTables[table.String()] = location
+}
+
+func (s *OptShardingGroup) inConflictStage(table *filter.Table) bool {
+    s.RLock()
+    defer s.RUnlock()
+    _, ok := s.conflictTables[utils.GenTableID(table)]
+    return ok
+}
+
+func (s *OptShardingGroup) Remove(sourceTableIDs []string) {
+    s.Lock()
+    defer s.Unlock()
+    for _, sourceTbl := range sourceTableIDs {
+        delete(s.conflictTables, sourceTbl)
+    }
+}
+
+// OptShardingGroupKeeper is used to keep OptShardingGroups.
+// It holds the sharding group metadata needed to make optimistic sharding
+// resync redirection work correctly.
+//
+//                                              newer
+//  │                      ───────────────────────► time
+//  │
+//  │ tb1 conflict DDL1    │  ▲      │
+//  │                      │  │      │
+//  │        ...           │  │      │
+//  │                      │  │      │
+//  │ tb1 conflict DDL2    │  │      │  ▲     │
+//  │                      │  │      │  │     │
+//  │        ...           │  │      │  │     │
+//  │                      │  │      │  │     │
+//  │ tb2 conflict DDL1    ▼  │      │  │     │
+//  │                                │  │     │
+//  │        ...           redirect  │  │     │
+//  │                                │  │     │
+//  │ tb2 conflict DDL2              ▼  │     │
+//  │                                         │
+//  │        ...                     redirect │
+//  │                                         │
+//  │ other dml events                        ▼
+//  │
+//  │                                    continue
+//  ▼                                   replicating
+//
+//  newer
+//  binlog
+// One redirection example is listed as above.
+type OptShardingGroupKeeper struct {
+    sync.RWMutex
+    groups map[string]*OptShardingGroup // target table ID -> ShardingGroup
+    cfg    *config.SubTaskConfig
+    tctx   *tcontext.Context
+    // shardingReSyncs saves the redirect locations of sharding resyncs that
+    // have been resolved but are not yet finished
+    shardingReSyncs map[string]binlog.Location
+}
+
+// NewOptShardingGroupKeeper creates a new OptShardingGroupKeeper.
+func NewOptShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *OptShardingGroupKeeper {
+    return &OptShardingGroupKeeper{
+        groups:          make(map[string]*OptShardingGroup),
+        cfg:             cfg,
+        tctx:            tctx.WithLogger(tctx.L().WithFields(zap.String("component", "optimistic shard group keeper"))),
+        shardingReSyncs: make(map[string]binlog.Location),
+    }
+}
+
+func (k *OptShardingGroupKeeper) resolveGroup(targetTable *filter.Table) (map[string]binlog.Location, binlog.Location) {
+    targetTableID := utils.GenTableID(targetTable)
+    k.Lock()
+    defer k.Unlock()
+    group, ok := k.groups[targetTableID]
+    if !ok {
+        return nil, binlog.Location{}
+    }
+    delete(k.groups, targetTableID)
+    return group.conflictTables, group.firstConflictLocation
+}
+
+func (k *OptShardingGroupKeeper) inConflictStage(sourceTable, targetTable *filter.Table) bool {
+    targetTableID := utils.GenTableID(targetTable)
+    k.RLock()
+    group, ok := k.groups[targetTableID]
+    k.RUnlock()
+    if !ok {
+        return false
+    }
+
+    return group.inConflictStage(sourceTable)
+}
+
+func (k *OptShardingGroupKeeper) tableInConflict(targetTable *filter.Table) bool {
+    targetTableID := utils.GenTableID(targetTable)
+    k.RLock()
+    defer k.RUnlock()
+    _, ok := k.groups[targetTableID]
+    return ok
+}
+
+// appendConflictTable returns whether sourceTable is the first conflict table for targetTable.
+func (k *OptShardingGroupKeeper) appendConflictTable(sourceTable, targetTable *filter.Table,
+    conflictLocation binlog.Location, flavor string, enableGTID bool,
+) bool {
+    targetTableID := utils.GenTableID(targetTable)
+    k.Lock()
+    group, ok := k.groups[targetTableID]
+    if !ok {
+        group = NewOptShardingGroup(conflictLocation, flavor, enableGTID)
+        k.groups[targetTableID] = group
+    }
+    k.Unlock()
+    group.appendConflictTable(sourceTable, conflictLocation)
+    return !ok
+}
+
+func (k *OptShardingGroupKeeper) addShardingReSync(shardingReSync *ShardingReSync) {
+    if shardingReSync != nil {
+        k.shardingReSyncs[shardingReSync.targetTable.String()] = shardingReSync.currLocation
+    }
+}
+
+func (k *OptShardingGroupKeeper) removeShardingReSync(shardingReSync *ShardingReSync) {
+    if shardingReSync != nil {
+        delete(k.shardingReSyncs, shardingReSync.targetTable.String())
+    }
+}
+
+func (k *OptShardingGroupKeeper) getShardingResyncs() map[string]binlog.Location {
+    return k.shardingReSyncs
+}
+
+func (k *OptShardingGroupKeeper) lowestFirstLocationInGroups() *binlog.Location {
+    k.RLock()
+    defer k.RUnlock()
+    var lowest *binlog.Location
+    for _, group := range k.groups {
+        if lowest == nil || binlog.CompareLocation(*lowest, group.firstConflictLocation, k.cfg.EnableGTID) > 0 {
+            lowest = &group.firstConflictLocation
+        }
+    }
+    for _, currLocation := range k.shardingReSyncs {
+        if lowest == nil || binlog.CompareLocation(*lowest, currLocation, k.cfg.EnableGTID) > 0 {
+            loc := currLocation // copy so that lowest does not point at the loop variable
+            lowest = &loc
+        }
+    }
+    if lowest == nil {
+        return nil
+    }
+    loc := lowest.Clone()
+    return &loc
+}
+
+// AdjustGlobalLocation adjusts globalLocation with the sharding groups' lowest first conflict location.
+func (k *OptShardingGroupKeeper) AdjustGlobalLocation(globalLocation binlog.Location) binlog.Location {
+    lowestFirstLocation := k.lowestFirstLocationInGroups()
+    if lowestFirstLocation != nil && binlog.CompareLocation(*lowestFirstLocation, globalLocation, k.cfg.EnableGTID) < 0 {
+        return *lowestFirstLocation
+    }
+    return globalLocation
+}
+
+func (k *OptShardingGroupKeeper) RemoveGroup(targetTable *filter.Table, sourceTableIDs []string) {
+    targetTableID := utils.GenTableID(targetTable)
+
+    k.Lock()
+    defer k.Unlock()
+    if group, ok := k.groups[targetTableID]; ok {
+        group.Remove(sourceTableIDs)
+        if len(group.conflictTables) == 0 {
+            delete(k.groups, targetTableID)
+        }
+    }
+}
+
+func (k *OptShardingGroupKeeper) RemoveSchema(schema string) {
+    k.Lock()
+    defer k.Unlock()
+    for targetTableID := range k.groups {
+        if targetTable := utils.UnpackTableID(targetTableID); targetTable.Schema == schema {
+            delete(k.groups, targetTableID)
+        }
+    }
+}
+
+// Reset resets the keeper.
+func (k *OptShardingGroupKeeper) Reset() {
+    k.Lock()
+    defer k.Unlock()
+    k.groups = make(map[string]*OptShardingGroup)
+    k.shardingReSyncs = make(map[string]binlog.Location)
+}
diff --git a/dm/syncer/opt_sharding_group_test.go b/dm/syncer/opt_sharding_group_test.go
new file mode 100644
index 00000000000..9cd4eec7e6e
--- /dev/null
+++ b/dm/syncer/opt_sharding_group_test.go
@@ -0,0 +1,165 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncer
+
+import (
+    "context"
+    "testing"
+
+    "github.com/pingcap/tiflow/dm/config"
+    "github.com/pingcap/tiflow/dm/pkg/binlog"
+    tcontext "github.com/pingcap/tiflow/dm/pkg/context"
+    "github.com/pingcap/tiflow/dm/pkg/log"
+    "github.com/pingcap/tiflow/dm/pkg/schema"
+    "github.com/pingcap/tiflow/dm/pkg/utils"
+    "github.com/pingcap/tiflow/dm/syncer/shardddl"
+    "github.com/stretchr/testify/require"
+    "github.com/stretchr/testify/suite"
+)
+
+type optShardingGroupSuite struct {
+    suite.Suite
+    cfg *config.SubTaskConfig
+}
+
+func (s *optShardingGroupSuite) SetupSuite() {
+    s.cfg = &config.SubTaskConfig{
+        SourceID:   "mysql-replica-01",
+        MetaSchema: "test",
+        Name:       "checkpoint_ut",
+    }
+    require.NoError(s.T(), log.InitLogger(&log.Config{}))
+}
+
+func TestOptShardingGroupSuite(t *testing.T) {
+    suite.Run(t, new(optShardingGroupSuite))
+}
+
+func (s *optShardingGroupSuite) TestLowestFirstPosInOptGroups() {
+    s.T().Parallel()
+    var (
+        db1tbl     = "`db1`.`tbl`"
+        db2tbl     = "`db2`.`tbl`"
+        db3tbl     = "`db3`.`tbl`"
+        sourceTbls = []string{"`db1`.`tbl1`", "`db1`.`tbl2`", "`db2`.`tbl1`", "`db2`.`tbl2`", "`db3`.`tbl1`"}
+        targetTbls = []string{db1tbl, db1tbl, db2tbl, db2tbl, db3tbl}
+        positions  = []binlog.Location{pos11, pos12, pos21, pos22, pos3}
+    )
+
+    k := NewOptShardingGroupKeeper(tcontext.Background(), s.cfg)
+    for i := range sourceTbls {
+        k.appendConflictTable(utils.UnpackTableID(sourceTbls[i]), utils.UnpackTableID(targetTbls[i]), positions[i], "", false)
+    }
+
+    require.Equal(s.T(), pos21.Position, k.lowestFirstLocationInGroups().Position)
+    k.resolveGroup(utils.UnpackTableID(db2tbl))
+    k.addShardingReSync(&ShardingReSync{
+        targetTable:  utils.UnpackTableID(db2tbl),
+        currLocation: pos21,
+    })
+    // should still be pos21, because it has been added to shardingReSyncs
+    require.Equal(s.T(), pos21.Position, k.lowestFirstLocationInGroups().Position)
+    k.removeShardingReSync(&ShardingReSync{targetTable: utils.UnpackTableID(db2tbl)})
+    // should be pos11 now, pos21 is fully resolved
+    require.Equal(s.T(), pos11.Position, k.lowestFirstLocationInGroups().Position)
+
+    // reset
+    k.Reset()
+    require.Len(s.T(), k.groups, 0)
+    require.Len(s.T(), k.shardingReSyncs, 0)
+}
+
+func (s *optShardingGroupSuite) TestSync() {
+    s.T().Parallel()
+    var (
+        db1tbl     = "`db1`.`tbl`"
+        db2tbl     = "`db2`.`tbl`"
+        db3tbl     = "`db3`.`tbl`"
+        sourceTbls = []string{"`db1`.`tbl1`", "`db1`.`tbl2`", "`db2`.`tbl1`", "`db2`.`tbl2`", "`db3`.`tbl1`"}
+        targetTbls = []string{db1tbl, db1tbl, db2tbl, db2tbl, db3tbl}
+        positions  = []binlog.Location{pos11, pos12, pos21, pos22, pos3}
+        logger     = log.L()
+        err        error
+    )
+
+    k := NewOptShardingGroupKeeper(tcontext.Background(), s.cfg)
+    for i := range sourceTbls {
+        k.appendConflictTable(utils.UnpackTableID(sourceTbls[i]), utils.UnpackTableID(targetTbls[i]), positions[i], "", false)
+    }
+
+    shardingReSyncCh := make(chan *ShardingReSync, 10)
+
+    syncer := Syncer{
+        osgk:       k,
+        tctx:       tcontext.Background().WithLogger(logger),
+        optimist:   shardddl.NewOptimist(&logger, nil, "", ""),
+        checkpoint: &mockCheckpoint{},
+    }
+    syncer.schemaTracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, syncer.downstreamTrackConn, log.L())
+    require.NoError(s.T(), err)
+
+    // case 1: mock receiving the resolved stage from dm-master while syncing other tables
+    require.Equal(s.T(), pos21.Position, k.lowestFirstLocationInGroups().Position)
+    require.True(s.T(), k.tableInConflict(utils.UnpackTableID(db2tbl)))
+    require.True(s.T(), k.inConflictStage(utils.UnpackTableID(sourceTbls[3]),
+        utils.UnpackTableID(db2tbl)))
+    syncer.resolveOptimisticDDL(&eventContext{
+        shardingReSyncCh: &shardingReSyncCh,
+        endLocation:      endPos3,
+    }, utils.UnpackTableID(sourceTbls[2]), utils.UnpackTableID(db2tbl))
+    require.False(s.T(), k.tableInConflict(utils.UnpackTableID(db2tbl)))
+    require.False(s.T(), k.inConflictStage(utils.UnpackTableID(sourceTbls[3]), utils.UnpackTableID(db2tbl)))
+    require.Len(s.T(), shardingReSyncCh, 1)
+    shardingResync := <-shardingReSyncCh
+    expectedShardingResync := &ShardingReSync{
+        currLocation:   pos21,
+        latestLocation: endPos3,
+        targetTable:    utils.UnpackTableID(db2tbl),
+        allResolved:    true,
+    }
+    require.Equal(s.T(), expectedShardingResync, shardingResync)
+    // the ShardingReSync is not yet removed from osgk, so the lowest location is still pos21
+    require.Equal(s.T(), pos21.Position, k.lowestFirstLocationInGroups().Position)
+    k.removeShardingReSync(shardingResync)
+
+    // case 2: mock receiving the resolved stage from dm-master in handleQueryEventOptimistic
+    require.Equal(s.T(), pos11.Position, k.lowestFirstLocationInGroups().Position)
+    require.True(s.T(), k.tableInConflict(utils.UnpackTableID(db1tbl)))
+    require.True(s.T(), k.inConflictStage(utils.UnpackTableID(sourceTbls[0]), utils.UnpackTableID(db1tbl)))
+    syncer.resolveOptimisticDDL(&eventContext{
+        shardingReSyncCh: &shardingReSyncCh,
+        endLocation:      endPos12,
+    }, utils.UnpackTableID(sourceTbls[1]), utils.UnpackTableID(db1tbl))
+    require.False(s.T(), k.tableInConflict(utils.UnpackTableID(db1tbl)))
+    require.False(s.T(), k.inConflictStage(utils.UnpackTableID(sourceTbls[0]), utils.UnpackTableID(db1tbl)))
+    require.Len(s.T(), shardingReSyncCh, 1)
+    shardingResync = <-shardingReSyncCh
+    expectedShardingResync = &ShardingReSync{
+        currLocation:   pos11,
+        latestLocation: endPos12,
+        targetTable:    utils.UnpackTableID(db1tbl),
+        allResolved:    true,
+    }
+    require.Equal(s.T(), expectedShardingResync, shardingResync)
+    require.Equal(s.T(), pos11.Position, k.lowestFirstLocationInGroups().Position)
+    k.removeShardingReSync(shardingResync)
+
+    // case 3: mock dropping a table, which should resolve the conflict stage
+    require.Equal(s.T(), pos3.Position, k.lowestFirstLocationInGroups().Position)
+    require.True(s.T(), k.tableInConflict(utils.UnpackTableID(db3tbl)))
+    require.True(s.T(), k.inConflictStage(utils.UnpackTableID(sourceTbls[4]), utils.UnpackTableID(db3tbl)))
+    k.RemoveGroup(utils.UnpackTableID(db3tbl), []string{sourceTbls[4]})
+    require.False(s.T(), k.tableInConflict(utils.UnpackTableID(db3tbl)))
+    require.False(s.T(), k.inConflictStage(utils.UnpackTableID(sourceTbls[4]), utils.UnpackTableID(db3tbl)))
+    require.Len(s.T(), shardingReSyncCh, 0)
+}
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 16520d897cd..e53deacc3bc 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -546,6 +546,7 @@ func (s *Syncer) reset() {
         s.sgk.ResetGroups()
         s.pessimist.Reset()
     case config.ShardOptimistic:
+        s.osgk.Reset()
         s.optimist.Reset()
     }
 }
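
Reading aid (not part of the patch): a minimal sketch of how a syncer is expected to drive the new keeper for one conflict. It assumes it compiles inside the syncer package next to the new file, reuses the go-mysql position type already used by DM's binlog package, and fabricates the table IDs and binlog locations; the flavor/GTID arguments are elided ("", false) exactly as in the unit tests above, and exampleKeeperUsage is a hypothetical helper, not an API added by this change.

package syncer

import (
    gmysql "github.com/go-mysql-org/go-mysql/mysql"

    "github.com/pingcap/tiflow/dm/config"
    "github.com/pingcap/tiflow/dm/pkg/binlog"
    tcontext "github.com/pingcap/tiflow/dm/pkg/context"
    "github.com/pingcap/tiflow/dm/pkg/utils"
)

// exampleKeeperUsage walks through the keeper's lifecycle for one conflict:
// record the conflict, clamp the global checkpoint, then resolve and redirect.
func exampleKeeperUsage(cfg *config.SubTaskConfig) {
    k := NewOptShardingGroupKeeper(tcontext.Background(), cfg)

    src := utils.UnpackTableID("`db1`.`tbl1`") // one upstream shard (fabricated)
    dst := utils.UnpackTableID("`db1`.`tbl`")  // its downstream target table (fabricated)
    conflictLoc := binlog.Location{Position: gmysql.Position{Name: "mysql-bin.000001", Pos: 4}}

    // A shard DDL on src enters the conflict stage; the keeper records where it
    // started. The return value reports whether src is the first conflicting
    // table for dst, i.e. whether a new group was created.
    _ = k.appendConflictTable(src, dst, conflictLoc, "", false)

    // While any group is unresolved, the global checkpoint must not advance past
    // the earliest first-conflict location, so it is pulled back when adjusting.
    globalLoc := binlog.Location{Position: gmysql.Position{Name: "mysql-bin.000001", Pos: 1000}}
    safeLoc := k.AdjustGlobalLocation(globalLoc) // pulled back to conflictLoc while the conflict is pending

    // When dm-master reports the shard lock as resolved, the group is dropped and
    // the syncer starts a sharding resync (redirect) from the returned location.
    conflictTables, firstLoc := k.resolveGroup(dst)

    _, _, _ = safeLoc, conflictTables, firstLoc
}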