diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go
index e90e209e97a..5418d4f8d01 100644
--- a/cdc/entry/mounter.go
+++ b/cdc/entry/mounter.go
@@ -19,7 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
-	"math/rand"
+	"sync/atomic"
 	"time"
 	"unsafe"
 
@@ -39,7 +39,9 @@ import (
 )
 
 const (
-	defaultOutputChanSize = 128000
+	// The buffer size of the input channel of each mounter worker.
+	// 16 is large enough, because each channel belongs exclusively to one worker.
+	defaultInputChanSize = 16
 )
 
 type baseKVEntry struct {
@@ -67,7 +69,11 @@ type rowKVEntry struct {
 // Mounter is used to parse SQL events from KV events
 type Mounter interface {
 	Run(ctx context.Context) error
-	Input() chan<- *model.PolymorphicEvent
+	// AddEntry accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
+	// decodes `RawKVEntry` into `RowChangedEvent`.
+	// It also closes the `model.PolymorphicEvent.finished` channel to notify
+	// callers that decoding is done.
+	AddEntry(ctx context.Context, event *model.PolymorphicEvent) error
 }
 
 type mounterImpl struct {
@@ -76,6 +82,9 @@ type mounterImpl struct {
 	tz             *time.Location
 	workerNum      int
 	enableOldValue bool
+
+	// index is an atomic counter used to dispatch input events to workers.
+	index int64
 }
 
 // NewMounter creates a mounter
@@ -85,7 +94,7 @@ func NewMounter(schemaStorage SchemaStorage, workerNum int, enableOldValue bool)
 	}
 	chs := make([]chan *model.PolymorphicEvent, workerNum)
 	for i := 0; i < workerNum; i++ {
-		chs[i] = make(chan *model.PolymorphicEvent, defaultOutputChanSize)
+		chs[i] = make(chan *model.PolymorphicEvent, defaultInputChanSize)
 	}
 	return &mounterImpl{
 		schemaStorage: schemaStorage,
@@ -100,17 +109,34 @@ const defaultMounterWorkerNum = 32
 func (m *mounterImpl) Run(ctx context.Context) error {
 	m.tz = util.TimezoneFromCtx(ctx)
 	errg, ctx := errgroup.WithContext(ctx)
-	errg.Go(func() error {
-		m.collectMetrics(ctx)
-		return nil
-	})
 	for i := 0; i < m.workerNum; i++ {
 		index := i
 		errg.Go(func() error {
 			return m.codecWorker(ctx, index)
 		})
 	}
-	return errg.Wait()
+
+	captureAddr := util.CaptureAddrFromCtx(ctx)
+	changefeedID := util.ChangefeedIDFromCtx(ctx)
+	metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)
+
+	flushMetricsInterval := 15 * time.Second
+	timer := time.NewTimer(flushMetricsInterval)
+	defer timer.Stop()
+	for {
+		select {
+		// ctx.Done() returns when the parent ctx is done or an error occurs in errg.
+ case <-ctx.Done(): + return errg.Wait() + case <-timer.C: + chSize := 0 + for _, ch := range m.rawRowChangedChs { + chSize += len(ch) + } + metricMounterInputChanSize.Set(float64(chSize)) + timer.Reset(flushMetricsInterval) + } + } } func (m *mounterImpl) codecWorker(ctx context.Context, index int) error { @@ -148,26 +174,13 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error { } } -func (m *mounterImpl) Input() chan<- *model.PolymorphicEvent { - return m.rawRowChangedChs[rand.Intn(m.workerNum)] -} - -func (m *mounterImpl) collectMetrics(ctx context.Context) { - captureAddr := util.CaptureAddrFromCtx(ctx) - changefeedID := util.ChangefeedIDFromCtx(ctx) - metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID) - - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * 15): - chSize := 0 - for _, ch := range m.rawRowChangedChs { - chSize += len(ch) - } - metricMounterInputChanSize.Set(float64(chSize)) - } +func (m *mounterImpl) AddEntry(ctx context.Context, event *model.PolymorphicEvent) error { + index := atomic.AddInt64(&m.index, 1) % int64(m.workerNum) + select { + case <-ctx.Done(): + return ctx.Err() + case m.rawRowChangedChs[index] <- event: + return nil } } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 90ecad4b672..3032c41fcaa 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -176,10 +176,9 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b // this separate goroutine to prevent blocking // the whole pipeline. msg.SetUpFinishedChan() - select { - case <-ctx.Done(): - return nil - case n.mounter.Input() <- msg: + err := n.mounter.AddEntry(ctx, msg) + if err != nil { + return errors.Trace(err) } commitTs := msg.CRTs @@ -198,7 +197,7 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b } // Must wait before accessing msg.Row - err := msg.WaitPrepare(ctx) + err = msg.WaitPrepare(ctx) if err != nil { if errors.Cause(err) != context.Canceled { ctx.Throw(err) diff --git a/cdc/sorter/unified/merger.go b/cdc/sorter/unified/merger.go index b3fe8cab196..74bfa9193d5 100644 --- a/cdc/sorter/unified/merger.go +++ b/cdc/sorter/unified/merger.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sorter" cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/notify" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -405,8 +404,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } - resolvedTsNotifier := ¬ify.Notifier{} - defer resolvedTsNotifier.Close() + resolvedTsNotifierChan := make(chan struct{}, 1) errg, ctx := errgroup.WithContext(ctx) errg.Go(func() error { @@ -443,40 +441,46 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch if minTemp > minResolvedTs { atomic.StoreUint64(&minResolvedTs, minTemp) - resolvedTsNotifier.Notify() + select { + case resolvedTsNotifierChan <- struct{}{}: + default: + } } } }) errg.Go(func() error { - resolvedTsReceiver, err := resolvedTsNotifier.NewReceiver(time.Second * 1) - if err != nil { - if cerrors.ErrOperateOnClosedNotifier.Equal(err) { - // This won't happen unless `resolvedTsNotifier` has been closed, which is - // impossible at this point. 
- log.Panic("unexpected error", zap.Error(err)) - } - return errors.Trace(err) - } + resolvedTsTicker := time.NewTicker(time.Second * 1) - defer resolvedTsReceiver.Stop() + defer resolvedTsTicker.Stop() var lastResolvedTs uint64 + resolvedTsTickFunc := func() error { + curResolvedTs := atomic.LoadUint64(&minResolvedTs) + if curResolvedTs > lastResolvedTs { + err := onMinResolvedTsUpdate(curResolvedTs) + if err != nil { + return errors.Trace(err) + } + } else if curResolvedTs < lastResolvedTs { + log.Panic("resolved-ts regressed in sorter", + zap.Uint64("curResolvedTs", curResolvedTs), + zap.Uint64("lastResolvedTs", lastResolvedTs)) + } + return nil + } + for { select { case <-ctx.Done(): return ctx.Err() - case <-resolvedTsReceiver.C: - curResolvedTs := atomic.LoadUint64(&minResolvedTs) - if curResolvedTs > lastResolvedTs { - err := onMinResolvedTsUpdate(curResolvedTs) - if err != nil { - return errors.Trace(err) - } - } else if curResolvedTs < lastResolvedTs { - log.Panic("resolved-ts regressed in sorter", - zap.Uint64("curResolved-ts", curResolvedTs), - zap.Uint64("lastResolved-ts", lastResolvedTs)) + case <-resolvedTsTicker.C: + if err := resolvedTsTickFunc(); err != nil { + return err + } + case <-resolvedTsNotifierChan: + if err := resolvedTsTickFunc(); err != nil { + return err } } } diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go index b2062ac7ece..fd7ed6555d0 100644 --- a/dm/dm/master/scheduler/scheduler.go +++ b/dm/dm/master/scheduler/scheduler.go @@ -220,27 +220,27 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e s.reset() // reset previous status. // recover previous status from etcd. - err = s.recoverSources(etcdCli) + err = s.recoverSources() if err != nil { return err } - err = s.recoverSubTasks(etcdCli) + err = s.recoverSubTasks() if err != nil { return err } - err = s.recoverRelayConfigs(etcdCli) + err = s.recoverRelayConfigs() if err != nil { return err } var loadTaskRev int64 - loadTaskRev, err = s.recoverLoadTasks(etcdCli, false) + loadTaskRev, err = s.recoverLoadTasks(false) if err != nil { return err } var rev int64 - rev, err = s.recoverWorkersBounds(etcdCli) + rev, err = s.recoverWorkersBounds() if err != nil { return err } @@ -266,7 +266,7 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e // starting to observe status of DM-worker instances. // TODO: handle fatal error from observeWorkerEvent //nolint:errcheck - s.observeWorkerEvent(ctx, etcdCli, rev1) + s.observeWorkerEvent(ctx, rev1) }(rev) s.wg.Add(1) @@ -275,7 +275,7 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e // starting to observe load task. // TODO: handle fatal error from observeLoadTask //nolint:errcheck - s.observeLoadTask(ctx, etcdCli, rev1) + s.observeLoadTask(ctx, rev1) }(loadTaskRev) s.started.Store(true) // started now @@ -1578,14 +1578,14 @@ func (s *Scheduler) Started() bool { } // recoverSourceCfgs recovers history source configs and expectant relay stages from etcd. -func (s *Scheduler) recoverSources(cli *clientv3.Client) error { +func (s *Scheduler) recoverSources() error { // get all source configs. - cfgM, _, err := ha.GetSourceCfg(cli, "", 0) + cfgM, _, err := ha.GetSourceCfg(s.etcdCli, "", 0) if err != nil { return err } // get all relay stages. 
- stageM, _, err := ha.GetAllRelayStage(cli) + stageM, _, err := ha.GetAllRelayStage(s.etcdCli) if err != nil { return err } @@ -1602,14 +1602,14 @@ func (s *Scheduler) recoverSources(cli *clientv3.Client) error { } // recoverSubTasks recovers history subtask configs and expectant subtask stages from etcd. -func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error { +func (s *Scheduler) recoverSubTasks() error { // get all subtask configs. - cfgMM, _, err := ha.GetAllSubTaskCfg(cli) + cfgMM, _, err := ha.GetAllSubTaskCfg(s.etcdCli) if err != nil { return err } // get all subtask stages. - stageMM, _, err := ha.GetAllSubTaskStage(cli) + stageMM, _, err := ha.GetAllSubTaskStage(s.etcdCli) if err != nil { return err } @@ -1637,8 +1637,8 @@ func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error { // This function also removes conflicting relay schedule types, which means if a source has both `enable-relay` and // (source, worker) relay config, we remove the latter. // should be called after recoverSources. -func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error { - relayWorkers, _, err := ha.GetAllRelayConfig(cli) +func (s *Scheduler) recoverRelayConfigs() error { + relayWorkers, _, err := ha.GetAllRelayConfig(s.etcdCli) if err != nil { return err } @@ -1651,7 +1651,7 @@ func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error { } if sourceCfg.EnableRelay { // current etcd max-txn-op is 2048 - _, err2 := ha.DeleteRelayConfig(cli, utils.SetToSlice(workers)...) + _, err2 := ha.DeleteRelayConfig(s.etcdCli, utils.SetToSlice(workers)...) if err2 != nil { return err2 } @@ -1664,12 +1664,12 @@ func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error { } // recoverLoadTasks recovers history load workers from etcd. -func (s *Scheduler) recoverLoadTasks(cli *clientv3.Client, needLock bool) (int64, error) { +func (s *Scheduler) recoverLoadTasks(needLock bool) (int64, error) { if needLock { s.mu.Lock() defer s.mu.Unlock() } - loadTasks, rev, err := ha.GetAllLoadTask(cli) + loadTasks, rev, err := ha.GetAllLoadTask(s.etcdCli) if err != nil { return 0, err } @@ -1680,11 +1680,11 @@ func (s *Scheduler) recoverLoadTasks(cli *clientv3.Client, needLock bool) (int64 // recoverWorkersBounds recovers history DM-worker info and status from etcd. // and it also recovers the bound/unbound relationship. -func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { +func (s *Scheduler) recoverWorkersBounds() (int64, error) { // 1. get all history base info. // it should no new DM-worker registered between this call and the below `GetKeepAliveWorkers`, // because no DM-master leader are handling DM-worker register requests. - wim, _, err := ha.GetAllWorkerInfo(cli) + wim, _, err := ha.GetAllWorkerInfo(s.etcdCli) if err != nil { return 0, err } @@ -1692,18 +1692,18 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { // 2. get all history bound relationships. // it should no new bound relationship added between this call and the below `GetKeepAliveWorkers`, // because no DM-master leader are doing the scheduler. - sbm, _, err := ha.GetSourceBound(cli, "") + sbm, _, err := ha.GetSourceBound(s.etcdCli, "") if err != nil { return 0, err } - lastSourceBoundM, _, err := ha.GetLastSourceBounds(cli) + lastSourceBoundM, _, err := ha.GetLastSourceBounds(s.etcdCli) if err != nil { return 0, err } s.lastBound = lastSourceBoundM // 3. get all history offline status. 
- kam, rev, err := ha.GetKeepAliveWorkers(cli) + kam, rev, err := ha.GetKeepAliveWorkers(s.etcdCli) if err != nil { return 0, err } @@ -1767,7 +1767,7 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { for name := range sbm { invalidSourceBounds = append(invalidSourceBounds, name) } - _, err = ha.DeleteSourceBound(cli, invalidSourceBounds...) + _, err = ha.DeleteSourceBound(s.etcdCli, invalidSourceBounds...) if err != nil { return 0, err } @@ -1775,7 +1775,7 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { // 6. put trigger source bounds info to etcd to order dm-workers to start source if len(boundsToTrigger) > 0 { - _, err = ha.PutSourceBound(cli, boundsToTrigger...) + _, err = ha.PutSourceBound(s.etcdCli, boundsToTrigger...) if err != nil { return 0, err } @@ -1791,12 +1791,12 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { return rev, nil } -func (s *Scheduler) resetWorkerEv(cli *clientv3.Client) (int64, error) { +func (s *Scheduler) resetWorkerEv() (int64, error) { s.mu.Lock() defer s.mu.Unlock() rwm := s.workers - kam, rev, err := ha.GetKeepAliveWorkers(cli) + kam, rev, err := ha.GetKeepAliveWorkers(s.etcdCli) if err != nil { return 0, err } @@ -1856,7 +1856,7 @@ func (s *Scheduler) handleWorkerEv(ctx context.Context, evCh <-chan ha.WorkerEve } // nolint:dupl -func (s *Scheduler) observeWorkerEvent(ctx context.Context, etcdCli *clientv3.Client, rev int64) error { +func (s *Scheduler) observeWorkerEvent(ctx context.Context, rev int64) error { var wg sync.WaitGroup for { workerEvCh := make(chan ha.WorkerEvent, 10) @@ -1870,7 +1870,7 @@ func (s *Scheduler) observeWorkerEvent(ctx context.Context, etcdCli *clientv3.Cl close(workerErrCh) wg.Done() }() - ha.WatchWorkerEvent(ctx1, etcdCli, rev+1, workerEvCh, workerErrCh) + ha.WatchWorkerEvent(ctx1, s.etcdCli, rev+1, workerEvCh, workerErrCh) }() err := s.handleWorkerEv(ctx1, workerEvCh, workerErrCh) cancel1() @@ -1884,7 +1884,7 @@ func (s *Scheduler) observeWorkerEvent(ctx context.Context, etcdCli *clientv3.Cl case <-ctx.Done(): return nil case <-time.After(500 * time.Millisecond): - rev, err = s.resetWorkerEv(etcdCli) + rev, err = s.resetWorkerEv() if err != nil { log.L().Error("resetWorkerEv is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum)) } @@ -2311,7 +2311,7 @@ func (s *Scheduler) SetWorkerClientForTest(name string, mockCli workerrpc.Client } // nolint:dupl -func (s *Scheduler) observeLoadTask(ctx context.Context, etcdCli *clientv3.Client, rev int64) error { +func (s *Scheduler) observeLoadTask(ctx context.Context, rev int64) error { var wg sync.WaitGroup for { loadTaskCh := make(chan ha.LoadTask, 10) @@ -2325,7 +2325,7 @@ func (s *Scheduler) observeLoadTask(ctx context.Context, etcdCli *clientv3.Clien close(loadTaskErrCh) wg.Done() }() - ha.WatchLoadTask(ctx1, etcdCli, rev+1, loadTaskCh, loadTaskErrCh) + ha.WatchLoadTask(ctx1, s.etcdCli, rev+1, loadTaskCh, loadTaskErrCh) }() err := s.handleLoadTask(ctx1, loadTaskCh, loadTaskErrCh) cancel1() @@ -2339,7 +2339,7 @@ func (s *Scheduler) observeLoadTask(ctx context.Context, etcdCli *clientv3.Clien case <-ctx.Done(): return nil case <-time.After(500 * time.Millisecond): - rev, err = s.recoverLoadTasks(etcdCli, true) + rev, err = s.recoverLoadTasks(true) if err != nil { log.L().Error("resetLoadTask is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum)) } diff --git a/dm/dm/master/scheduler/scheduler_test.go 
b/dm/dm/master/scheduler/scheduler_test.go index cc22cbd43c6..f2d254bf0f0 100644 --- a/dm/dm/master/scheduler/scheduler_test.go +++ b/dm/dm/master/scheduler/scheduler_test.go @@ -986,7 +986,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { wg.Add(1) go func() { defer wg.Done() - c.Assert(s.observeWorkerEvent(ctx2, etcdTestCli, startRev), IsNil) + c.Assert(s.observeWorkerEvent(ctx2, startRev), IsNil) }() // step 5.3: wait for scheduler to restart handleWorkerEvent, then start a new worker time.Sleep(time.Second) @@ -1008,7 +1008,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { wg.Add(1) go func() { defer wg.Done() - c.Assert(s.observeWorkerEvent(ctx3, etcdTestCli, startRev), IsNil) + c.Assert(s.observeWorkerEvent(ctx3, startRev), IsNil) }() c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { bounds := s.BoundSources() @@ -1722,7 +1722,7 @@ func (t *testScheduler) TestWatchLoadTask(c *C) { wg.Add(1) go func() { defer wg.Done() - c.Assert(s.observeLoadTask(ctx1, etcdTestCli, startRev), IsNil) + c.Assert(s.observeLoadTask(ctx1, startRev), IsNil) }() // put task2, source1, worker1 @@ -1813,9 +1813,9 @@ func (t *testScheduler) TestWorkerHasDiffRelayAndBound(c *C) { go ha.KeepAlive(ctx, etcdTestCli, workerName1, keepAlive) // bootstrap - c.Assert(s.recoverSources(etcdTestCli), IsNil) - c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil) - _, err = s.recoverWorkersBounds(etcdTestCli) + c.Assert(s.recoverSources(), IsNil) + c.Assert(s.recoverRelayConfigs(), IsNil) + _, err = s.recoverWorkersBounds() c.Assert(err, IsNil) // check @@ -1876,9 +1876,9 @@ func (t *testScheduler) TestUpgradeCauseConflictRelayType(c *C) { go ha.KeepAlive(ctx, etcdTestCli, workerName2, keepAlive) // bootstrap - c.Assert(s.recoverSources(etcdTestCli), IsNil) - c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil) - _, err = s.recoverWorkersBounds(etcdTestCli) + c.Assert(s.recoverSources(), IsNil) + c.Assert(s.recoverRelayConfigs(), IsNil) + _, err = s.recoverWorkersBounds() c.Assert(err, IsNil) // check when the relay config is conflicting with source config, relay config should be deleted diff --git a/dm/loader/checkpoint.go b/dm/loader/checkpoint.go index c99afa2a34c..30575d258fc 100644 --- a/dm/loader/checkpoint.go +++ b/dm/loader/checkpoint.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/cputil" + fr "github.com/pingcap/tiflow/dm/pkg/func-rollback" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" @@ -101,7 +102,18 @@ type RemoteCheckPoint struct { } func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) { - db, dbConns, err := createConns(tctx, cfg, cfg.Name, cfg.SourceID, 1) + var err error + var db *conn.BaseDB + var dbConns []*DBConn + + rollbackHolder := fr.NewRollbackHolder("loader") + defer func() { + if err != nil { + rollbackHolder.RollbackReverseOrder() + } + }() + + db, dbConns, err = createConns(tctx, cfg, cfg.Name, cfg.SourceID, 1) if err != nil { return nil, err } @@ -116,6 +128,7 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s logger: tctx.L().WithFields(zap.String("component", "remote checkpoint")), } cp.restoringFiles.pos = make(map[string]map[string]FilePosSet) + rollbackHolder.Add(fr.FuncRollback{Name: "CloseRemoteCheckPoint", Fn: cp.Close}) err = cp.prepare(tctx) if err != nil { diff --git a/dm/syncer/checkpoint.go 
b/dm/syncer/checkpoint.go
index 47c18603fa8..3a47074999e 100644
--- a/dm/syncer/checkpoint.go
+++ b/dm/syncer/checkpoint.go
@@ -30,6 +30,7 @@ import (
 	tcontext "github.com/pingcap/tiflow/dm/pkg/context"
 	"github.com/pingcap/tiflow/dm/pkg/cputil"
 	"github.com/pingcap/tiflow/dm/pkg/dumpling"
+	fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
 	"github.com/pingcap/tiflow/dm/pkg/gtid"
 	"github.com/pingcap/tiflow/dm/pkg/log"
 	"github.com/pingcap/tiflow/dm/pkg/schema"
@@ -228,7 +229,7 @@ type CheckPoint interface {
 	DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error
 
 	// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
-	IsOlderThanTablePoint(table *filter.Table, point binlog.Location, useLE bool) bool
+	IsOlderThanTablePoint(table *filter.Table, point binlog.Location, isDDL bool) bool
 
 	// SaveGlobalPoint saves the global binlog stream's checkpoint
 	// corresponding to Meta.Save
@@ -411,17 +412,30 @@ func (cp *RemoteCheckPoint) Snapshot(isSyncFlush bool) *SnapshotInfo {
 }
 
 // Init implements CheckPoint.Init.
-func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error {
+func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) (err error) {
+	var db *conn.BaseDB
+	var dbConns []*dbconn.DBConn
+
+	rollbackHolder := fr.NewRollbackHolder("syncer")
+	defer func() {
+		if err != nil {
+			rollbackHolder.RollbackReverseOrder()
+		}
+	}()
+
 	checkPointDB := cp.cfg.To
 	checkPointDB.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout)
-	db, dbConns, err := dbconn.CreateConns(tctx, cp.cfg, &checkPointDB, 1)
+	db, dbConns, err = dbconn.CreateConns(tctx, cp.cfg, &checkPointDB, 1)
 	if err != nil {
-		return err
+		return
 	}
 	cp.db = db
 	cp.dbConn = dbConns[0]
+	rollbackHolder.Add(fr.FuncRollback{Name: "CloseRemoteCheckPoint", Fn: cp.Close})
 
-	return cp.prepare(tctx)
+	err = cp.prepare(tctx)
+
+	return
 }
 
 // Close implements CheckPoint.Close.
@@ -561,14 +575,12 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche
 }
 
 // IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
-// For GTID replication, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
-// - Query event e1, location is gset1
-// - Rows event e2, location is gset1
-// - XID event, location is gset2
-// We should note that e1 is not older than e2
-// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position.
-// if useLE is true, we use less than or equal.
-func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location, useLE bool) bool {
+// It is used to skip old binlog events. The table checkpoint is saved after a binlog event is dispatched.
+// - For DML, GTID-based and position-based replication behave differently. With position-based replication, each event
+//   has a unique position, so we can confidently skip events whose location is <= the table checkpoint. With GTID-based
+//   replication, more than one event may carry the same GTID set, so we can only skip events whose location is < the table checkpoint.
+// - DDL does not have a unique position or GTID, so we can always skip events <= the table checkpoint.
+func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location, isDDL bool) bool { cp.RLock() defer cp.RUnlock() sourceSchema, sourceTable := table.Schema, table.Name @@ -583,7 +595,7 @@ func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location oldLocation := point.MySQLLocation() cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation)) - if useLE { + if isDDL || !cp.cfg.EnableGTID { return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) <= 0 } return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0 diff --git a/dm/syncer/optimist.go b/dm/syncer/optimist.go index aa6a1b9b0db..6fed273462c 100644 --- a/dm/syncer/optimist.go +++ b/dm/syncer/optimist.go @@ -191,7 +191,7 @@ func (s *Syncer) handleQueryEventOptimistic(qec *queryEventContext) error { qec.shardingDDLInfo = trackInfos[0] job := newDDLJob(qec) - err = s.handleJobFunc(job) + _, err = s.handleJobFunc(job) if err != nil { return err } diff --git a/dm/syncer/sharding_group.go b/dm/syncer/sharding_group.go index d330642a9f2..2ff48dba4ff 100644 --- a/dm/syncer/sharding_group.go +++ b/dm/syncer/sharding_group.go @@ -80,6 +80,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/cputil" + fr "github.com/pingcap/tiflow/dm/pkg/func-rollback" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/dm/syncer/dbconn" @@ -427,17 +428,31 @@ func (k *ShardingGroupKeeper) AddGroup(targetTable *filter.Table, sourceIDs []st } // Init does initialization staff. -func (k *ShardingGroupKeeper) Init() error { +func (k *ShardingGroupKeeper) Init() (err error) { + var db *conn.BaseDB + var dbConns []*dbconn.DBConn + + rollbackHolder := fr.NewRollbackHolder("syncer") + defer func() { + if err != nil { + rollbackHolder.RollbackReverseOrder() + } + }() + k.clear() sgkDB := k.cfg.To sgkDB.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxCheckPointTimeout) - db, dbConns, err := dbconn.CreateConns(k.tctx, k.cfg, &sgkDB, 1) + db, dbConns, err = dbconn.CreateConns(k.tctx, k.cfg, &sgkDB, 1) if err != nil { - return err + return } k.db = db k.dbConn = dbConns[0] - return k.prepare() + rollbackHolder.Add(fr.FuncRollback{Name: "CloseShardingGroupKeeper", Fn: k.Close}) + + err = k.prepare() + + return } // clear clears all sharding groups. diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 7906719a19b..763d90efefb 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -219,7 +219,7 @@ type Syncer struct { isQueryEvent bool } - handleJobFunc func(*job) error + handleJobFunc func(*job) (bool, error) flushSeq int64 // `lower_case_table_names` setting of upstream db @@ -991,7 +991,7 @@ func (s *Syncer) checkShouldFlush() error { // TODO: move to syncer/job.go // handleJob will do many actions based on job type. 
-func (s *Syncer) handleJob(job *job) (err error) { +func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) { skipCheckFlush := false defer func() { if !skipCheckFlush && err == nil { @@ -1012,7 +1012,7 @@ func (s *Syncer) handleJob(job *job) (err error) { if waitXIDStatus(s.waitXIDJob.Load()) == waitComplete && job.tp != flush { s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job)) - return nil + return } switch job.tp { @@ -1020,15 +1020,16 @@ func (s *Syncer) handleJob(job *job) (err error) { s.waitXIDJob.CAS(int64(waiting), int64(waitComplete)) s.saveGlobalPoint(job.location) s.isTransactionEnd = true - return nil + return case skip: s.updateReplicationJobTS(job, skipJobIdx) - return nil + return } // 2. send the job to queue s.addJob(job) + added2Queue = true // 3. after job is sent to queue @@ -1038,14 +1039,14 @@ func (s *Syncer) handleJob(job *job) (err error) { // caller s.isTransactionEnd = false skipCheckFlush = true - return nil + return case ddl: s.jobWg.Wait() // skip rest logic when downstream error if s.execError.Load() != nil { // nolint:nilerr - return nil + return } s.updateReplicationJobTS(job, ddlJobIdx) @@ -1058,7 +1059,7 @@ func (s *Syncer) handleJob(job *job) (err error) { failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { err = handleFlushCheckpointStage(3, val.(int), "before save checkpoint") if err != nil { - failpoint.Return(err) + failpoint.Return() } }) // save global checkpoint for DDL @@ -1078,19 +1079,22 @@ func (s *Syncer) handleJob(job *job) (err error) { failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { err = handleFlushCheckpointStage(4, val.(int), "before flush checkpoint") if err != nil { - failpoint.Return(err) + failpoint.Return() } }) skipCheckFlush = true - return s.flushCheckPoints() + err = s.flushCheckPoints() + return case flush: s.jobWg.Wait() skipCheckFlush = true - return s.flushCheckPoints() + err = s.flushCheckPoints() + return case asyncFlush: skipCheckFlush = true } - return err + // nolint:nakedret + return } func (s *Syncer) saveGlobalPoint(globalLocation binlog.Location) { @@ -2053,7 +2057,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } job := newXIDJob(currentLocation, startLocation, currentLocation) - err2 = s.handleJobFunc(job) + _, err2 = s.handleJobFunc(job) case *replication.GenericEvent: if e.Header.EventType == replication.HEARTBEAT_EVENT { // flush checkpoint even if there are no real binlog events @@ -2325,9 +2329,9 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err startTime := time.Now() for i := range dmls { job := newDMLJob(jobType, sourceTable, targetTable, dmls[i], &ec) - err = s.handleJobFunc(job) - if err != nil { - return err + added2Queue, err2 := s.handleJobFunc(job) + if err2 != nil || !added2Queue { + return err2 } } metrics.DispatchBinlogDurationHistogram.WithLabelValues(jobType.String(), s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) @@ -2692,7 +2696,7 @@ func (s *Syncer) handleQueryEventNoSharding(qec *queryEventContext) error { }) job := newDDLJob(qec) - err := s.handleJobFunc(job) + _, err := s.handleJobFunc(job) if err != nil { return err } @@ -2914,7 +2918,7 @@ func (s *Syncer) handleQueryEventPessimistic(qec *queryEventContext) error { }) job := newDDLJob(qec) - err = s.handleJobFunc(job) + _, err = s.handleJobFunc(job) if err != nil { return err } @@ -3299,7 +3303,8 @@ func (s *Syncer) closeDBs() { // make newJob's sql argument 
empty to distinguish normal sql and skips sql. func (s *Syncer) recordSkipSQLsLocation(ec *eventContext) error { job := newSkipJob(ec) - return s.handleJobFunc(job) + _, err := s.handleJobFunc(job) + return err } // flushJobs add a flush job and wait for all jobs finished. @@ -3308,7 +3313,8 @@ func (s *Syncer) flushJobs() error { flushJobSeq := s.getFlushSeq() s.tctx.L().Info("flush all jobs", zap.Stringer("global checkpoint", s.checkpoint), zap.Int64("flush job seq", flushJobSeq)) job := newFlushJob(s.cfg.WorkerCount, flushJobSeq) - return s.handleJobFunc(job) + _, err := s.handleJobFunc(job) + return err } func (s *Syncer) reSyncBinlog(tctx tcontext.Context, location binlog.Location) error { diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index c4347e18ea6..1eac0b668fa 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -1472,7 +1472,7 @@ func (s *Syncer) mockFinishJob(jobs []*expectJob) { } } -func (s *Syncer) addJobToMemory(job *job) error { +func (s *Syncer) addJobToMemory(job *job) (bool, error) { log.L().Info("add job to memory", zap.Stringer("job", job)) switch job.tp { @@ -1512,7 +1512,7 @@ func (s *Syncer) addJobToMemory(job *job) error { } } - return nil + return true, nil } func (s *Syncer) setupMockCheckpoint(c *C, checkPointDBConn *sql.Conn, checkPointMock sqlmock.Sqlmock) { diff --git a/dm/ui/yarn.lock b/dm/ui/yarn.lock index c300ccd5c83..55fc76b6156 100644 --- a/dm/ui/yarn.lock +++ b/dm/ui/yarn.lock @@ -2572,13 +2572,20 @@ is-ci@^3.0.1: dependencies: ci-info "^3.2.0" -is-core-module@^2.2.0, is-core-module@^2.8.0: +is-core-module@^2.2.0: version "2.8.0" resolved "https://registry.yarnpkg.com/is-core-module/-/is-core-module-2.8.0.tgz#0321336c3d0925e497fd97f5d95cb114a5ccd548" integrity sha512-vd15qHsaqrRL7dtH6QNuy0ndJmRDrS9HAM1CAiSifNUFv4x1a0CCVsj18hJ1mShxIG6T2i1sO78MkP56r0nYRw== dependencies: has "^1.0.3" +is-core-module@^2.8.0: + version "2.8.1" + resolved "https://registry.yarnpkg.com/is-core-module/-/is-core-module-2.8.1.tgz#f59fdfca701d5879d0a6b100a40aa1560ce27211" + integrity sha512-SdNCUs284hr40hFTFP6l0IfZ/RSrMXF3qgoRHd3/79unUTvrFO/JoXwkGm+5J/Oe3E/b5GsnG330uUNgRpu1PA== + dependencies: + has "^1.0.3" + is-date-object@^1.0.1: version "1.0.5" resolved "https://registry.yarnpkg.com/is-date-object/-/is-date-object-1.0.5.tgz#0841d5536e724c25597bf6ea62e1bd38298df31f" @@ -3475,7 +3482,7 @@ path-key@^3.0.0, path-key@^3.1.0: resolved "https://registry.yarnpkg.com/path-key/-/path-key-3.1.1.tgz#581f6ade658cbba65a0d3380de7753295054f375" integrity sha512-ojmeN0qd+y0jszEtoY48r0Peq5dwMEkIlCOu6Q5f41lfkswXuKtYrhgoTpLnyIcHm24Uhqx+5Tqm2InSwLhE6Q== -path-parse@^1.0.6: +path-parse@^1.0.6, path-parse@^1.0.7: version "1.0.7" resolved "https://registry.yarnpkg.com/path-parse/-/path-parse-1.0.7.tgz#fbc114b60ca42b30d9daf5858e4bd68bbedb6735" integrity sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw== @@ -4212,7 +4219,16 @@ resolve-options@^1.1.0: dependencies: value-or-function "^3.0.0" -resolve@^1.1.6, resolve@^1.20.0: +resolve@^1.1.6: + version "1.21.0" + resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.21.0.tgz#b51adc97f3472e6a5cf4444d34bc9d6b9037591f" + integrity sha512-3wCbTpk5WJlyE4mSOtDLhqQmGFi0/TD9VPwmiolnk8U0wRgMEktqCXd3vy5buTO3tljvalNvKrjHEfrd2WpEKA== + dependencies: + is-core-module "^2.8.0" + path-parse "^1.0.7" + supports-preserve-symlinks-flag "^1.0.0" + +resolve@^1.20.0: version "1.20.0" resolved 
"https://registry.yarnpkg.com/resolve/-/resolve-1.20.0.tgz#629a013fb3f70755d6f0b7935cc1c2c5378b1975" integrity sha512-wENBPt4ySzg4ybFQW2TT1zMQucPK95HSh/nq2CFTZVOGut2+pQvSsgtda4d26YrYcr067wjbmzOG8byDPBX63A== @@ -4378,9 +4394,9 @@ shebang-regex@^3.0.0: integrity sha512-7++dFhtcx3353uBaq8DDR4NuxBetBzC7ZQOhmTQInHEd6bSrXdiEyzCvG07Z44UYdLShWUyXt5M/yhz8ekcb1A== shelljs@^0.8.4: - version "0.8.4" - resolved "https://registry.yarnpkg.com/shelljs/-/shelljs-0.8.4.tgz#de7684feeb767f8716b326078a8a00875890e3c2" - integrity sha512-7gk3UZ9kOfPLIAbslLzyWeGiEqx9e3rxwZM0KE6EL8GlGwjym9Mrlx5/p33bWTu9YG6vcS4MBxYZDHYr5lr8BQ== + version "0.8.5" + resolved "https://registry.yarnpkg.com/shelljs/-/shelljs-0.8.5.tgz#de055408d8361bed66c669d2f000538ced8ee20c" + integrity sha512-TiwcRcrkhHvbrZbnRcFYMLl30Dfov3HKqzp5tO5b4pt6G/SezKcYhmDg15zXVBswHmctSAQKznqNW2LO5tTDow== dependencies: glob "^7.0.0" interpret "^1.0.0" @@ -4660,6 +4676,11 @@ supports-color@^9.0.2: resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-9.2.1.tgz#599dc9d45acf74c6176e0d880bab1d7d718fe891" integrity sha512-Obv7ycoCTG51N7y175StI9BlAXrmgZrFhZOb0/PyjHBher/NmsdBgbbQ1Inhq+gIhz6+7Gb+jWF2Vqi7Mf1xnQ== +supports-preserve-symlinks-flag@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/supports-preserve-symlinks-flag/-/supports-preserve-symlinks-flag-1.0.0.tgz#6eda4bd344a3c94aea376d4cc31bc77311039e09" + integrity sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w== + svg-path-properties@^0.2.1: version "0.2.2" resolved "https://registry.yarnpkg.com/svg-path-properties/-/svg-path-properties-0.2.2.tgz#b073d81be7292eae0e233ab8a83f58dc27113296" diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index 345601fee16..471cf1bf74d 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -180,8 +180,7 @@ func TestMessageClientBasics(t *testing.T) { sender.AssertExpectations(t) // Test point 7: Interrupt the connection - grpcStream.ExpectedCalls = nil - grpcStream.Calls = nil + grpcStream.ResetMock() sender.ExpectedCalls = nil sender.Calls = nil diff --git a/pkg/p2p/mock_grpc_client.go b/pkg/p2p/mock_grpc_client.go index 7b32816875f..f9121e716d2 100644 --- a/pkg/p2p/mock_grpc_client.go +++ b/pkg/p2p/mock_grpc_client.go @@ -15,6 +15,7 @@ package p2p import ( "context" + "sync" "sync/atomic" "github.com/pingcap/tiflow/proto/p2p" @@ -22,8 +23,8 @@ import ( "google.golang.org/grpc" ) -//nolint:unused type mockSendMessageClient struct { + mu sync.Mutex mock.Mock // embeds an empty interface p2p.CDCPeerToPeer_SendMessageClient @@ -41,13 +42,24 @@ func newMockSendMessageClient(ctx context.Context) *mockSendMessageClient { } func (s *mockSendMessageClient) Send(packet *p2p.MessagePacket) error { + s.mu.Lock() + defer s.mu.Unlock() + args := s.Called(packet) atomic.AddInt32(&s.msgCount, 1) return args.Error(0) } func (s *mockSendMessageClient) Recv() (*p2p.SendMessageResponse, error) { - args := s.Called() + var args mock.Arguments + func() { + // We use a deferred Unlock in case `s.Called()` panics. 
+		s.mu.Lock()
+		defer s.mu.Unlock()
+
+		args = s.MethodCalled("Recv")
+	}()
+
 	if err := args.Error(1); err != nil {
 		return nil, err
 	}
@@ -66,12 +78,18 @@ func (s *mockSendMessageClient) Context() context.Context {
 	return s.ctx
 }
 
-//nolint:unused
+func (s *mockSendMessageClient) ResetMock() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.ExpectedCalls = nil
+	s.Calls = nil
+}
+
 type mockCDCPeerToPeerClient struct {
 	mock.Mock
 }
 
-//nolint:unused
 func (c *mockCDCPeerToPeerClient) SendMessage(
 	ctx context.Context, opts ...grpc.CallOption,
 ) (p2p.CDCPeerToPeer_SendMessageClient, error) {
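For reference, the mounter change above replaces the old random `Input()` channel selection with `AddEntry`, which dispatches each event to one of the per-worker channels round-robin via an atomic counter and respects context cancellation. Below is a minimal standalone sketch of that dispatch pattern; it is illustrative only, and the `dispatcher`, `newDispatcher`, and `add` names are hypothetical, not part of this patch.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// dispatcher fans events out to per-worker channels, round-robin.
type dispatcher struct {
	index int64      // atomic counter used to pick the next worker
	chs   []chan int // one buffered channel per worker
}

func newDispatcher(workerNum, chanSize int) *dispatcher {
	chs := make([]chan int, workerNum)
	for i := range chs {
		chs[i] = make(chan int, chanSize)
	}
	return &dispatcher{chs: chs}
}

// add blocks until the chosen worker channel accepts ev or ctx is canceled,
// mirroring the select used by mounterImpl.AddEntry in the diff above.
func (d *dispatcher) add(ctx context.Context, ev int) error {
	i := atomic.AddInt64(&d.index, 1) % int64(len(d.chs))
	select {
	case <-ctx.Done():
		return ctx.Err()
	case d.chs[i] <- ev:
		return nil
	}
}

func main() {
	d := newDispatcher(4, 16)
	if err := d.add(context.Background(), 42); err != nil {
		fmt.Println("add failed:", err)
		return
	}
	fmt.Println("event queued on a worker channel")
}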