Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ticdc): add processor etcd worker delay fail point #5426

Merged
merged 8 commits into from
May 17, 2022
5 changes: 3 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/util"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
Expand Down Expand Up @@ -290,7 +291,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// when the etcd worker of the processor returns an error, it means that the processor has thrown an unrecoverable, serious error
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, util.RoleProcessor.String())
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
Expand Down Expand Up @@ -388,7 +389,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
c.MessageRouter.RemovePeer(captureID)
})

err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, "owner")
err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval, util.RoleOwner.String())
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
Expand Down
2 changes: 1 addition & 1 deletion pkg/chdelay/channel_delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type entry[T any] struct {
// NewChannelDelayer creates a new ChannelDelayer.
func NewChannelDelayer[T any](
delayBy time.Duration,
in chan T,
in <-chan T,
queueSize int,
outChSize int,
) *ChannelDelayer[T] {
Expand Down
17 changes: 13 additions & 4 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/chdelay"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator/util"
pkgutil "github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
Expand All @@ -31,10 +36,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"

cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator/util"
)

const (
Expand Down Expand Up @@ -134,6 +135,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
defer cancel()
watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), role, clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))

if role == pkgutil.RoleProcessor.String() {
failpoint.Inject("ProcessorEtcdDelay", func() {
delayer := chdelay.NewChannelDelayer(time.Second*3, watchCh, 1024, 16)
defer delayer.Close()
watchCh = delayer.Out()
})
}

var (
pendingPatches [][]DataPatch
exiting bool
Expand Down