Skip to content

Commit

Permalink
kafka(ticdc): fix kafka dml sink report error to sink manager (#9879) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 18, 2023
1 parent c27eddf commit ef317bf
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions cdc/sinkv2/eventsink/mq/mq_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,24 @@ func newSink(ctx context.Context,
s.alive.Unlock()
close(s.dead)

if err != nil && errors.Cause(err) != context.Canceled {
if err != nil {
if errors.Cause(err) == context.Canceled {
if s.cancelErr == nil {
return
}
err = s.cancelErr
}
select {
case <-ctx.Done():
case errCh <- err:
log.Warn("mq dml sink meet error",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Error(err))
default:
log.Info("mq dml sink meet error, ignored",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Error(err))
}
}
}()
Expand Down

0 comments on commit ef317bf

Please sign in to comment.