Skip to content

Commit

Permalink
br: add chaos testing for advancer owner (pingcap#58183)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan1900 authored Dec 31, 2024
1 parent b71ad38 commit a6cd5e7
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 33 deletions.
13 changes: 4 additions & 9 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool {
return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
// HasSubscriptions returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscriptions() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

Expand All @@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint {
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
func newCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
Expand Down Expand Up @@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
f(c.checkpoints)
}

// only used for test
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
c.checkpoints = cps
}

func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string {
region, err := locateKeyOfRegion(ctx, c.env, startKey)
if err != nil {
Expand Down Expand Up @@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
}

func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
cp := newCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestRemoveTaskAndFlush(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
return !adv.HasSubscriptions()
}, 10*time.Second, 100*time.Millisecond)
}

Expand Down
46 changes: 33 additions & 13 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ import (
const (
flagBackoffTime = "backoff-time"
flagTickInterval = "tick-interval"
flagFullScanDiffTick = "full-scan-tick"
flagAdvancingByCache = "advancing-by-cache"
flagTryAdvanceThreshold = "try-advance-threshold"
flagCheckPointLagLimit = "check-point-lag-limit"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
DefaultAdvanceByCache = true
// used for chaos testing
flagOwnershipCycleInterval = "ownership-cycle-interval"
)

const (
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second

// used for chaos testing, default to disable
DefaultOwnershipCycleInterval = 0
)

var (
Expand All @@ -38,6 +41,11 @@ type Config struct {
TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
// The maximum lag could be tolerated for the checkpoint lag.
CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`

// Following configs are used in chaos testings, better not to enable in prod
//
// used to periodically becomes/retire advancer owner
OwnershipCycleInterval time.Duration `toml:"ownership-cycle-interval" json:"ownership-cycle-interval"`
}

func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
Expand All @@ -49,14 +57,22 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
"If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.")
f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit,
"The maximum lag could be tolerated for the checkpoint lag.")

// used for chaos testing
f.Duration(flagOwnershipCycleInterval, DefaultOwnershipCycleInterval,
"The interval that the owner will retire itself")

// mark hidden
_ = f.MarkHidden(flagOwnershipCycleInterval)
}

func Default() Config {
return Config{
BackoffTime: DefaultBackOffTime,
TickDuration: DefaultTickInterval,
TryAdvanceThreshold: DefaultTryAdvanceThreshold,
CheckPointLagLimit: DefaultCheckPointLagLimit,
BackoffTime: DefaultBackOffTime,
TickDuration: DefaultTickInterval,
TryAdvanceThreshold: DefaultTryAdvanceThreshold,
CheckPointLagLimit: DefaultCheckPointLagLimit,
OwnershipCycleInterval: DefaultOwnershipCycleInterval,
}
}

Expand All @@ -78,6 +94,10 @@ func (conf *Config) GetFromFlags(f *pflag.FlagSet) error {
if err != nil {
return err
}
conf.OwnershipCycleInterval, err = f.GetDuration(flagOwnershipCycleInterval)
if err != nil {
return err
}
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions br/pkg/streamhelper/daemon/owner_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,11 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) {
}
return loop, nil
}

func (od *OwnerDaemon) ForceToBeOwner(ctx context.Context) error {
return od.manager.ForceToBeOwner(ctx)
}

func (od *OwnerDaemon) RetireIfOwner() {
od.manager.RetireOwner()
}
1 change: 0 additions & 1 deletion br/pkg/streamhelper/daemon/owner_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func TestDaemon(t *testing.T) {
ow.RetireOwner()
req.False(ow.IsOwner())
app.AssertNotRunning(1 * time.Second)
ow.CampaignOwner()
req.Eventually(func() bool {
return ow.IsOwner()
}, 1*time.Second, 100*time.Millisecond)
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,8 @@ func RunStreamResume(
func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
ctx, cancel := context.WithCancel(c)
defer cancel()
log.Info("starting", zap.String("cmd", cmdName))

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
cfg.CheckRequirements, false, conn.StreamVersionChecker)
if err != nil {
Expand All @@ -941,10 +943,46 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
if err != nil {
return err
}
if cfg.AdvancerCfg.OwnershipCycleInterval > 0 {
err = advancerd.ForceToBeOwner(ctx)
if err != nil {
return err
}
log.Info("command line advancer forced to be the owner")
go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval, true)
}
loop()
return nil
}

// runOwnershipCycle handles the periodic cycling of ownership for the advancer
func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration, isOwner bool) {
ticker := time.NewTicker(cycleDuration)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !isOwner {
// try to become owner
if err := advancerd.ForceToBeOwner(ctx); err != nil {
log.Error("command line advancer failed to force ownership", zap.Error(err))
continue
}
log.Info("command line advancer forced to be the owner")
isOwner = true
} else {
// retire from being owner
advancerd.RetireIfOwner()
log.Info("command line advancer retired from being owner")
isOwner = false
}
}
}
}

func checkConfigForStatus(pd []string) error {
if len(pd) == 0 {
return errors.Annotatef(berrors.ErrInvalidArgument,
Expand Down
1 change: 0 additions & 1 deletion pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/config",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/streamhelper/config",
"//pkg/parser/terror",
"//pkg/util/logutil",
"//pkg/util/tiflashcompute",
Expand Down
8 changes: 0 additions & 8 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/tiflashcompute"
Expand Down Expand Up @@ -458,13 +457,6 @@ func (b *AtomicBool) UnmarshalText(text []byte) error {
return nil
}

// LogBackup is the config for log backup service.
// For now, it includes the embed advancer.
type LogBackup struct {
Advancer logbackupconf.Config `toml:"advancer" json:"advancer"`
Enabled bool `toml:"enabled" json:"enabled"`
}

// Log is the log section of config.
type Log struct {
// Log level.
Expand Down

0 comments on commit a6cd5e7

Please sign in to comment.