Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

error handling for dispatchers #492

Merged
merged 5 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type Dispatcher struct {
// Dispatcher will be ready when it receives the handshake event from eventService.
// If false, the dispatcher will drop the event it received.
isReady atomic.Bool

errCh chan error
}

func NewDispatcher(
Expand All @@ -123,7 +125,8 @@ func NewDispatcher(
filter filter.Filter,
schemaID int64,
schemaIDToDispatchers *SchemaIDToDispatchers,
syncPointInfo *syncpoint.SyncPointInfo) *Dispatcher {
syncPointInfo *syncpoint.SyncPointInfo,
errCh chan error) *Dispatcher {
dispatcher := &Dispatcher{
changefeedID: changefeedID,
id: id,
Expand All @@ -141,6 +144,7 @@ func NewDispatcher(
schemaID: schemaID,
schemaIDToDispatchers: schemaIDToDispatchers,
resendTaskMap: newResendTaskMap(),
errCh: errCh,
}
dispatcher.startTs.Store(startTs)

Expand Down Expand Up @@ -181,8 +185,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
if action.Action == heartbeatpb.Action_Write {
err := d.sink.WriteBlockEvent(pendingEvent, d.tableProgress)
if err != nil {
// TODO: handle error
log.Error("write block event failed", zap.Error(err))
d.errCh <- err
return
}
} else {
Expand Down Expand Up @@ -406,8 +409,7 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
if !d.shouldBlock(event) {
err := d.sink.WriteBlockEvent(event, d.tableProgress)
if err != nil {
// TODO: handle error
log.Error("write block event failed", zap.Error(err))
d.errCh <- err
}
if event.GetNeedAddedTables() != nil || event.GetNeedDroppedTables() != nil {
message := &heartbeatpb.TableSpanBlockStatus{
Expand Down
42 changes: 26 additions & 16 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type EventDispatcherManager struct {
// TODO: changefeed update config
syncPointConfig *syncpoint.SyncPointConfig

// errCh collects the errors raised by all the dispatchers and the sink module,
// so that they can be reported to the maintainer.
errCh chan error

tableEventDispatcherCount prometheus.Gauge
metricCreateDispatcherDuration prometheus.Observer
metricCheckpointTs prometheus.Gauge
Expand All @@ -112,6 +116,7 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID,
statusesChan: make(chan *heartbeatpb.TableSpanStatus, 8192),
blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
dispatcherActionChan: make(chan common.DispatcherAction, 1024*1024),
errCh: make(chan error, 16),
cancel: cancel,
config: cfConfig,
schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(),
Expand Down Expand Up @@ -162,21 +167,26 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID,
manager.wg.Add(1)
go func() {
defer manager.wg.Done()
err := manager.sink.Run()
if err != nil && errors.Cause(err) != context.Canceled {
log.Error("Sink Run Meets Error",
zap.String("changefeedID", manager.changefeedID.String()),
zap.Error(err))
// report error to maintainer
var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = manager.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: appcontext.GetID(),
Code: string(apperror.ErrorCode(err)),
Message: err.Error(),
select {
case <-ctx.Done():
return
case err := <-manager.errCh:
if errors.Cause(err) != context.Canceled {
log.Error("Event Dispatcher Manager Meets Error",
zap.String("changefeedID", manager.changefeedID.String()),
zap.Error(err))

// report error to maintainer
var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = manager.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: appcontext.GetID(),
Code: string(apperror.ErrorCode(err)),
Message: err.Error(),
}
manager.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: manager.GetMaintainerID(), Request: &message})
}
manager.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: manager.GetMaintainerID(), Request: &message})
}
}()

Expand Down Expand Up @@ -213,7 +223,7 @@ func NewEventDispatcherManager(changefeedID common.ChangeFeedID,
}

func (e *EventDispatcherManager) InitSink(ctx context.Context) error {
sink, err := sink.NewSink(ctx, e.config, e.changefeedID)
sink, err := sink.NewSink(ctx, e.config, e.changefeedID, e.errCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -334,7 +344,7 @@ func (e *EventDispatcherManager) NewDispatchers(infos []DispatcherCreateInfo) er
e.changefeedID,
id, tableSpans[idx], e.sink,
uint64(newStartTsList[idx]), e.dispatcherActionChan, e.blockStatusesChan,
e.filter, schemaIds[idx], e.schemaIDToDispatchers, &syncPointInfo)
e.filter, schemaIds[idx], e.schemaIDToDispatchers, &syncPointInfo, e.errCh)

if e.heartBeatTask == nil {
e.heartBeatTask = newHeartBeatTask(e)
Expand Down
15 changes: 6 additions & 9 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/node"

"github.com/pingcap/log"
Expand Down Expand Up @@ -80,27 +81,23 @@ func (c *HeartBeatCollector) RegisterEventDispatcherManager(m *EventDispatcherMa
m.SetBlockStatusRequestQueue(c.blockStatusReqQueue)
err := c.heartBeatResponseDynamicStream.AddPath(m.changefeedID, m)
if err != nil {
log.Error("heartBeatResponseDynamicStream Failed to add path", zap.Any("ChangefeedID", m.changefeedID))
return err
return errors.Trace(err)
}
err = c.schedulerDispatcherRequestDynamicStream.AddPath(m.changefeedID, m)
if err != nil {
log.Error("schedulerDispatcherRequestDynamicStream Failed to add path", zap.Any("ChangefeedID", m.changefeedID))
return err
return errors.Trace(err)
}
return nil
}

func (c *HeartBeatCollector) RemoveEventDispatcherManager(m *EventDispatcherManager) error {
err := c.heartBeatResponseDynamicStream.RemovePath(m.changefeedID)
if err != nil {
log.Error("heartBeatResponseDynamicStream Failed to remove path", zap.Any("ChangefeedID", m.changefeedID))
return err
return errors.Trace(err)
}
err = c.schedulerDispatcherRequestDynamicStream.RemovePath(m.changefeedID)
if err != nil {
log.Error("schedulerDispatcherRequestDynamicStream Failed to remove path", zap.Any("ChangefeedID", m.changefeedID))
return err
return errors.Trace(err)
}
return nil
}
Expand Down Expand Up @@ -194,7 +191,7 @@ func (h *SchedulerDispatcherRequestHandler) Handle(eventDispatcherManager *Event
if len(infos) > 0 {
err := eventDispatcherManager.NewDispatchers(infos)
if err != nil {
log.Info("failed to create dispatchers", zap.Any("infos", infos), zap.Error(err))
eventDispatcherManager.errCh <- err
}
}
return false
Expand Down
14 changes: 9 additions & 5 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ type KafkaSink struct {
statistics *metrics.Statistics

errgroup *errgroup.Group
errCh chan error
}

// SinkType reports which kind of sink this is; a KafkaSink always returns KafkaSinkType.
func (s *KafkaSink) SinkType() SinkType {
return KafkaSinkType
}

func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) {
func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig, errCh chan error) (*KafkaSink, error) {
errGroup, ctx := errgroup.WithContext(ctx)
topic, err := helper.GetTopic(sinkURI)
if err != nil {
Expand Down Expand Up @@ -157,21 +158,24 @@ func NewKafkaSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI
ddlProducer := producer.NewKafkaDDLProducer(ctx, changefeedID, ddlSyncProducer)
ddlWorker := worker.NewKafkaDDLWorker(ctx, changefeedID, protocol, ddlProducer, encoder, eventRouter, topicManager, statistics, errGroup)

return &KafkaSink{
sink := &KafkaSink{
changefeedID: changefeedID,
dmlWorker: dmlWorker,
ddlWorker: ddlWorker,
adminClient: adminClient,
topicManager: topicManager,
statistics: statistics,
errgroup: errGroup,
}, nil
errCh: errCh,
}
go sink.run()
return sink, nil
}

func (s *KafkaSink) Run() error {
func (s *KafkaSink) run() {
s.dmlWorker.Run()
s.ddlWorker.Run()
return s.errgroup.Wait()
s.errCh <- s.errgroup.Wait()
}

func (s *KafkaSink) AddDMLEvent(event *commonEvent.DMLEvent, tableProgress *types.TableProgress) {
Expand Down
10 changes: 7 additions & 3 deletions downstreamadapter/sink/mysql_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ type MysqlSink struct {
db *sql.DB
errgroup *errgroup.Group
statistics *metrics.Statistics
errCh chan error
}

func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerCount int, config *config.ChangefeedConfig, sinkURI *url.URL) (*MysqlSink, error) {
func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerCount int, config *config.ChangefeedConfig, sinkURI *url.URL, errCh chan error) (*MysqlSink, error) {
errgroup, ctx := errgroup.WithContext(ctx)
mysqlSink := MysqlSink{
changefeedID: changefeedID,
dmlWorker: make([]*worker.MysqlDMLWorker, workerCount),
workerCount: workerCount,
errgroup: errgroup,
statistics: metrics.NewStatistics(changefeedID, "TxnSink"),
errCh: errCh,
}

cfg, db, err := mysql.NewMysqlConfigAndDB(ctx, changefeedID, sinkURI)
Expand All @@ -71,14 +73,16 @@ func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerC
mysqlSink.ddlWorker = worker.NewMysqlDDLWorker(ctx, db, cfg, mysqlSink.changefeedID, errgroup, mysqlSink.statistics)
mysqlSink.db = db

go mysqlSink.run()

return &mysqlSink, nil
}

func (s *MysqlSink) Run() error {
func (s *MysqlSink) run() {
for i := 0; i < s.workerCount; i++ {
s.dmlWorker[i].Run()
}
return s.errgroup.Wait()
s.errCh <- s.errgroup.Wait()
}

func (s *MysqlSink) SinkType() SinkType {
Expand Down
7 changes: 3 additions & 4 deletions downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,19 @@ type Sink interface {
CheckStartTsList(tableIds []int64, startTsList []int64) ([]int64, error)
Close(removeDDLTsItem bool) error
SinkType() SinkType
Run() error
}

func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID common.ChangeFeedID) (Sink, error) {
func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID common.ChangeFeedID, errCh chan error) (Sink, error) {
sinkURI, err := url.Parse(config.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
return NewMysqlSink(ctx, changefeedID, 16, config, sinkURI)
return NewMysqlSink(ctx, changefeedID, 16, config, sinkURI, errCh)
case sink.KafkaScheme, sink.KafkaSSLScheme:
return NewKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig)
return NewKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig, errCh)
}
return nil, nil
}
30 changes: 7 additions & 23 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error {
if !(event.TiDBOnly && !w.cfg.IsTiDB) {
err := w.execDDLWithMaxRetries(event)
if err != nil {
log.Error("exec ddl failed", zap.Error(err))
return err
return errors.Trace(err)
}
}

Expand Down Expand Up @@ -130,27 +129,21 @@ func (w *MysqlWriter) FlushDDLTs(event *commonEvent.DDLEvent) error {
}

err := w.SendDDLTs(event)
if err != nil {
log.Error("send ddl ts failed", zap.Error(err))
return err
}
return nil
return errors.Trace(err)
}

func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) error {
if !w.syncPointTableInit {
// create sync point table if not exist
err := w.CreateSyncTable()
if err != nil {
log.Error("create sync table failed", zap.Error(err))
return err
return errors.Trace(err)
}
w.syncPointTableInit = true
}
err := w.SendSyncPointEvent(event)
if err != nil {
log.Error("send syncpoint event failed", zap.Error(err))
return err
return errors.Trace(err)
}
for _, callback := range event.PostTxnFlushed {
callback()
Expand All @@ -161,7 +154,6 @@ func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) err
func (w *MysqlWriter) SendSyncPointEvent(event *commonEvent.SyncPointEvent) error {
tx, err := w.db.BeginTx(w.ctx, nil)
if err != nil {
log.Error("sync table: begin Tx fail", zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;"))
}
row := tx.QueryRow("select @@tidb_current_ts")
Expand Down Expand Up @@ -252,7 +244,6 @@ func (w *MysqlWriter) SendSyncPointEvent(event *commonEvent.SyncPointEvent) erro
func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
tx, err := w.db.BeginTx(w.ctx, nil)
if err != nil {
log.Error("ddl ts table: begin Tx fail", zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "ddl ts table: begin Tx fail;"))
}

Expand Down Expand Up @@ -684,7 +675,6 @@ func (w *MysqlWriter) execDDL(event *commonEvent.DDLEvent) error {
}

if err = tx.Commit(); err != nil {
log.Error("Failed to exec DDL", zap.String("sql", event.GetDDLQuery()), zap.Error(err))
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", event.GetDDLQuery())))
}

Expand Down Expand Up @@ -729,7 +719,6 @@ func (w *MysqlWriter) Flush(events []*commonEvent.DMLEvent, workerNum int) error

if !w.cfg.DryRun {
if err := w.execDMLWithMaxRetries(dmls); err != nil {
log.Error("execute DMLs failed", zap.Error(err))
return errors.Trace(err)
}
} else {
Expand Down Expand Up @@ -835,10 +824,7 @@ func (w *MysqlWriter) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs

func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error {
if len(dmls.sqls) != len(dmls.values) {
log.Error("unexpected number of sqls and values",
zap.Strings("sqls", dmls.sqls),
zap.Any("values", dmls.values))
return cerror.ErrUnexpected.FastGenByArgs("unexpected number of sqls and values")
return cerror.ErrUnexpected.FastGenByArgs(fmt.Sprintf("unexpected number of sqls and values, sqls is %s, values is %s", dmls.sqls, dmls.values))
}

// approximateSize is multiplied by 2 because in extreme circumstances, every
Expand All @@ -851,8 +837,7 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error {
tryExec := func() (int, int64, error) {
tx, err := w.db.BeginTx(w.ctx, nil)
if err != nil {
log.Error("BeginTx", zap.Error(err))
return 0, 0, err
return 0, 0, errors.Trace(err)
}

// Set session variables first and then execute the transaction.
Expand Down Expand Up @@ -890,8 +875,7 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error {
return retry.Do(w.ctx, func() error {
err := w.statistics.RecordBatchExecution(tryExec)
if err != nil {
log.Error("RecordBatchExecution", zap.Error(err))
return err
return errors.Trace(err)
}
return nil
}, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
Expand Down
Loading