From 1c336c6bd1718c1e0a159045921ae82c05e30e63 Mon Sep 17 00:00:00 2001
From: sdojjy
Date: Mon, 16 May 2022 10:00:43 +0800
Subject: [PATCH 1/2] add processor etcd worker delay fail point

---
 cdc/capture/capture.go          |  4 ++--
 pkg/chdelay/channel_delayer.go  |  2 +-
 pkg/orchestrator/etcd_worker.go | 14 ++++++++++++++
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index c3ef03fee6a..b79f712e383 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -290,7 +290,7 @@ func (c *Capture) run(stdCtx context.Context) error {
 			// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors
 			// (recoverable errors are intercepted in the processor tick)
 			// so we should also stop the processor and let capture restart or exit
-			processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
+			processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, orchestrator.ProcessorRole)
 			log.Info("the processor routine has exited", zap.Error(processorErr))
 		}()
 		wg.Add(1)
@@ -388,7 +388,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
 		c.MessageRouter.RemovePeer(captureID)
 	})
 
-	err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner")
+	err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, orchestrator.OwnerRole)
 	c.setOwner(nil)
 	log.Info("run owner exited", zap.Error(err))
 	// if owner exits, resign the owner key
diff --git a/pkg/chdelay/channel_delayer.go b/pkg/chdelay/channel_delayer.go
index 0ed34ea181b..8a29d6ac6c9 100644
--- a/pkg/chdelay/channel_delayer.go
+++ b/pkg/chdelay/channel_delayer.go
@@ -47,7 +47,7 @@ type entry[T any] struct {
 // NewChannelDelayer creates a new ChannelDelayer.
 func NewChannelDelayer[T any](
 	delayBy time.Duration,
-	in chan T,
+	in <-chan T,
 	queueSize int,
 	outChSize int,
 ) *ChannelDelayer[T] {
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index 263b861b778..f4c07004280 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/pkg/chdelay"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/api/v3/mvccpb"
@@ -42,6 +43,11 @@ const (
 	// takes more than etcdWorkerLogsWarnDuration, it will print a log
 	etcdWorkerLogsWarnDuration = 1 * time.Second
 	deletionCounterKey         = "/meta/ticdc-delete-etcd-key-count"
+
+	// ProcessorRole is the role of the processor etcd worker
+	ProcessorRole = "processor"
+	// OwnerRole is the role of the owner etcd worker
+	OwnerRole = "owner"
 )
 
 // EtcdWorker handles all interactions with Etcd
@@ -134,6 +140,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 	defer cancel()
 	watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
 
+	if role == ProcessorRole {
+		failpoint.Inject("ProcessorEtcdDelay", func() {
+			delayer := chdelay.NewChannelDelayer(time.Second*3, watchCh, 1024, 16)
+			defer delayer.Close()
+			watchCh = delayer.Out()
+		})
+	}
+
 	var (
 		pendingPatches [][]DataPatch
 		exiting        bool

From 64cd3c656ca5a1a64028b5b931c1a0b2e035da90 Mon Sep 17 00:00:00 2001
From: sdojjy
Date: Mon, 16 May 2022 15:16:05 +0800
Subject: [PATCH 2/2] address comments

---
 cdc/capture/capture.go          |  5 +++--
 pkg/orchestrator/etcd_worker.go | 15 +++++----------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index b79f712e383..872f0d6c3f1 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/pkg/util"
 	"go.etcd.io/etcd/client/v3/concurrency"
 	"go.etcd.io/etcd/server/v3/mvcc"
 	"go.uber.org/zap"
@@ -290,7 +291,7 @@ func (c *Capture) run(stdCtx context.Context) error {
 			// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors
 			// (recoverable errors are intercepted in the processor tick)
 			// so we should also stop the processor and let capture restart or exit
-			processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, orchestrator.ProcessorRole)
+			processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, util.RoleProcessor.String())
 			log.Info("the processor routine has exited", zap.Error(processorErr))
 		}()
 		wg.Add(1)
@@ -388,7 +389,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
 		c.MessageRouter.RemovePeer(captureID)
 	})
 
-	err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, orchestrator.OwnerRole)
+	err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String())
 	c.setOwner(nil)
 	log.Info("run owner exited", zap.Error(err))
 	// if owner exits, resign the owner key
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index f4c07004280..64acf720c79 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -23,6 +23,10 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/pkg/chdelay"
+	cerrors "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/etcd"
+	"github.com/pingcap/tiflow/pkg/orchestrator/util"
+	pkgutil "github.com/pingcap/tiflow/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/api/v3/mvccpb"
@@ -32,10 +36,6 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"golang.org/x/time/rate"
-
-	cerrors "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/etcd"
-	"github.com/pingcap/tiflow/pkg/orchestrator/util"
 )
 
 const (
@@ -43,11 +43,6 @@ const (
 	// takes more than etcdWorkerLogsWarnDuration, it will print a log
 	etcdWorkerLogsWarnDuration = 1 * time.Second
 	deletionCounterKey         = "/meta/ticdc-delete-etcd-key-count"
-
-	// ProcessorRole is the role of the processor etcd worker
-	ProcessorRole = "processor"
-	// OwnerRole is the role of the owner etcd worker
-	OwnerRole = "owner"
 )
 
 // EtcdWorker handles all interactions with Etcd
@@ -140,7 +135,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 	defer cancel()
 	watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
 
-	if role == ProcessorRole {
+	if role == pkgutil.RoleProcessor.String() {
 		failpoint.Inject("ProcessorEtcdDelay", func() {
 			delayer := chdelay.NewChannelDelayer(time.Second*3, watchCh, 1024, 16)
 			defer delayer.Close()
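
The failpoint added by this patch routes the processor's etcd watch channel through chdelay.ChannelDelayer, so every watch event is held back by three seconds before the EtcdWorker sees it. Below is a minimal, hypothetical sketch of that delayer in isolation, assuming only the chdelay API visible in this diff (NewChannelDelayer, Out, Close) and its delay-then-forward behaviour; the main function and the int payload are illustrative and not part of the patch.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/chdelay"
)

func main() {
	src := make(chan int, 4)

	// Same shape as the failpoint body: wrap an input channel so that each
	// element only becomes readable on Out() after the configured delay.
	// 1024 and 16 mirror the queue and output buffer sizes used in the patch.
	delayer := chdelay.NewChannelDelayer[int](3*time.Second, src, 1024, 16)
	defer delayer.Close()

	start := time.Now()
	src <- 42

	v := <-delayer.Out()
	// Expected to print roughly "got 42 after 3s".
	fmt.Printf("got %d after %v\n", v, time.Since(start).Round(time.Second))
}

In a test, the injection point itself would typically be armed through the pingcap/failpoint tooling, for example GO_FAILPOINTS="github.com/pingcap/tiflow/pkg/orchestrator/ProcessorEtcdDelay=return(true)"; the path here is assumed from the package location and the failpoint only fires once the repository's failpoint rewrite step has been enabled.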