diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index e5f9a395364..9e4b4b0374a 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -24,9 +24,13 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" +<<<<<<< HEAD tidbkv "github.com/pingcap/tidb/kv" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" +======= + "github.com/pingcap/tiflow/pkg/util" +>>>>>>> ae3cefd6c (test(ticdc): add processor etcd worker delay fail point (#5426)) "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/server/v3/mvcc" "go.uber.org/zap" @@ -319,7 +323,7 @@ func (c *Capture) run(stdCtx context.Context) error { // when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors // (recoverable errors are intercepted in the processor tick) // so we should also stop the processor and let capture restart or exit - processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor") + processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, util.RoleProcessor.String()) log.Info("the processor routine has exited", zap.Error(processorErr)) }() wg.Add(1) @@ -431,7 +435,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { }) } - err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner") + err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String()) c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) // if owner exits, resign the owner key diff --git a/pkg/chdelay/channel_delayer.go b/pkg/chdelay/channel_delayer.go new file mode 100644 index 00000000000..8a29d6ac6c9 --- /dev/null +++ b/pkg/chdelay/channel_delayer.go @@ -0,0 +1,174 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package chdelay + +import ( + "sync" + "time" +) + +// ChannelDelayer provides a mechanism to inject delay in a channel. +// +// Algorithm sketch: +// When an element arrives from the input channel, +// attach the current timestamp and put it in a queue. +// The top of the queue is checked, if the element +// has stayed in the queue for more than `delayBy`, +// then the element in popped and sent to the output channel. +type ChannelDelayer[T any] struct { + inCh <-chan T + outCh chan T + + queue []entry[T] + size int + + closeCh chan struct{} + wg sync.WaitGroup + + delayBy time.Duration +} + +type entry[T any] struct { + elem T + inTime time.Time +} + +// NewChannelDelayer creates a new ChannelDelayer. +func NewChannelDelayer[T any]( + delayBy time.Duration, + in <-chan T, + queueSize int, + outChSize int, +) *ChannelDelayer[T] { + ret := &ChannelDelayer[T]{ + inCh: in, + outCh: make(chan T, outChSize), + queue: make([]entry[T], 0, queueSize), + size: queueSize, + closeCh: make(chan struct{}), + delayBy: delayBy, + } + + ret.wg.Add(1) + go func() { + ret.run() + }() + + return ret +} + +// Out returns the delayed channel. The downstream logic +// should read from Out(). +func (d *ChannelDelayer[T]) Out() <-chan T { + return d.outCh +} + +// Close closes the ChannelDelayer. +func (d *ChannelDelayer[T]) Close() { + close(d.closeCh) + d.wg.Wait() +} + +func (d *ChannelDelayer[T]) run() { + defer d.wg.Done() + defer close(d.outCh) + + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + + // currentTime is a timestamp cache to + // avoid having to read the system's + // clock frequently. + currentTime := time.Now() + + // Returns the output channel if + // the first element in the queue + // is ready to be popped. + // Otherwise, nil is returned. + // Note that nil channels are ignored in + // a select statement, so it would disable + // a case block. + outChIfReady := func() chan<- T { + if len(d.queue) == 0 { + return nil + } + if currentTime.Sub(d.queue[0].inTime) >= d.delayBy { + return d.outCh + } + return nil + } + + // dummyEntry provides a zero value entry. + var dummyEntry entry[T] + + for { + var firstElem *T + if len(d.queue) > 0 { + firstElem = &d.queue[0].elem + } else { + // Must provide a valid pointer. + firstElem = &dummyEntry.elem + } + + select { + case <-d.closeCh: + return + case <-ticker.C: + currentTime = time.Now() + case inElem, ok := <-d.inChIfSizeOk(): + if !ok { + if len(d.queue) == 0 { + return + } + continue + } + d.queue = append(d.queue, entry[T]{ + elem: inElem, + inTime: time.Now(), + }) + case outChIfReady() <- *firstElem: + // Cleans any reference to *T if T is a pointer, + // to prompt a timely GC. + d.queue[0] = dummyEntry + d.queue = d.queue[1:] + + LOOP: + // Drain the queue as much as possible. + for { + if len(d.queue) == 0 { + break LOOP + } + if currentTime.Sub(d.queue[0].inTime) < d.delayBy { + break + } + + select { + case d.outCh <- d.queue[0].elem: + // Cleans any reference to *T if T is a pointer + d.queue[0] = dummyEntry + d.queue = d.queue[1:] + default: + break LOOP + } + } + } + } +} + +func (d *ChannelDelayer[T]) inChIfSizeOk() <-chan T { + if len(d.queue) < d.size { + return d.inCh + } + return nil +} diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index d7b23a3c760..c1855ddbe51 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -22,6 +22,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/chdelay" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/pingcap/tiflow/pkg/orchestrator/util" + pkgutil "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" @@ -31,10 +36,6 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" - - cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/etcd" - "github.com/pingcap/tiflow/pkg/orchestrator/util" ) const ( @@ -134,6 +135,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, defer cancel() watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) + if role == pkgutil.RoleProcessor.String() { + failpoint.Inject("ProcessorEtcdDelay", func() { + delayer := chdelay.NewChannelDelayer(time.Second*3, watchCh, 1024, 16) + defer delayer.Close() + watchCh = delayer.Out() + }) + } + var ( pendingPatches [][]DataPatch exiting bool