diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index b18b1017bcd..94f363000b5 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -111,10 +111,21 @@ func newDMLSink( s.alive.Unlock() close(s.dead) - if err != nil && errors.Cause(err) != context.Canceled { + if err != nil { + if errors.Cause(err) == context.Canceled { + err = context.Cause(ctx) + } select { - case <-ctx.Done(): case errCh <- err: + log.Warn("mq dml sink meet error", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) + default: + log.Info("mq dml sink meet error, ignored", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) } } }()