diff --git a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go index 6bf485d38d1..85f140d157e 100644 --- a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go @@ -117,10 +117,24 @@ func newSink(ctx context.Context, s.alive.Unlock() close(s.dead) - if err != nil && errors.Cause(err) != context.Canceled { + if err != nil { + if errors.Cause(err) == context.Canceled { + if s.cancelErr == nil { + return + } + err = s.cancelErr + } select { - case <-ctx.Done(): case errCh <- err: + log.Warn("mq dml sink meet error", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) + default: + log.Info("mq dml sink meet error, ignored", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) } } }()