From 73c3ec042eb2f490198b2301261439eecb53866d Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 10 Nov 2024 19:42:38 +0800 Subject: [PATCH 1/4] update --- downstreamadapter/dispatcher/dispatcher.go | 12 +++--- .../event_dispatcher_manager.go | 42 ++++++++++++------- downstreamadapter/sink/kafka_sink.go | 14 ++++--- downstreamadapter/sink/mysql_sink.go | 10 +++-- downstreamadapter/sink/sink.go | 7 ++-- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index de57b85f6..a469f9834 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -110,6 +110,8 @@ type Dispatcher struct { // Dispatcher will be ready when it receives the handshake event from eventService. // If false, the dispatcher will drop the event it received. isReady atomic.Bool + + errCh chan error } func NewDispatcher( @@ -123,7 +125,8 @@ func NewDispatcher( filter filter.Filter, schemaID int64, schemaIDToDispatchers *SchemaIDToDispatchers, - syncPointInfo *syncpoint.SyncPointInfo) *Dispatcher { + syncPointInfo *syncpoint.SyncPointInfo, + errCh chan error) *Dispatcher { dispatcher := &Dispatcher{ changefeedID: changefeedID, id: id, @@ -141,6 +144,7 @@ func NewDispatcher( schemaID: schemaID, schemaIDToDispatchers: schemaIDToDispatchers, resendTaskMap: newResendTaskMap(), + errCh: errCh, } dispatcher.startTs.Store(startTs) @@ -181,8 +185,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat if action.Action == heartbeatpb.Action_Write { err := d.sink.WriteBlockEvent(pendingEvent, d.tableProgress) if err != nil { - // TODO: handle error - log.Error("write block event failed", zap.Error(err)) + d.errCh <- err return } } else { @@ -405,8 +408,7 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) { if !d.shouldBlock(event) { err := d.sink.WriteBlockEvent(event, d.tableProgress) if err != nil { - // TODO: handle error - log.Error("write block event failed", zap.Error(err)) + d.errCh <- err } if event.GetNeedAddedTables() != nil || event.GetNeedDroppedTables() != nil { message := &heartbeatpb.TableSpanBlockStatus{ diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 4d8709d4c..2c0ab32bc 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -89,6 +89,10 @@ type EventDispatcherManager struct { // TODO: changefeed update config syncPointConfig *syncpoint.SyncPointConfig + // collect the error in all the dispatchers and sink module + // report the error to the maintainer + errCh chan error + tableEventDispatcherCount prometheus.Gauge metricCreateDispatcherDuration prometheus.Observer metricCheckpointTs prometheus.Gauge @@ -112,6 +116,7 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID, statusesChan: make(chan *heartbeatpb.TableSpanStatus, 8192), blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), dispatcherActionChan: make(chan common.DispatcherAction, 1024*1024), + errCh: make(chan error, 16), cancel: cancel, config: cfConfig, schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(), @@ -162,21 +167,26 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID, manager.wg.Add(1) go func() { defer manager.wg.Done() - err := manager.sink.Run() - if err != nil && errors.Cause(err) != context.Canceled { - log.Error("Sink Run Meets Error", - zap.String("changefeedID", manager.changefeedID.String()), - zap.Error(err)) - // report error to maintainer - var message heartbeatpb.HeartBeatRequest - message.ChangefeedID = manager.changefeedID.ToPB() - message.Err = &heartbeatpb.RunningError{ - Time: time.Now().String(), - Node: appcontext.GetID(), - Code: string(apperror.ErrorCode(err)), - Message: err.Error(), + select { + case <-ctx.Done(): + return + case err := <-manager.errCh: + if errors.Cause(err) != context.Canceled { + log.Error("Event Dispatcher Manager Meets Error", + zap.String("changefeedID", manager.changefeedID.String()), + zap.Error(err)) + + // report error to maintainer + var message heartbeatpb.HeartBeatRequest + message.ChangefeedID = manager.changefeedID.ToPB() + message.Err = &heartbeatpb.RunningError{ + Time: time.Now().String(), + Node: appcontext.GetID(), + Code: string(apperror.ErrorCode(err)), + Message: err.Error(), + } + manager.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: manager.GetMaintainerID(), Request: &message}) } - manager.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: manager.GetMaintainerID(), Request: &message}) } }() @@ -200,7 +210,7 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID, } func (e *EventDispatcherManager) InitSink(ctx context.Context) error { - sink, err := sink.NewSink(ctx, e.config, e.changefeedID) + sink, err := sink.NewSink(ctx, e.config, e.changefeedID, e.errCh) if err != nil { return err } @@ -299,7 +309,7 @@ func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan e.changefeedID, id, tableSpan, e.sink, startTs, e.dispatcherActionChan, e.blockStatusesChan, - e.filter, schemaID, e.schemaIDToDispatchers, &syncPointInfo) + e.filter, schemaID, e.schemaIDToDispatchers, &syncPointInfo, e.errCh) // lazy create heartBeatTask when event dispatcher manager has dispatchers if e.heartBeatTask == nil { diff --git a/downstreamadapter/sink/kafka_sink.go b/downstreamadapter/sink/kafka_sink.go index 5ad61e7f2..ec2422a3c 100644 --- a/downstreamadapter/sink/kafka_sink.go +++ b/downstreamadapter/sink/kafka_sink.go @@ -55,13 +55,14 @@ type KafkaSink struct { statistics *metrics.Statistics errgroup *errgroup.Group + errCh chan error } func (s *KafkaSink) SinkType() SinkType { return KafkaSinkType } -func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) { +func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig, errCh chan error) (*KafkaSink, error) { errGroup, ctx := errgroup.WithContext(ctx) topic, err := helper.GetTopic(sinkURI) if err != nil { @@ -157,7 +158,7 @@ func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI ddlProducer := producer.NewKafkaDDLProducer(ctx, changefeedID, ddlSyncProducer) ddlWorker := worker.NewKafkaDDLWorker(ctx, changefeedID, protocol, ddlProducer, encoder, eventRouter, topicManager, statistics, errGroup) - return &KafkaSink{ + sink := &KafkaSink{ changefeedID: changefeedID, dmlWorker: dmlWorker, ddlWorker: ddlWorker, @@ -165,13 +166,16 @@ func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI topicManager: topicManager, statistics: statistics, errgroup: errGroup, - }, nil + errCh: errCh, + } + go sink.run() + return sink, nil } -func (s *KafkaSink) Run() error { +func (s *KafkaSink) run() { s.dmlWorker.Run() s.ddlWorker.Run() - return s.errgroup.Wait() + s.errCh <- s.errgroup.Wait() } func (s *KafkaSink) AddDMLEvent(event *commonEvent.DMLEvent, tableProgress *types.TableProgress) { diff --git a/downstreamadapter/sink/mysql_sink.go b/downstreamadapter/sink/mysql_sink.go index 2188cfee7..b88be0c63 100644 --- a/downstreamadapter/sink/mysql_sink.go +++ b/downstreamadapter/sink/mysql_sink.go @@ -47,9 +47,10 @@ type MysqlSink struct { db *sql.DB errgroup *errgroup.Group statistics *metrics.Statistics + errCh chan error } -func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerCount int, config *config.ChangefeedConfig, sinkURI *url.URL) (*MysqlSink, error) { +func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerCount int, config *config.ChangefeedConfig, sinkURI *url.URL, errCh chan error) (*MysqlSink, error) { errgroup, ctx := errgroup.WithContext(ctx) mysqlSink := MysqlSink{ changefeedID: changefeedID, @@ -57,6 +58,7 @@ func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerC workerCount: workerCount, errgroup: errgroup, statistics: metrics.NewStatistics(changefeedID, "TxnSink"), + errCh: errCh, } cfg, db, err := mysql.NewMysqlConfigAndDB(ctx, changefeedID, sinkURI) @@ -71,14 +73,16 @@ func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerC mysqlSink.ddlWorker = worker.NewMysqlDDLWorker(ctx, db, cfg, mysqlSink.changefeedID, errgroup, mysqlSink.statistics) mysqlSink.db = db + mysqlSink.run() + return &mysqlSink, nil } -func (s *MysqlSink) Run() error { +func (s *MysqlSink) run() { for i := 0; i < s.workerCount; i++ { s.dmlWorker[i].Run() } - return s.errgroup.Wait() + s.errCh <- s.errgroup.Wait() } func (s *MysqlSink) SinkType() SinkType { diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index eaa1556c1..22f4d0503 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -42,10 +42,9 @@ type Sink interface { CheckStartTs(tableId int64, startTs uint64) (int64, error) Close(removeDDLTsItem bool) error SinkType() SinkType - Run() error } -func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID common.ChangeFeedID) (Sink, error) { +func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID common.ChangeFeedID, errCh chan error) (Sink, error) { sinkURI, err := url.Parse(config.SinkURI) if err != nil { return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) @@ -53,9 +52,9 @@ func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID scheme := sink.GetScheme(sinkURI) switch scheme { case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: - return NewMysqlSink(ctx, changefeedID, 16, config, sinkURI) + return NewMysqlSink(ctx, changefeedID, 16, config, sinkURI, errCh) case sink.KafkaScheme, sink.KafkaSSLScheme: - return NewKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig) + return NewKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig, errCh) } return nil, nil } From 54365c920420202a9b4e215805235506a225d596 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 10 Nov 2024 20:29:54 +0800 Subject: [PATCH 2/4] update --- .../event_dispatcher_manager.go | 1 + .../dispatcher_orchestrator.go | 3 ++ downstreamadapter/sink/mysql_sink.go | 2 +- pkg/sink/mysql/mysql_writer.go | 30 +++++-------------- 4 files changed, 12 insertions(+), 24 deletions(-) diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 5ac2652be..069af28d7 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -108,6 +108,7 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID, tableTriggerEventDispatcherID *heartbeatpb.DispatcherID, startTs uint64, maintainerID node.ID) (*EventDispatcherManager, uint64, error) { + log.Info("hyy NewEventDispatcherManager") ctx, cancel := context.WithCancel(context.Background()) manager := &EventDispatcherManager{ dispatcherMap: newDispatcherMap(), diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index e974c86f6..edf2e935b 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -61,6 +61,7 @@ func (m *DispatcherOrchestrator) RecvMaintainerRequest(_ context.Context, msg *m } func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *heartbeatpb.MaintainerBootstrapRequest) error { + log.Info("hyy handle add dispatcher manager request", zap.Any("request", req)) cfId := common.NewChangefeedIDFromPB(req.ChangefeedID) manager, exists := m.dispatcherManagers[cfId] var err error @@ -140,6 +141,8 @@ func createBootstrapResponse(changefeedID *heartbeatpb.ChangefeedID, manager *di }) }) + log.Info("hyy bootstrap response", zap.Any("response", response)) + return response } diff --git a/downstreamadapter/sink/mysql_sink.go b/downstreamadapter/sink/mysql_sink.go index 2da5a5422..f3f9e1fec 100644 --- a/downstreamadapter/sink/mysql_sink.go +++ b/downstreamadapter/sink/mysql_sink.go @@ -73,7 +73,7 @@ func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerC mysqlSink.ddlWorker = worker.NewMysqlDDLWorker(ctx, db, cfg, mysqlSink.changefeedID, errgroup, mysqlSink.statistics) mysqlSink.db = db - mysqlSink.run() + go mysqlSink.run() return &mysqlSink, nil } diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index fce30fdce..1cd4e0524 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -95,8 +95,7 @@ func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error { if !(event.TiDBOnly && !w.cfg.IsTiDB) { err := w.execDDLWithMaxRetries(event) if err != nil { - log.Error("exec ddl failed", zap.Error(err)) - return err + return errors.Trace(err) } } @@ -130,11 +129,7 @@ func (w *MysqlWriter) FlushDDLTs(event *commonEvent.DDLEvent) error { } err := w.SendDDLTs(event) - if err != nil { - log.Error("send ddl ts failed", zap.Error(err)) - return err - } - return nil + return errors.Trace(err) } func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) error { @@ -142,15 +137,13 @@ func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) err // create sync point table if not exist err := w.CreateSyncTable() if err != nil { - log.Error("create sync table failed", zap.Error(err)) - return err + return errors.Trace(err) } w.syncPointTableInit = true } err := w.SendSyncPointEvent(event) if err != nil { - log.Error("send syncpoint event failed", zap.Error(err)) - return err + return errors.Trace(err) } for _, callback := range event.PostTxnFlushed { callback() @@ -161,7 +154,6 @@ func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) err func (w *MysqlWriter) SendSyncPointEvent(event *commonEvent.SyncPointEvent) error { tx, err := w.db.BeginTx(w.ctx, nil) if err != nil { - log.Error("sync table: begin Tx fail", zap.Error(err)) return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;")) } row := tx.QueryRow("select @@tidb_current_ts") @@ -252,7 +244,6 @@ func (w *MysqlWriter) SendSyncPointEvent(event *commonEvent.SyncPointEvent) erro func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error { tx, err := w.db.BeginTx(w.ctx, nil) if err != nil { - log.Error("ddl ts table: begin Tx fail", zap.Error(err)) return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "ddl ts table: begin Tx fail;")) } @@ -684,7 +675,6 @@ func (w *MysqlWriter) execDDL(event *commonEvent.DDLEvent) error { } if err = tx.Commit(); err != nil { - log.Error("Failed to exec DDL", zap.String("sql", event.GetDDLQuery()), zap.Error(err)) return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", event.GetDDLQuery()))) } @@ -729,7 +719,6 @@ func (w *MysqlWriter) Flush(events []*commonEvent.DMLEvent, workerNum int) error if !w.cfg.DryRun { if err := w.execDMLWithMaxRetries(dmls); err != nil { - log.Error("execute DMLs failed", zap.Error(err)) return errors.Trace(err) } } else { @@ -835,10 +824,7 @@ func (w *MysqlWriter) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error { if len(dmls.sqls) != len(dmls.values) { - log.Error("unexpected number of sqls and values", - zap.Strings("sqls", dmls.sqls), - zap.Any("values", dmls.values)) - return cerror.ErrUnexpected.FastGenByArgs("unexpected number of sqls and values") + return cerror.ErrUnexpected.FastGenByArgs(fmt.Sprintf("unexpected number of sqls and values, sqls is %s, values is %s", dmls.sqls, dmls.values)) } // approximateSize is multiplied by 2 because in extreme circustumas, every @@ -851,8 +837,7 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error { tryExec := func() (int, int64, error) { tx, err := w.db.BeginTx(w.ctx, nil) if err != nil { - log.Error("BeginTx", zap.Error(err)) - return 0, 0, err + return 0, 0, errors.Trace(err) } // Set session variables first and then execute the transaction. @@ -890,8 +875,7 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error { return retry.Do(w.ctx, func() error { err := w.statistics.RecordBatchExecution(tryExec) if err != nil { - log.Error("RecordBatchExecution", zap.Error(err)) - return err + return errors.Trace(err) } return nil }, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()), From d68484b805318e9367931a5566de6fa65b23baa7 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 10 Nov 2024 20:35:04 +0800 Subject: [PATCH 3/4] update --- .../dispatcherorchestrator/dispatcher_orchestrator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index edf2e935b..e974c86f6 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -61,7 +61,6 @@ func (m *DispatcherOrchestrator) RecvMaintainerRequest(_ context.Context, msg *m } func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *heartbeatpb.MaintainerBootstrapRequest) error { - log.Info("hyy handle add dispatcher manager request", zap.Any("request", req)) cfId := common.NewChangefeedIDFromPB(req.ChangefeedID) manager, exists := m.dispatcherManagers[cfId] var err error @@ -141,8 +140,6 @@ func createBootstrapResponse(changefeedID *heartbeatpb.ChangefeedID, manager *di }) }) - log.Info("hyy bootstrap response", zap.Any("response", response)) - return response } From dc8ba107c5ffe8b7d2484baee128d0d9852a0676 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sun, 10 Nov 2024 20:48:06 +0800 Subject: [PATCH 4/4] update --- downstreamadapter/dispatchermanager/event_dispatcher_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 069af28d7..5ac2652be 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -108,7 +108,6 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID, tableTriggerEventDispatcherID *heartbeatpb.DispatcherID, startTs uint64, maintainerID node.ID) (*EventDispatcherManager, uint64, error) { - log.Info("hyy NewEventDispatcherManager") ctx, cancel := context.WithCancel(context.Background()) manager := &EventDispatcherManager{ dispatcherMap: newDispatcherMap(),