Skip to content

Commit

Permalink
cdc/kafka: 5.3-manaual-cherry-pick-for-kafka-configuration (#4641)
Browse files Browse the repository at this point in the history
ref #4383, close #4385, ref #4499
  • Loading branch information
3AceShowHand authored Feb 21, 2022
1 parent 648ce2c commit f034233
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 72 deletions.
2 changes: 1 addition & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode

ti.findHandleIndex()
ti.initColumnsFlag()
log.Debug("warpped table info", zap.Reflect("tableInfo", ti))
log.Debug("warped table info", zap.Reflect("tableInfo", ti))
return ti
}

Expand Down
5 changes: 4 additions & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
})
if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) {
log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl))
log.Info("Execute DDL succeeded",
zap.String("changefeed", ctx.ChangefeedVars().ID),
zap.Bool("ignored", err != nil),
zap.Reflect("ddl", ddl))
atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs)
continue
}
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {

stdCtx := util.PutChangefeedIDInCtx(ctx, p.changefeed.ID)
stdCtx = util.PutCaptureAddrInCtx(stdCtx, p.captureInfo.AdvertiseAddr)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor)

p.mounter = entry.NewMounter(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, p.changefeed.Info.Config.EnableOldValue)
p.wg.Add(1)
Expand Down
30 changes: 25 additions & 5 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -60,6 +61,9 @@ type mqSink struct {
resolvedReceiver *notify.Receiver

statistics *Statistics

role util.Role
id model.ChangeFeedID
}

func newMqSink(
Expand Down Expand Up @@ -100,6 +104,9 @@ func newMqSink(
return nil, errors.Trace(err)
}

changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)

k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
Expand All @@ -115,6 +122,9 @@ func newMqSink(
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ"),

role: role,
id: changefeedID,
}

go func() {
Expand All @@ -124,7 +134,8 @@ func newMqSink(
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
log.Error("error channel is full", zap.Error(err),
zap.String("changefeed", changefeedID), zap.Any("role", k.role))
}
}
}()
Expand All @@ -135,7 +146,10 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
rowsCount := 0
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
log.Info("Row changed event ignored",
zap.Uint64("start-ts", row.StartTs),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
continue
}
partition := k.dispatcher.Dispatch(row)
Expand Down Expand Up @@ -216,6 +230,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
zap.String("query", ddl.Query),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("changefeed", k.id),
zap.Any("role", k.role),
)
return cerror.ErrDDLEventIgnored.GenWithStackByArgs()
}
Expand All @@ -233,7 +249,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
}

k.statistics.AddDDLCount()
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs))
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commitTs", ddl.CommitTs),
zap.String("changefeed", k.id), zap.Any("role", k.role))
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
return errors.Trace(err)
}
Expand Down Expand Up @@ -294,7 +311,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
return 0, err
}
}
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize))
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize),
zap.String("changefeed", k.id), zap.Any("role", k.role))
return thisBatchSize, nil
})
}
Expand Down Expand Up @@ -364,7 +382,9 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
log.Warn("writeToProducer called with no-op",
zap.ByteString("key", message.Key),
zap.ByteString("value", message.Value),
zap.Int32("partition", partition))
zap.Int32("partition", partition),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
return nil
}

Expand Down
Loading

0 comments on commit f034233

Please sign in to comment.