diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index b79f712e383..872f0d6c3f1 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/util" "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap" @@ -290,7 +291,7 @@ func (c *Capture) run(stdCtx context.Context) error { // when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors // (recoverable errors are intercepted in the processor tick) // so we should also stop the processor and let capture restart or exit - processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, orchestrator.ProcessorRole) + processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, util.RoleProcessor.String()) log.Info("the processor routine has exited", zap.Error(processorErr)) }() wg.Add(1) @@ -388,7 +389,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { c.MessageRouter.RemovePeer(captureID) }) - err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, orchestrator.OwnerRole) + err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String()) c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) // if owner exits, resign the owner key diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index f4c07004280..64acf720c79 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -23,6 +23,10 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/chdelay" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/pingcap/tiflow/pkg/orchestrator/util" + pkgutil "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" @@ -32,10 +36,6 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" - - cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/etcd" - "github.com/pingcap/tiflow/pkg/orchestrator/util" ) const ( @@ -43,11 +43,6 @@ const ( // takes more than etcdWorkerLogsWarnDuration, it will print a log etcdWorkerLogsWarnDuration = 1 * time.Second deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" - - // ProcessorRole is the role of the processor etcd worker - ProcessorRole = "processor" - // OwnerRole is the role of the owner etcd worker - OwnerRole = "owner" ) // EtcdWorker handles all interactions with Etcd @@ -140,7 +135,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, defer cancel() watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) - if role == ProcessorRole { + if role == pkgutil.RoleProcessor.String() { failpoint.Inject("ProcessorEtcdDelay", func() { delayer := chdelay.NewChannelDelayer(time.Second*3, watchCh, 1024, 16) defer delayer.Close()