diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 4d06527f21e..5ae0fedd26b 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -139,7 +139,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode ti.findHandleIndex() ti.initColumnsFlag() - log.Debug("warpped table info", zap.Reflect("tableInfo", ti)) + log.Debug("warped table info", zap.Reflect("tableInfo", ti)) return ti } diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index b0d52093f7c..22ed26e7b16 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -154,7 +154,10 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m err = cerror.ErrExecDDLFailed.GenWithStackByArgs() }) if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) { - log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl)) + log.Info("Execute DDL succeeded", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Bool("ignored", err != nil), + zap.Reflect("ddl", ddl)) atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs) continue } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 926f8a60edb..915b70b469d 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -273,6 +273,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { stdCtx := util.PutChangefeedIDInCtx(ctx, p.changefeed.ID) stdCtx = util.PutCaptureAddrInCtx(stdCtx, p.captureInfo.AdvertiseAddr) + stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor) p.mounter = entry.NewMounter(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, p.changefeed.Info.Config.EnableOldValue) p.wg.Add(1) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 19cdfc988f5..4fff5ee6081 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/notify" "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -60,6 +61,9 @@ type mqSink struct { resolvedReceiver *notify.Receiver statistics *Statistics + + role util.Role + id model.ChangeFeedID } func newMqSink( @@ -100,6 +104,9 @@ func newMqSink( return nil, errors.Trace(err) } + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) + k := &mqSink{ mqProducer: mqProducer, dispatcher: d, @@ -115,6 +122,9 @@ func newMqSink( resolvedReceiver: resolvedReceiver, statistics: NewStatistics(ctx, "MQ"), + + role: role, + id: changefeedID, } go func() { @@ -124,7 +134,8 @@ func newMqSink( return case errCh <- err: default: - log.Error("error channel is full", zap.Error(err)) + log.Error("error channel is full", zap.Error(err), + zap.String("changefeed", changefeedID), zap.Any("role", k.role)) } } }() @@ -135,7 +146,10 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha rowsCount := 0 for _, row := range rows { if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { - log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs)) + log.Info("Row changed event ignored", + zap.Uint64("start-ts", row.StartTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) continue } partition := k.dispatcher.Dispatch(row) @@ -216,6 +230,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { zap.String("query", ddl.Query), zap.Uint64("startTs", ddl.StartTs), zap.Uint64("commitTs", ddl.CommitTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role), ) return cerror.ErrDDLEventIgnored.GenWithStackByArgs() } @@ -233,7 +249,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { } k.statistics.AddDDLCount() - log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs)) + log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commitTs", ddl.CommitTs), + zap.String("changefeed", k.id), zap.Any("role", k.role)) err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1) return errors.Trace(err) } @@ -294,7 +311,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { return 0, err } } - log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize)) + log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize), + zap.String("changefeed", k.id), zap.Any("role", k.role)) return thisBatchSize, nil }) } @@ -364,7 +382,9 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, log.Warn("writeToProducer called with no-op", zap.ByteString("key", message.Key), zap.ByteString("value", message.Value), - zap.Int32("partition", partition)) + zap.Int32("partition", partition), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) return nil } diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index bf9edade312..69bd3a37b41 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -55,6 +56,11 @@ type Config struct { SaslScram *security.SaslScram // control whether to create topic and verify partition number AutoCreate bool + + // Timeout for sarama `config.Net` configurations, default to `10s` + DialTimeout time.Duration + WriteTimeout time.Duration + ReadTimeout time.Duration } // NewConfig returns a default Kafka configuration @@ -68,6 +74,9 @@ func NewConfig() *Config { Credential: &security.Credential{}, SaslScram: &security.SaslScram{}, AutoCreate: true, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, } } @@ -167,17 +176,45 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi c.AutoCreate = autoCreate } + s = params.Get("dial-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + c.DialTimeout = a + } + + s = params.Get("write-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + c.WriteTimeout = a + } + + s = params.Get("read-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + c.ReadTimeout = a + } + return nil } type kafkaSaramaProducer struct { - // clientLock is used to protect concurrent access of asyncClient and syncClient. + // clientLock is used to protect concurrent access of asyncProducer and syncProducer. // Since we don't close these two clients (which have an input chan) from the // sender routine, data race or send on closed chan could happen. - clientLock sync.RWMutex - asyncClient sarama.AsyncProducer - syncClient sarama.SyncProducer - // producersReleased records whether asyncClient and syncClient have been closed properly + clientLock sync.RWMutex + client sarama.Client + asyncProducer sarama.AsyncProducer + syncProducer sarama.SyncProducer + // producersReleased records whether asyncProducer and syncProducer have been closed properly producersReleased bool topic string partitionNum int32 @@ -194,6 +231,8 @@ type kafkaSaramaProducer struct { closeCh chan struct{} // atomic flag indicating whether the producer is closing closing kafkaProducerClosingFlag + role util.Role + id model.ChangeFeedID } type kafkaProducerClosingFlag = int32 @@ -224,14 +263,14 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ failpoint.Inject("KafkaSinkAsyncSendError", func() { // simulate sending message to input channel successfully but flushing // message to Kafka meets error - log.Info("failpoint error injected") + log.Info("failpoint error injected", zap.String("changefeed", k.id), zap.Any("role", k.role)) k.failpointCh <- errors.New("kafka sink injected error") failpoint.Return(nil) }) failpoint.Inject("SinkFlushDMLPanic", func() { time.Sleep(time.Second) - log.Panic("SinkFlushDMLPanic") + log.Panic("SinkFlushDMLPanic", zap.String("changefeed", k.id), zap.Any("role", k.role)) }) select { @@ -239,7 +278,7 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ return ctx.Err() case <-k.closeCh: return nil - case k.asyncClient.Input() <- msg: + case k.asyncProducer.Input() <- msg: } return nil } @@ -262,7 +301,7 @@ func (k *kafkaSaramaProducer) SyncBroadcastMessage(ctx context.Context, message case <-k.closeCh: return nil default: - err := k.syncClient.SendMessages(msgs) + err := k.syncProducer.SendMessages(msgs) return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } } @@ -324,12 +363,13 @@ func (k *kafkaSaramaProducer) stop() { if atomic.SwapInt32(&k.closing, kafkaProducerClosing) == kafkaProducerClosing { return } - log.Info("kafka producer closing...") + log.Info("kafka producer closing...", zap.String("changefeed", k.id), zap.Any("role", k.role)) close(k.closeCh) } // Close closes the sync and async clients. func (k *kafkaSaramaProducer) Close() error { + log.Info("stop the kafka producer", zap.String("changefeed", k.id), zap.Any("role", k.role)) k.stop() k.clientLock.Lock() @@ -341,22 +381,39 @@ func (k *kafkaSaramaProducer) Close() error { return nil } k.producersReleased = true - // In fact close sarama sync client doesn't return any error. - // But close async client returns error if error channel is not empty, we - // don't populate this error to the upper caller, just add a log here. + // `client` is mainly used by `asyncProducer` to fetch metadata and other related + // operations. When we close the `kafkaSaramaProducer`, TiCDC no need to make sure + // that buffered messages flushed. + // Consider the situation that the broker does not respond, If the client is not + // closed, `asyncProducer.Close()` would waste a mount of time to try flush all messages. + // To prevent the scenario mentioned above, close client first. start := time.Now() - err := k.asyncClient.Close() + if err := k.client.Close(); err != nil { + log.Error("close sarama client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } else { + log.Info("sarama client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } + + start = time.Now() + err := k.asyncProducer.Close() if err != nil { - log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start))) + log.Error("close async client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), zap.String("changefeed", k.id), zap.Any("role", k.role)) } else { - log.Info("async client closed", zap.Duration("duration", time.Since(start))) + log.Info("async client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } start = time.Now() - err = k.syncClient.Close() + err = k.syncProducer.Close() if err != nil { - log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start))) + log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } else { - log.Info("sync client closed", zap.Duration("duration", time.Since(start))) + log.Info("sync client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } return nil } @@ -364,6 +421,8 @@ func (k *kafkaSaramaProducer) Close() error { func (k *kafkaSaramaProducer) run(ctx context.Context) error { defer func() { k.flushedReceiver.Stop() + log.Info("stop the kafka producer", + zap.String("changefeed", k.id), zap.Any("role", k.role)) k.stop() }() for { @@ -373,16 +432,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { case <-k.closeCh: return nil case err := <-k.failpointCh: - log.Warn("receive from failpoint chan", zap.Error(err)) + log.Warn("receive from failpoint chan", zap.Error(err), + zap.String("changefeed", k.id), zap.Any("role", k.role)) return err - case msg := <-k.asyncClient.Successes(): + case msg := <-k.asyncProducer.Successes(): if msg == nil || msg.Metadata == nil { continue } flushedOffset := msg.Metadata.(uint64) atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) k.flushedNotifier.Notify() - case err := <-k.asyncClient.Errors(): + case err := <-k.asyncProducer.Errors(): // We should not wrap a nil pointer if the pointer is of a subtype of `error` // because Go would store the type info and the resulted `error` variable would not be nil, // which will cause the pkg/error library to malfunction. @@ -424,7 +484,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) } if topicMaxMessageBytes < config.MaxMessageBytes { - log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+ + log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+ "use topic's `max.message.bytes` to initialize the Kafka producer", zap.Int("max.message.bytes", topicMaxMessageBytes), zap.Int("max-message-bytes", config.MaxMessageBytes)) @@ -459,7 +519,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) // TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than // broker's `message.max.bytes`. if brokerMessageMaxBytes < config.MaxMessageBytes { - log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+ + log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+ "use broker's `message.max.bytes` to initialize the Kafka producer", zap.Int("message.max.bytes", brokerMessageMaxBytes), zap.Int("max-message-bytes", config.MaxMessageBytes)) @@ -493,7 +553,11 @@ var newSaramaConfigImpl = newSaramaConfig // NewKafkaSaramaProducer creates a kafka sarama producer func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { - log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) + log.Info("Starting kafka sarama producer ...", zap.Any("config", config), + zap.String("changefeed", changefeedID), zap.Any("role", role)) + cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { return nil, err @@ -504,11 +568,17 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o } opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes) - asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg) + client, err := sarama.NewClient(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + asyncProducer, err := sarama.NewAsyncProducerFromClient(client) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg) + + syncProducer, err := sarama.NewSyncProducerFromClient(client) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } @@ -519,10 +589,11 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o return nil, err } k := &kafkaSaramaProducer{ - asyncClient: asyncClient, - syncClient: syncClient, - topic: topic, - partitionNum: config.PartitionNum, + client: client, + asyncProducer: asyncProducer, + syncProducer: syncProducer, + topic: topic, + partitionNum: config.PartitionNum, partitionOffset: make([]struct { flushed uint64 sent uint64 @@ -547,10 +618,6 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o return k, nil } -func init() { - sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB -} - var ( validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) commonInvalidChar = regexp.MustCompile(`[\?:,"]`) @@ -569,7 +636,6 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) ( return } -// NewSaramaConfig return the default config and set the according version and metrics func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config := sarama.NewConfig() @@ -591,33 +657,45 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { return nil, errors.Trace(err) } config.Version = version - // See: https://kafka.apache.org/documentation/#replication - // When one of the brokers in a Kafka cluster is down, the partition leaders - // in this broker is broken, Kafka will election a new partition leader and - // replication logs, this process will last from a few seconds to a few minutes. - // Kafka cluster will not provide a writing service in this process. - // Time out in one minute. - config.Metadata.Retry.Max = 120 - config.Metadata.Retry.Backoff = 500 * time.Millisecond - // If it is not set, this means a metadata request against an unreachable - // cluster (all brokers are unreachable or unresponsive) can take up to - // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + - // Metadata.Retry.Backoff * Metadata.Retry.Max` - // to fail. - // See: https://github.com/Shopify/sarama/issues/765 - // and https://github.com/pingcap/tiflow/issues/3352. + + // Producer fetch metadata from brokers frequently, if metadata cannot be + // refreshed easily, this would indicate the network condition between the + // capture server and kafka broker is not good. + // In the scenario that cannot get response from Kafka server, this default + // setting can help to get response more quickly. + config.Metadata.Retry.Max = 1 + config.Metadata.Retry.Backoff = 100 * time.Millisecond + // This Timeout is useless if the `RefreshMetadata` time cost is less than it. config.Metadata.Timeout = 1 * time.Minute + // Admin.Retry take effect on `ClusterAdmin` related operations, + // only `CreateTopic` for cdc now. set the `Timeout` to `1m` to make CI stable. + config.Admin.Retry.Max = 5 + config.Admin.Retry.Backoff = 100 * time.Millisecond + config.Admin.Timeout = 1 * time.Minute + + // Producer.Retry take effect when the producer try to send message to kafka + // brokers. If kafka cluster is healthy, just the default value should be enough. + // For kafka cluster with a bad network condition, producer should not try to + // waster too much time on sending a message, get response no matter success + // or fail as soon as possible is preferred. + config.Producer.Retry.Max = 3 + config.Producer.Retry.Backoff = 100 * time.Millisecond + + // make sure sarama producer flush messages as soon as possible. + config.Producer.Flush.Bytes = 0 + config.Producer.Flush.Messages = 0 + config.Producer.Flush.Frequency = time.Duration(0) + + config.Net.DialTimeout = c.DialTimeout + config.Net.WriteTimeout = c.WriteTimeout + config.Net.ReadTimeout = c.ReadTimeout + config.Producer.Partitioner = sarama.NewManualPartitioner config.Producer.MaxMessageBytes = c.MaxMessageBytes config.Producer.Return.Successes = true config.Producer.Return.Errors = true config.Producer.RequiredAcks = sarama.WaitForAll - - // Time out in five minutes(600 * 500ms). - config.Producer.Retry.Max = 600 - config.Producer.Retry.Backoff = 500 * time.Millisecond - switch strings.ToLower(strings.TrimSpace(c.Compression)) { case "none": config.Producer.Compression = sarama.CompressionNone @@ -634,11 +712,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config.Producer.Compression = sarama.CompressionNone } - // Time out in one minute(120 * 500ms). - config.Admin.Retry.Max = 120 - config.Admin.Retry.Backoff = 500 * time.Millisecond - config.Admin.Timeout = 20 * time.Second - if c.Credential != nil && len(c.Credential.CAPath) != 0 { config.Net.TLS.Enable = true config.Net.TLS.Config, err = c.Credential.ToTLSConfig() diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index ebcd6093068..d9a6561fca2 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -67,8 +67,41 @@ func (s *kafkaSuite) TestClientID(c *check.C) { } } +func (s *kafkaSuite) TestConfigTimeouts(c *check.C) { + defer testleak.AfterTest(c)() + + cfg := NewConfig() + c.Assert(cfg.DialTimeout, check.Equals, 10*time.Second) + c.Assert(cfg.ReadTimeout, check.Equals, 10*time.Second) + c.Assert(cfg.WriteTimeout, check.Equals, 10*time.Second) + + saramaConfig, err := newSaramaConfig(context.Background(), cfg) + c.Assert(err, check.IsNil) + c.Assert(saramaConfig.Net.DialTimeout, check.Equals, cfg.DialTimeout) + c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, cfg.WriteTimeout) + c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, cfg.ReadTimeout) + + uri := "kafka://127.0.0.1:9092/kafka-test?dial-timeout=5s&read-timeout=1000ms" + + "&write-timeout=2m" + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + opts := make(map[string]string) + err = cfg.Initialize(sinkURI, config.GetDefaultReplicaConfig(), opts) + c.Assert(err, check.IsNil) + + c.Assert(cfg.DialTimeout, check.Equals, 5*time.Second) + c.Assert(cfg.ReadTimeout, check.Equals, 1000*time.Millisecond) + c.Assert(cfg.WriteTimeout, check.Equals, 2*time.Minute) + + saramaConfig, err = newSaramaConfig(context.Background(), cfg) + c.Assert(err, check.IsNil) + c.Assert(saramaConfig.Net.DialTimeout, check.Equals, 5*time.Second) + c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, 1000*time.Millisecond) + c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, 2*time.Minute) +} + func (s *kafkaSuite) TestInitializeConfig(c *check.C) { - defer testleak.AfterTest(c) + defer testleak.AfterTest(c)() cfg := NewConfig() uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" + @@ -117,7 +150,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) @@ -152,6 +185,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") }() opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) @@ -350,6 +384,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { config.BrokerEndpoints = []string{"127.0.0.1:1111"} topic := "topic" c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) _, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") @@ -368,7 +403,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) config := NewConfig() @@ -396,6 +431,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") @@ -446,7 +482,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) config := NewConfig() @@ -461,6 +497,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) errCh := make(chan error, 1) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) defer func() { err := producer.Close() diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index a714a3085ac..c9e7addd7ab 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/util" ) // Sink options keys @@ -124,6 +125,7 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig, op return err } errCh := make(chan error) + ctx = util.PutRoleInCtx(ctx, util.RoleClient) // TODO: find a better way to verify a sinkURI is valid s, err := New(ctx, "sink-verify", sinkURI, sinkFilter, cfg, opts, errCh) if err != nil { diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 68c6007b2a5..ee8fb4831fe 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -333,6 +333,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { } c.sinks = make([]*partitionSink, kafkaPartitionNum) ctx, cancel := context.WithCancel(ctx) + ctx = util.PutRoleInCtx(ctx, util.RoleKafkaConsumer) errCh := make(chan error, 1) opts := map[string]string{} for i := 0; i < int(kafkaPartitionNum); i++ { diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index cbe8f46b60e..28c705b8440 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -111,6 +112,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { return err } opts := map[string]string{} + ctx = util.PutRoleInCtx(ctx, util.RoleRedoLogApplier) s, err := sink.New(ctx, applierChangefeed, ra.cfg.SinkURI, ft, replicaConfig, opts, ra.errCh) if err != nil { return err diff --git a/pkg/util/ctx.go b/pkg/util/ctx.go index 492b00f8b4e..4f68af2cf33 100644 --- a/pkg/util/ctx.go +++ b/pkg/util/ctx.go @@ -31,6 +31,7 @@ const ( ctxKeyIsOwner = ctxKey("isOwner") ctxKeyTimezone = ctxKey("timezone") ctxKeyKVStorage = ctxKey("kvStorage") + ctxKeyRole = ctxKey("role") ) // CaptureAddrFromCtx returns a capture ID stored in the specified context. @@ -121,6 +122,21 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID) } +// RoleFromCtx returns a role stored in the specified context. +// It returns RoleUnknown if there's no valid role found +func RoleFromCtx(ctx context.Context) Role { + role, ok := ctx.Value(ctxKeyRole).(Role) + if !ok { + return RoleUnknown + } + return role +} + +// PutRoleInCtx return a new child context with the specified role stored. +func PutRoleInCtx(ctx context.Context, role Role) context.Context { + return context.WithValue(ctx, ctxKeyRole, role) +} + // ZapFieldCapture returns a zap field containing capture address // TODO: log redact for capture address func ZapFieldCapture(ctx context.Context) zap.Field { diff --git a/pkg/util/identity.go b/pkg/util/identity.go new file mode 100644 index 00000000000..5b0abe38a80 --- /dev/null +++ b/pkg/util/identity.go @@ -0,0 +1,47 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +// Role is the operator role, mainly used for logging at the moment. +type Role int + +const ( + RoleOwner Role = iota + RoleProcessor + RoleClient + RoleRedoLogApplier + RoleKafkaConsumer + RoleTester + RoleUnknown +) + +func (r Role) String() string { + switch r { + case RoleOwner: + return "owner" + case RoleProcessor: + return "processor" + case RoleClient: + return "cdc-client" + case RoleKafkaConsumer: + return "kafka-consumer" + case RoleRedoLogApplier: + return "redo-applier" + case RoleTester: + return "tester" + case RoleUnknown: + return "unknown" + } + return "unknown" +}