Skip to content

Commit

Permalink
ddl: implement backoff pool for TiFlash Cluster Management (#32317)
Browse files Browse the repository at this point in the history
ref #32254
  • Loading branch information
CalvinNeo authored Mar 4, 2022
1 parent 5464eec commit 55d0f12
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 3 deletions.
158 changes: 155 additions & 3 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,156 @@ type TiFlashReplicaStatus struct {
IsPartition bool
}

// TiFlashTick is type for backoff threshold.
type TiFlashTick float64

// PollTiFlashBackoffElement records backoff for each TiFlash Table.
// `Counter` increases every `Tick`, if it reached `Threshold`, it will be reset to 0 while `Threshold` grows.
// `TotalCounter` records total `Tick`s this element has since created.
type PollTiFlashBackoffElement struct {
Counter int
Threshold TiFlashTick
TotalCounter int
}

// NewPollTiFlashBackoffElement initialize backoff element for a TiFlash table.
func NewPollTiFlashBackoffElement() *PollTiFlashBackoffElement {
return &PollTiFlashBackoffElement{
Counter: 0,
Threshold: PollTiFlashBackoffMinTick,
TotalCounter: 0,
}
}

// PollTiFlashBackoffContext is a collection of all backoff states.
type PollTiFlashBackoffContext struct {
MinThreshold TiFlashTick
MaxThreshold TiFlashTick
// Capacity limits tables a backoff pool can handle, in order to limit handling of big tables.
Capacity int
Rate TiFlashTick
elements map[int64]*PollTiFlashBackoffElement
}

// NewPollTiFlashBackoffContext creates an instance of PollTiFlashBackoffContext.
func NewPollTiFlashBackoffContext(MinThreshold, MaxThreshold TiFlashTick, Capacity int, Rate TiFlashTick) (*PollTiFlashBackoffContext, error) {
if MaxThreshold < MinThreshold {
return nil, fmt.Errorf("`MaxThreshold` should always be larger than `MinThreshold`")
}
if MinThreshold < 1 {
return nil, fmt.Errorf("`MinThreshold` should not be less than 1")
}
if Capacity < 0 {
return nil, fmt.Errorf("negative `Capacity`")
}
if Rate <= 1 {
return nil, fmt.Errorf("`Rate` should always be larger than 1")
}
return &PollTiFlashBackoffContext{
MinThreshold: MinThreshold,
MaxThreshold: MaxThreshold,
Capacity: Capacity,
elements: make(map[int64]*PollTiFlashBackoffElement),
Rate: Rate,
}, nil
}

// TiFlashManagementContext is the context for TiFlash Replica Management
type TiFlashManagementContext struct {
TiFlashStores map[int64]helper.StoreStat
HandlePdCounter uint64
UpdateTiFlashStoreCounter uint64
UpdateMap map[int64]bool
Backoff *PollTiFlashBackoffContext
}

// Tick will first check increase Counter.
// It returns:
// 1. A bool indicates whether threshold is grown during this tick.
// 2. A bool indicates whether this ID exists.
// 3. A int indicates how many ticks ID has counted till now.
func (b *PollTiFlashBackoffContext) Tick(ID int64) (bool, bool, int) {
e, ok := b.Get(ID)
if !ok {
return false, false, 0
}
grew := e.MaybeGrow(b)
e.Counter += 1
e.TotalCounter += 1
return grew, true, e.TotalCounter
}

// NeedGrow returns if we need to grow.
// It is exported for testing.
func (e *PollTiFlashBackoffElement) NeedGrow() bool {
return e.Counter >= int(e.Threshold)
}

func (e *PollTiFlashBackoffElement) doGrow(b *PollTiFlashBackoffContext) {
if e.Threshold < b.MinThreshold {
e.Threshold = b.MinThreshold
}
if e.Threshold*b.Rate > b.MaxThreshold {
e.Threshold = b.MaxThreshold
} else {
e.Threshold *= b.Rate
}
e.Counter = 0
}

// MaybeGrow grows threshold and reset counter when needed.
func (e *PollTiFlashBackoffElement) MaybeGrow(b *PollTiFlashBackoffContext) bool {
if !e.NeedGrow() {
return false
}
e.doGrow(b)
return true
}

// Remove will reset table from backoff.
func (b *PollTiFlashBackoffContext) Remove(ID int64) bool {
_, ok := b.elements[ID]
delete(b.elements, ID)
return ok
}

// Get returns pointer to inner PollTiFlashBackoffElement.
// Only exported for test.
func (b *PollTiFlashBackoffContext) Get(ID int64) (*PollTiFlashBackoffElement, bool) {
res, ok := b.elements[ID]
return res, ok
}

// Put will record table into backoff pool, if there is enough room, or returns false.
func (b *PollTiFlashBackoffContext) Put(ID int64) bool {
_, ok := b.elements[ID]
if ok {
return true
} else if b.Len() < b.Capacity {
b.elements[ID] = NewPollTiFlashBackoffElement()
return true
}
return false
}

// Len gets size of PollTiFlashBackoffContext.
func (b *PollTiFlashBackoffContext) Len() int {
return len(b.elements)
}

// NewTiFlashManagementContext creates an instance for TiFlashManagementContext.
func NewTiFlashManagementContext() *TiFlashManagementContext {
func NewTiFlashManagementContext() (*TiFlashManagementContext, error) {
c, err := NewPollTiFlashBackoffContext(PollTiFlashBackoffMinTick, PollTiFlashBackoffMaxTick, PollTiFlashBackoffCapacity, PollTiFlashBackoffRate)
if err != nil {
return nil, err
}
return &TiFlashManagementContext{
HandlePdCounter: 0,
UpdateTiFlashStoreCounter: 0,
TiFlashStores: make(map[int64]helper.StoreStat),
UpdateMap: make(map[int64]bool),
}
Backoff: c,
}, nil
}

var (
Expand All @@ -82,6 +216,14 @@ var (
PullTiFlashPdTick = atomicutil.NewUint64(30 * 5)
// UpdateTiFlashStoreTick indicates the number of intervals before we fully update TiFlash stores.
UpdateTiFlashStoreTick = atomicutil.NewUint64(5)
// PollTiFlashBackoffMaxTick is the max tick before we try to update TiFlash replica availability for one table.
PollTiFlashBackoffMaxTick TiFlashTick = 10
// PollTiFlashBackoffMinTick is the min tick before we try to update TiFlash replica availability for one table.
PollTiFlashBackoffMinTick TiFlashTick = 1
// PollTiFlashBackoffCapacity is the cache size of backoff struct.
PollTiFlashBackoffCapacity int = 1000
// PollTiFlashBackoffRate is growth rate of exponential backoff threshold.
PollTiFlashBackoffRate TiFlashTick = 1.5
)

func getTiflashHTTPAddr(host string, statusAddr string) (string, error) {
Expand Down Expand Up @@ -258,6 +400,11 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
// We only check unavailable tables here, so doesn't include blocked add partition case.
if !available {
allReplicaReady = false
enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID)
if inqueue && !enabled {
logutil.BgLogger().Info("Escape checking available status due to backoff", zap.Int64("tableId", tb.ID))
continue
}

// We don't need to set accelerate schedule for this table, since it is already done in DDL, when
// 1. Add partition
Expand Down Expand Up @@ -287,12 +434,14 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex

if !avail {
logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.Uint64("region need", uint64(regionCount)), zap.Uint64("region have", uint64(flashRegionCount)))
pollTiFlashContext.Backoff.Put(tb.ID)
err := infosync.UpdateTiFlashTableSyncProgress(context.Background(), tb.ID, float64(flashRegionCount)/float64(regionCount))
if err != nil {
return false, err
}
} else {
logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.Uint64("region need", uint64(regionCount)))
pollTiFlashContext.Backoff.Remove(tb.ID)
err := infosync.DeleteTiFlashTableSyncProgress(tb.ID)
if err != nil {
return false, err
Expand Down Expand Up @@ -418,7 +567,10 @@ func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFl
}

func (d *ddl) PollTiFlashRoutine() {
pollTiflashContext := NewTiFlashManagementContext()
pollTiflashContext, err := NewTiFlashManagementContext()
if err != nil {
logutil.BgLogger().Fatal("TiFlashManagement init failed", zap.Error(err))
}
for {
select {
case <-d.ctx.Done():
Expand Down
115 changes: 115 additions & 0 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,121 @@ func TestSetPlacementRuleFail(t *testing.T) {
require.False(t, res)
}

// Test standalone backoffer
func TestTiFlashBackoffer(t *testing.T) {
var maxTick ddl.TiFlashTick = 10
var rate ddl.TiFlashTick = 1.5
c := 2
backoff, err := ddl.NewPollTiFlashBackoffContext(1, maxTick, c, rate)
require.NoError(t, err)
mustGet := func(ID int64) *ddl.PollTiFlashBackoffElement {
e, ok := backoff.Get(ID)
require.True(t, ok)
return e
}
mustNotGrow := func(ID int64) {
e := mustGet(ID)
ori := e.Threshold
oriTotal := e.TotalCounter
c := e.Counter
growed, ok, total := backoff.Tick(ID)
require.True(t, ok)
require.False(t, growed)
require.Equal(t, ori, e.Threshold)
require.Equal(t, c+1, e.Counter)
require.Equal(t, oriTotal+1, total)
}
mustGrow := func(ID int64) {
e := mustGet(ID)
ori := e.Threshold
oriTotal := e.TotalCounter
growed, ok, total := backoff.Tick(ID)
require.True(t, ok)
require.True(t, growed)
require.Equal(t, e.Threshold, rate*ori)
require.Equal(t, 1, e.Counter)
require.Equal(t, oriTotal+1, total)
}
// Test grow
ok := backoff.Put(1)
require.True(t, ok)
require.False(t, mustGet(1).NeedGrow())
mustNotGrow(1) // 0;1 -> 1;1
mustGrow(1) // 1;1 -> 0;1.5 -> 1;1.5
mustGrow(1) // 1;1.5 -> 0;2.25 -> 1;2.25
mustNotGrow(1) // 1;2.25 -> 2;2.25
mustGrow(1) // 2;2.25 -> 0;3.375 -> 1;3.375
mustNotGrow(1) // 1;3.375 -> 2;3.375
mustNotGrow(1) // 2;3.375 -> 3;3.375
mustGrow(1) // 3;3.375 -> 0;5.0625
require.Equal(t, 8, mustGet(1).TotalCounter)

// Test converge
backoff.Put(2)
for i := 0; i < 20; i++ {
backoff.Tick(2)
}
require.Equal(t, maxTick, mustGet(2).Threshold)
require.Equal(t, 20, mustGet(2).TotalCounter)

// Test context
ok = backoff.Put(3)
require.False(t, ok)
_, ok, _ = backoff.Tick(3)
require.False(t, ok)

require.True(t, backoff.Remove(1))
require.False(t, backoff.Remove(1))
require.Equal(t, 1, backoff.Len())

// Test error context
_, err = ddl.NewPollTiFlashBackoffContext(0.5, 1, 1, 1)
require.Error(t, err)
_, err = ddl.NewPollTiFlashBackoffContext(10, 1, 1, 1)
require.Error(t, err)
_, err = ddl.NewPollTiFlashBackoffContext(1, 10, 0, 1)
require.Error(t, err)
_, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, 0.5)
require.Error(t, err)
_, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, -1)
require.Error(t, err)
}

// Test backoffer in TiFlash.
func TestTiFlashBackoff(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
tk := testkit.NewTestKit(t, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")
tk.MustExec("create table ddltiflash(z int)")

// Not available for all tables
ddl.DisableTiFlashPoll(s.dom.DDL())
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplacePrevAvailableValue", `return(false)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`))
ddl.EnableTiFlashPoll(s.dom.DDL())
tk.MustExec("alter table ddltiflash set tiflash replica 1")

// 1, 1.5, 2.25, 3.375, 5.5625
// (1), 1, 1, 2, 3, 5
time.Sleep(ddl.PollTiFlashInterval * 5)
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
require.NotNil(t, tb)
require.False(t, tb.Meta().TiFlashReplica.Available)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplacePrevAvailableValue"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue"))

time.Sleep(ddl.PollTiFlashInterval * 3)
tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
require.NotNil(t, tb)
require.True(t, tb.Meta().TiFlashReplica.Available)
}

func TestAlterDatabaseErrorGrammar(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func initTiFlashPlacementManager(addrs []string) TiFlashPlacementManager {
m := mockTiFlashPlacementManager{}
return &m
}
logutil.BgLogger().Warn("init TiFlashPlacementManager", zap.Strings("pd addrs", addrs))
return &TiFlashPDPlacementManager{addrs: addrs}
}

Expand Down

0 comments on commit 55d0f12

Please sign in to comment.