Skip to content

Commit

Permalink
kafka(ticdc): fix kafka dml sink report error to sink manager (#9879)
Browse files Browse the repository at this point in the history
ref #9855
  • Loading branch information
3AceShowHand authored Oct 13, 2023
1 parent 3777731 commit 07ad2c1
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,21 @@ func newDMLSink(
s.alive.Unlock()
close(s.dead)

if err != nil && errors.Cause(err) != context.Canceled {
if err != nil {
if errors.Cause(err) == context.Canceled {
err = context.Cause(ctx)
}
select {
case <-ctx.Done():
case errCh <- err:
log.Warn("mq dml sink meet error",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Error(err))
default:
log.Info("mq dml sink meet error, ignored",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Error(err))
}
}
}()
Expand Down

0 comments on commit 07ad2c1

Please sign in to comment.