From 07ad2c1a438ab773308987922954ee2c909fba03 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Thu, 12 Oct 2023 23:00:29 -0500 Subject: [PATCH] kafka(ticdc): fix kafka dml sink report error to sink manager (#9879) ref pingcap/tiflow#9855 --- cdc/sink/dmlsink/mq/mq_dml_sink.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 0646593b63f..d04e98f0d38 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -114,10 +114,21 @@ func newDMLSink( s.alive.Unlock() close(s.dead) - if err != nil && errors.Cause(err) != context.Canceled { + if err != nil { + if errors.Cause(err) == context.Canceled { + err = context.Cause(ctx) + } select { - case <-ctx.Done(): case errCh <- err: + log.Warn("mq dml sink meet error", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) + default: + log.Info("mq dml sink meet error, ignored", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.Error(err)) } } }()