From b2d35d42503353abd3a5b9c65a56337f118a725f Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 19 Apr 2022 19:02:04 +0800 Subject: [PATCH] This is an automated cherry-pick of #5186 Signed-off-by: ti-chi-bot --- cdc/owner/ddl_sink.go | 271 ++++++++++++++ cdc/processor/pipeline/table_actor.go | 505 ++++++++++++++++++++++++++ cdc/sink/manager.go | 15 +- cdc/sink/mq.go | 5 +- 4 files changed, 789 insertions(+), 7 deletions(-) create mode 100644 cdc/owner/ddl_sink.go create mode 100644 cdc/processor/pipeline/table_actor.go diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go new file mode 100644 index 00000000000..4739b03b8ee --- /dev/null +++ b/cdc/owner/ddl_sink.go @@ -0,0 +1,271 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink" + cdcContext "github.com/pingcap/tiflow/pkg/context" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +const ( + defaultErrChSize = 1024 +) + +// DDLSink is a wrapper of the `Sink` interface for the owner +// DDLSink should send `DDLEvent` and `CheckpointTs` to downstream sink, +// If `SyncPointEnabled`, also send `syncPoint` to downstream. +type DDLSink interface { + // run the DDLSink + run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo) + // emitCheckpointTs emits the checkpoint Ts to downstream data source + // this function will return after recording the checkpointTs specified in memory immediately + // and the recorded checkpointTs will be sent and updated to downstream data source every second + emitCheckpointTs(ts uint64, tableNames []model.TableName) + // emitDDLEvent emits DDL event and return true if the DDL is executed + // the DDL event will be sent to another goroutine and execute to downstream + // the caller of this function can call again and again until a true returned + emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) + emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error + // close the sink, cancel running goroutine. + close(ctx context.Context) error +} + +type ddlSinkImpl struct { + lastSyncPoint model.Ts + syncPointStore sink.SyncpointStore + + // It is used to record the checkpointTs and the names of the table at that time. + mu struct { + sync.Mutex + checkpointTs model.Ts + currentTableNames []model.TableName + } + ddlFinishedTs model.Ts + ddlSentTs model.Ts + + ddlCh chan *model.DDLEvent + errCh chan error + + sink sink.Sink + // `sinkInitHandler` can be helpful in unit testing. 
+ sinkInitHandler ddlSinkInitHandler + + // cancel would be used to cancel the goroutine start by `run` + cancel context.CancelFunc + wg sync.WaitGroup +} + +func newDDLSink() DDLSink { + return &ddlSinkImpl{ + ddlCh: make(chan *model.DDLEvent, 1), + errCh: make(chan error, defaultErrChSize), + sinkInitHandler: ddlSinkInitializer, + cancel: func() {}, + } +} + +type ddlSinkInitHandler func(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error + +func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error { + filter, err := filter.NewFilter(info.Config) + if err != nil { + return errors.Trace(err) + } + + stdCtx := util.PutChangefeedIDInCtx(ctx, id) + stdCtx = util.PutRoleInCtx(stdCtx, util.RoleOwner) + s, err := sink.New(stdCtx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh) + if err != nil { + return errors.Trace(err) + } + a.sink = s + + if !info.SyncPointEnabled { + return nil + } + syncPointStore, err := sink.NewSyncpointStore(stdCtx, id, info.SinkURI) + if err != nil { + return errors.Trace(err) + } + a.syncPointStore = syncPointStore + + if err := a.syncPointStore.CreateSynctable(stdCtx); err != nil { + return errors.Trace(err) + } + return nil +} + +func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo) { + ctx, cancel := cdcContext.WithCancel(ctx) + s.cancel = cancel + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + start := time.Now() + if err := s.sinkInitHandler(ctx, s, id, info); err != nil { + log.Warn("ddl sink initialize failed", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Duration("duration", time.Since(start))) + ctx.Throw(err) + return + } + log.Info("ddl sink initialized, start processing...", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Duration("duration", time.Since(start))) + + // TODO make the tick duration configurable + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var lastCheckpointTs model.Ts + for { + select { + case <-ctx.Done(): + return + case err := <-s.errCh: + ctx.Throw(err) + return + default: + } + // `ticker.C` and `ddlCh` may can be triggered at the same time, it + // does not matter which one emit first, since TiCDC allow DDL with + // CommitTs equal to the last CheckpointTs be emitted later. 
+ select { + case <-ctx.Done(): + return + case err := <-s.errCh: + ctx.Throw(err) + return + case <-ticker.C: + s.mu.Lock() + checkpointTs := s.mu.checkpointTs + if checkpointTs == 0 || checkpointTs <= lastCheckpointTs { + s.mu.Unlock() + continue + } + tables := s.mu.currentTableNames + s.mu.Unlock() + lastCheckpointTs = checkpointTs + if err := s.sink.EmitCheckpointTs(ctx, checkpointTs, tables); err != nil { + ctx.Throw(errors.Trace(err)) + return + } + case ddl := <-s.ddlCh: + log.Info("begin emit ddl event", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Any("DDL", ddl)) + err := s.sink.EmitDDLEvent(ctx, ddl) + failpoint.Inject("InjectChangefeedDDLError", func() { + err = cerror.ErrExecDDLFailed.GenWithStackByArgs() + }) + if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) { + log.Info("Execute DDL succeeded", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Bool("ignored", err != nil), + zap.Any("ddl", ddl)) + atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs) + continue + } + // If DDL executing failed, and the error can not be ignored, + // throw an error and pause the changefeed + log.Error("Execute DDL failed", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Error(err), + zap.Any("ddl", ddl)) + ctx.Throw(errors.Trace(err)) + return + } + } + }() +} + +func (s *ddlSinkImpl) emitCheckpointTs(ts uint64, tableNames []model.TableName) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.checkpointTs = ts + s.mu.currentTableNames = tableNames +} + +func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) { + ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs) + if ddl.CommitTs <= ddlFinishedTs { + // the DDL event is executed successfully, and done is true + log.Info("ddl already executed", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Uint64("ddlFinishedTs", ddlFinishedTs), zap.Any("DDL", ddl)) + return true, nil + } + if ddl.CommitTs <= s.ddlSentTs { + log.Debug("ddl is not finished yet", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Uint64("ddlSentTs", s.ddlSentTs), zap.Any("DDL", ddl)) + // the DDL event is executing and not finished yet, return false + return false, nil + } + select { + case <-ctx.Done(): + return false, errors.Trace(ctx.Err()) + case s.ddlCh <- ddl: + s.ddlSentTs = ddl.CommitTs + log.Info("ddl is sent", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Uint64("ddlSentTs", s.ddlSentTs)) + default: + log.Warn("ddl chan full, send it the next round", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Uint64("ddlSentTs", s.ddlSentTs), + zap.Uint64("ddlFinishedTs", s.ddlFinishedTs), zap.Any("DDL", ddl)) + // if this hit, we think that ddlCh is full, + // just return false and send the ddl in the next round. 
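+			// The retry is driven by the caller: per the DDLSink contract above,
+			// emitDDLEvent is called again on a later round until it returns true.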
+ } + return false, nil +} + +func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error { + if checkpointTs == s.lastSyncPoint { + return nil + } + s.lastSyncPoint = checkpointTs + // TODO implement async sink syncPoint + return s.syncPointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs) +} + +func (s *ddlSinkImpl) close(ctx context.Context) (err error) { + s.cancel() + if s.sink != nil { + err = s.sink.Close(ctx) + } + if s.syncPointStore != nil { + err = s.syncPointStore.Close() + } + s.wg.Wait() + if err != nil && errors.Cause(err) != context.Canceled { + return err + } + return nil +} diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go new file mode 100644 index 00000000000..f0f424e4c1d --- /dev/null +++ b/cdc/processor/pipeline/table_actor.go @@ -0,0 +1,505 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo" + "github.com/pingcap/tiflow/cdc/sink" + "github.com/pingcap/tiflow/cdc/sink/common" + "github.com/pingcap/tiflow/pkg/actor" + "github.com/pingcap/tiflow/pkg/actor/message" + serverConfig "github.com/pingcap/tiflow/pkg/config" + cdcContext "github.com/pingcap/tiflow/pkg/context" + cerror "github.com/pingcap/tiflow/pkg/errors" + pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var ( + _ TablePipeline = (*tableActor)(nil) + _ actor.Actor[pmessage.Message] = (*tableActor)(nil) + stopped = uint32(1) +) + +const sinkFlushInterval = 500 * time.Millisecond + +type tableActor struct { + actorID actor.ID + mb actor.Mailbox[pmessage.Message] + router *actor.Router[pmessage.Message] + // all goroutines in tableActor should be spawned from this wg + wg *errgroup.Group + // backend mounter + mounter entry.Mounter + // backend tableSink + tableSink sink.Sink + + pullerNode *pullerNode + sortNode *sorterNode + sinkNode *sinkNode + // contains all nodes except pullerNode + nodes []*ActorNode + + // states of table actor + started bool + stopped uint32 + stopLock sync.Mutex + // TODO: try to reduce these config fields below in the future + tableID int64 + markTableID int64 + cyclicEnabled bool + targetTs model.Ts + memoryQuota uint64 + replicaInfo *model.TableReplicaInfo + replicaConfig *serverConfig.ReplicaConfig + changefeedVars *cdcContext.ChangefeedVars + globalVars *cdcContext.GlobalVars + // these fields below are used in logs and metrics only + changefeedID string + tableName string + + // use to report error to processor + reportErr func(error) + // stopCtx use to stop table actor + stopCtx context.Context + // cancel use to cancel all goroutines spawned from table actor + cancel context.CancelFunc + + lastFlushSinkTime time.Time +} + +// NewTableActor creates a table actor and starts it. 
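+// It builds the puller, sorter, optional cyclic, and sink nodes, starts them,
+// and spawns the actor on the shared table actor system before returning.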
+func NewTableActor(cdcCtx cdcContext.Context, + mounter entry.Mounter, + tableID model.TableID, + tableName string, + replicaInfo *model.TableReplicaInfo, + sink sink.Sink, + targetTs model.Ts, +) (TablePipeline, error) { + config := cdcCtx.ChangefeedVars().Info.Config + cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled() + changefeedVars := cdcCtx.ChangefeedVars() + globalVars := cdcCtx.GlobalVars() + + actorID := globalVars.TableActorSystem.ActorID() + mb := actor.NewMailbox[pmessage.Message](actorID, defaultOutputChannelSize) + // Cancel should be able to release all sub-goroutines in this actor. + ctx, cancel := context.WithCancel(cdcCtx) + // All sub-goroutines should be spawn in this wait group. + wg, cctx := errgroup.WithContext(ctx) + + table := &tableActor{ + // all errors in table actor will be reported to processor + reportErr: cdcCtx.Throw, + mb: mb, + wg: wg, + cancel: cancel, + + tableID: tableID, + markTableID: replicaInfo.MarkTableID, + tableName: tableName, + cyclicEnabled: cyclicEnabled, + memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota, + mounter: mounter, + replicaInfo: replicaInfo, + replicaConfig: config, + tableSink: sink, + targetTs: targetTs, + started: false, + + changefeedID: changefeedVars.ID, + changefeedVars: changefeedVars, + globalVars: globalVars, + router: globalVars.TableActorSystem.Router(), + actorID: actorID, + + stopCtx: cctx, + } + + startTime := time.Now() + log.Info("table actor starting", + zap.String("changefeed", table.changefeedID), + zap.String("tableName", tableName), + zap.Int64("tableID", tableID)) + if err := table.start(cctx); err != nil { + table.stop(err) + return nil, errors.Trace(err) + } + err := globalVars.TableActorSystem.System().Spawn(mb, table) + if err != nil { + return nil, errors.Trace(err) + } + log.Info("table actor started", + zap.String("changefeed", table.changefeedID), + zap.String("tableName", tableName), + zap.Int64("tableID", tableID), + zap.Duration("duration", time.Since(startTime))) + return table, nil +} + +// OnClose implements Actor interface. +// TODO: implements table actor stop here. +func (t *tableActor) OnClose() { +} + +func (t *tableActor) Poll(ctx context.Context, msgs []message.Message[pmessage.Message]) bool { + for i := range msgs { + if atomic.LoadUint32(&t.stopped) == stopped { + // No need to handle remaining messages. 
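+			// Returning false asks the actor system to stop polling and remove this actor.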
+ return false + } + + var err error + switch msgs[i].Tp { + case message.TypeValue: + switch msgs[i].Value.Tp { + case pmessage.MessageTypeBarrier: + err = t.handleBarrierMsg(ctx, msgs[i].Value.BarrierTs) + case pmessage.MessageTypeTick: + err = t.handleTickMsg(ctx) + } + case message.TypeStop: + t.handleStopMsg(ctx) + } + if err != nil { + log.Error("failed to process message, stop table actor ", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Any("message", msgs[i]), + zap.Error(err)) + t.handleError(err) + break + } + + // process message for each node, pull message from parent node and then send it to next node + if err := t.handleDataMsg(ctx); err != nil { + log.Error("failed to process message, stop table actor ", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), zap.Error(err)) + t.handleError(err) + break + } + } + if atomic.LoadUint32(&t.stopped) == stopped { + log.Error("table actor removed", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID)) + return false + } + return true +} + +func (t *tableActor) handleDataMsg(ctx context.Context) error { + for _, n := range t.nodes { + if err := n.TryRun(ctx); err != nil { + return err + } + } + return nil +} + +func (t *tableActor) handleBarrierMsg(ctx context.Context, barrierTs model.Ts) error { + t.sortNode.updateBarrierTs(barrierTs) + return t.sinkNode.updateBarrierTs(ctx, barrierTs) +} + +func (t *tableActor) handleTickMsg(ctx context.Context) error { + // tick message flush the raw event to sink, follow the old pipeline implementation, batch flush the events every 500ms + if time.Since(t.lastFlushSinkTime) > sinkFlushInterval { + _, err := t.sinkNode.HandleMessage(ctx, pmessage.TickMessage()) + if err != nil { + return err + } + t.lastFlushSinkTime = time.Now() + } + return nil +} + +func (t *tableActor) handleStopMsg(ctx context.Context) { + // async stops sinkNode and tableSink + go func() { + _, err := t.sinkNode.HandleMessage(ctx, + pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), + ) + if err != nil { + t.handleError(err) + } + }() +} + +func (t *tableActor) start(sdtTableContext context.Context) error { + if t.started { + log.Panic("start an already started table", + zap.String("changefeedID", t.changefeedID), + zap.Int64("tableID", t.tableID), + zap.String("tableName", t.tableName)) + } + log.Debug("creating table flow controller", + zap.String("changefeedID", t.changefeedID), + zap.Int64("tableID", t.tableID), + zap.String("tableName", t.tableName), + zap.Uint64("quota", t.memoryQuota)) + + flowController := common.NewTableFlowController(t.memoryQuota) + sorterNode := newSorterNode(t.tableName, t.tableID, + t.replicaInfo.StartTs, flowController, + t.mounter, t.replicaConfig, + ) + t.sortNode = sorterNode + sortActorNodeContext := newContext(sdtTableContext, t.tableName, + t.globalVars.TableActorSystem.Router(), + t.actorID, t.changefeedVars, t.globalVars, t.reportErr) + if err := startSorter(t, sortActorNodeContext); err != nil { + log.Error("sorter fails to start", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + return err + } + + pullerNode := newPullerNode(t.tableID, t.replicaInfo, t.tableName, t.changefeedVars.ID) + pullerActorNodeContext := newContext(sdtTableContext, + t.tableName, + t.globalVars.TableActorSystem.Router(), + t.actorID, t.changefeedVars, t.globalVars, t.reportErr) + t.pullerNode = pullerNode + if err := 
startPuller(t, pullerActorNodeContext); err != nil { + log.Error("puller fails to start", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + return err + } + + messageFetchFunc, err := t.getSinkAsyncMessageHolder(sdtTableContext, sortActorNodeContext) + if err != nil { + return errors.Trace(err) + } + + actorSinkNode := newSinkNode(t.tableID, t.tableSink, + t.replicaInfo.StartTs, + t.targetTs, flowController) + actorSinkNode.initWithReplicaConfig(true, t.replicaConfig) + t.sinkNode = actorSinkNode + + // construct sink actor node, it gets message from sortNode or cyclicNode + var messageProcessFunc asyncMessageProcessorFunc = func( + ctx context.Context, msg pmessage.Message, + ) (bool, error) { + return actorSinkNode.HandleMessage(sdtTableContext, msg) + } + t.nodes = append(t.nodes, NewActorNode(messageFetchFunc, messageProcessFunc)) + + t.started = true + log.Info("table actor is started", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID)) + return nil +} + +func (t *tableActor) getSinkAsyncMessageHolder( + sdtTableContext context.Context, + sortActorNodeContext *actorNodeContext) (AsyncMessageHolder, error, +) { + var messageFetchFunc asyncMessageHolderFunc = func() *pmessage.Message { + return sortActorNodeContext.tryGetProcessedMessage() + } + // check if cyclic feature is enabled + if t.cyclicEnabled { + cyclicNode := newCyclicMarkNode(t.markTableID) + cyclicActorNodeContext := newCyclicNodeContext( + newContext(sdtTableContext, t.tableName, + t.globalVars.TableActorSystem.Router(), + t.actorID, t.changefeedVars, + t.globalVars, t.reportErr)) + if err := cyclicNode.Init(cyclicActorNodeContext); err != nil { + log.Error("failed to start cyclic node", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + return nil, err + } + + // construct cyclic actor node if it's enabled, it gets message from sortNode + var messageProcessFunc asyncMessageProcessorFunc = func( + ctx context.Context, msg pmessage.Message, + ) (bool, error) { + return cyclicNode.TryHandleDataMessage(cyclicActorNodeContext, msg) + } + t.nodes = append(t.nodes, NewActorNode(messageFetchFunc, messageProcessFunc)) + messageFetchFunc = func() *pmessage.Message { + return cyclicActorNodeContext.tryGetProcessedMessage() + } + } + return messageFetchFunc, nil +} + +// stop will set this table actor state to stopped and releases all goroutines spawned +// from this table actor +func (t *tableActor) stop(err error) { + log.Info("table actor begin to stop....", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName)) + t.stopLock.Lock() + defer t.stopLock.Unlock() + if atomic.LoadUint32(&t.stopped) == stopped { + log.Info("table actor is already stopped", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName)) + return + } + atomic.StoreUint32(&t.stopped, stopped) + if t.sortNode != nil { + // releaseResource will send a message to sorter router + t.sortNode.releaseResource(t.stopCtx, t.changefeedID) + } + t.cancel() + if t.sinkNode != nil { + if err := t.sinkNode.releaseResource(t.stopCtx); err != nil { + log.Warn("close sink failed", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName), + zap.Error(err)) + } + } + log.Info("table actor stopped", + zap.String("changefeed", t.changefeedID), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) +} + +// handleError stops the table actor at first and 
then reports the error to processor +func (t *tableActor) handleError(err error) { + t.stop(err) + if !cerror.ErrTableProcessorStoppedSafely.Equal(err) { + t.reportErr(err) + } +} + +// ============ Implement TablePipline, must be threadsafe ============ + +// ResolvedTs returns the resolved ts in this table pipeline +func (t *tableActor) ResolvedTs() model.Ts { + // TODO: after TiCDC introduces p2p based resolved ts mechanism, TiCDC nodes + // will be able to cooperate replication status directly. Then we will add + // another replication barrier for consistent replication instead of reusing + // the global resolved-ts. + if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) { + return t.sinkNode.ResolvedTs() + } + return t.sortNode.ResolvedTs() +} + +// CheckpointTs returns the checkpoint ts in this table pipeline +func (t *tableActor) CheckpointTs() model.Ts { + return t.sinkNode.CheckpointTs() +} + +// UpdateBarrierTs updates the barrier ts in this table pipeline +func (t *tableActor) UpdateBarrierTs(ts model.Ts) { + msg := pmessage.BarrierMessage(ts) + err := t.router.Send(t.actorID, message.ValueMessage(msg)) + if err != nil { + log.Warn("send fails", + zap.Reflect("msg", msg), + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + } +} + +// AsyncStop tells the pipeline to stop, and returns true if the pipeline is already stopped. +func (t *tableActor) AsyncStop(targetTs model.Ts) bool { + // TypeStop stop the sinkNode only ,the processor stop the sink to release some resource + // and then stop the whole table pipeline by call Cancel + msg := message.StopMessage[pmessage.Message]() + err := t.router.Send(t.actorID, msg) + log.Info("send async stop signal to table", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Uint64("targetTs", targetTs)) + if err != nil { + if cerror.ErrMailboxFull.Equal(err) { + return false + } + if cerror.ErrActorNotFound.Equal(err) || cerror.ErrActorStopped.Equal(err) { + return true + } + log.Panic("send fails", zap.Reflect("msg", msg), zap.Error(err)) + } + return true +} + +// Workload returns the workload of this table pipeline +func (t *tableActor) Workload() model.WorkloadInfo { + // We temporarily set the value to constant 1 + return workload +} + +// Status returns the status of this table pipeline +func (t *tableActor) Status() TableStatus { + return t.sinkNode.Status() +} + +// ID returns the ID of source table and mark table +func (t *tableActor) ID() (tableID, markTableID int64) { + return t.tableID, t.markTableID +} + +// Name returns the quoted schema and table name +func (t *tableActor) Name() string { + return t.tableName +} + +// Cancel stops this table pipeline immediately and destroy all resources +// created by this table pipeline +func (t *tableActor) Cancel() { + // cancel wait group, release resource and mark the status as stopped + t.stop(nil) + // actor is closed, tick actor to remove this actor router + msg := pmessage.TickMessage() + if err := t.router.Send(t.mb.ID(), message.ValueMessage(msg)); err != nil { + log.Warn("fails to send Stop message", + zap.String("tableName", t.tableName), + zap.Int64("tableID", t.tableID), + zap.Error(err)) + } +} + +// Wait waits for table pipeline destroyed +func (t *tableActor) Wait() { + _ = t.wg.Wait() +} + +// for ut +var startPuller = func(t *tableActor, ctx *actorNodeContext) error { + return t.pullerNode.start(ctx, t.wg, true, t.sortNode) +} + +var startSorter = func(t *tableActor, ctx *actorNodeContext) error 
{ + return t.sortNode.start(ctx, true, t.wg, t.actorID, t.router) +} diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 07ad794f6c4..a671aec6705 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -19,7 +19,6 @@ import ( "sort" "sync" "sync/atomic" - "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -87,6 +86,7 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) // Close closes the Sink manager and backend Sink, this method can be reentrantly called func (m *Manager) Close(ctx context.Context) error { +<<<<<<< HEAD tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID) if m.backendSink != nil { log.Info("sinkManager try close bufSink", @@ -94,13 +94,18 @@ func (m *Manager) Close(ctx context.Context) error { start := time.Now() if err := m.backendSink.Close(ctx); err != nil { log.Info("close bufSink failed", +======= + m.tableSinksMu.Lock() + defer m.tableSinksMu.Unlock() + tableSinkTotalRowsCountCounter.DeleteLabelValues(m.changefeedID) + if m.bufSink != nil { + if err := m.bufSink.Close(ctx); err != nil && errors.Cause(err) != context.Canceled { + log.Warn("close bufSink failed", +>>>>>>> f80a34f63 (sink(ticdc): asynchronously close the mq producers (#5186)) zap.String("changefeed", m.changefeedID), - zap.Duration("duration", time.Since(start))) + zap.Error(err)) return err } - log.Info("close bufSink success", - zap.String("changefeed", m.changefeedID), - zap.Duration("duration", time.Since(start))) } return nil } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index deb13f32988..0f561406031 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -265,9 +265,10 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return errors.Trace(err) } +// Close the producer asynchronously, does not care closed successfully or not. func (k *mqSink) Close(ctx context.Context) error { - err := k.mqProducer.Close() - return errors.Trace(err) + go k.mqProducer.Close() + return nil } func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
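
Note on the mq.go hunk above: the point of #5186 is that mqSink.Close no longer waits for the producer, so Manager.Close cannot be blocked by a slow MQ shutdown; the cost is that any error from the producer's Close is dropped, as the new comment states. Below is a minimal, self-contained sketch of that fire-and-forget pattern; slowProducer and its two-second delay are illustrative stand-ins, not code from this patch.

package main

import (
	"fmt"
	"time"
)

// slowProducer stands in for the real MQ producer; the delay only exists to
// make the non-blocking behavior visible.
type slowProducer struct{}

func (p *slowProducer) Close() error {
	time.Sleep(2 * time.Second) // pretend the broker is slow to acknowledge shutdown
	return nil
}

type mqSink struct {
	mqProducer *slowProducer
}

// Close mirrors the patched behavior: the producer is closed in a background
// goroutine and its error is intentionally dropped, so the caller never blocks.
func (k *mqSink) Close() error {
	go k.mqProducer.Close()
	return nil
}

func main() {
	s := &mqSink{mqProducer: &slowProducer{}}
	start := time.Now()
	_ = s.Close()
	fmt.Printf("Close returned after %v\n", time.Since(start)) // microseconds, not two seconds
	// Keep the demo process alive long enough for the background close to finish;
	// the real TiCDC process keeps running, so it does not need this sleep.
	time.Sleep(3 * time.Second)
}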