Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: implement backoff pool for TiFlash Cluster Management #32317

Merged
merged 32 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c5b65ee
initial
CalvinNeo Feb 11, 2022
2f81e5b
fix
CalvinNeo Feb 11, 2022
4d4e07b
add
CalvinNeo Feb 14, 2022
3297a12
fix
CalvinNeo Feb 14, 2022
9383258
fix
CalvinNeo Feb 14, 2022
6a5e500
Merge remote-tracking branch 'upstream/master' into batch_backoff
CalvinNeo Feb 14, 2022
0d06260
fix
CalvinNeo Feb 14, 2022
d055b5c
change log
CalvinNeo Feb 14, 2022
bbf20c0
fix
CalvinNeo Feb 14, 2022
b4abc79
fix
CalvinNeo Feb 15, 2022
0ab8302
Merge branch 'master' into batch_backoff
CalvinNeo Feb 15, 2022
946d7d0
fix
CalvinNeo Feb 16, 2022
60483ec
Merge branch 'batch_backoff' of github.com:CalvinNeo/tidb into batch_…
CalvinNeo Feb 16, 2022
ef99368
add
CalvinNeo Feb 16, 2022
24571b6
fix
CalvinNeo Feb 16, 2022
ead33d8
Merge branch 'master' into batch_backoff
CalvinNeo Feb 17, 2022
0709e8e
Merge branch 'master' into batch_backoff
CalvinNeo Feb 17, 2022
c6ac2f9
Merge branch 'master' into batch_backoff
CalvinNeo Feb 17, 2022
d8e5fd8
Merge branch 'master' into batch_backoff
CalvinNeo Feb 17, 2022
b130440
Merge branch 'master' into batch_backoff
CalvinNeo Feb 18, 2022
3915ae9
typo
CalvinNeo Feb 18, 2022
b7950a6
Merge branch 'master' into batch_backoff
CalvinNeo Feb 18, 2022
2646a38
Merge branch 'batch_backoff' of github.com:CalvinNeo/tidb into batch_…
CalvinNeo Feb 18, 2022
4ab1945
adapt master
CalvinNeo Feb 18, 2022
4dba1a5
Merge branch 'master' into batch_backoff
CalvinNeo Feb 21, 2022
e107c93
Merge remote-tracking branch 'upstream/master' into batch_backoff
CalvinNeo Feb 23, 2022
a306b73
Merge branch 'batch_backoff' of github.com:CalvinNeo/tidb into batch_…
CalvinNeo Feb 23, 2022
61230bd
fix
CalvinNeo Feb 23, 2022
34942ac
fix
CalvinNeo Feb 23, 2022
c1e2e52
addr review
CalvinNeo Mar 3, 2022
9791471
fix
CalvinNeo Mar 3, 2022
cd62327
Merge branch 'master' into batch_backoff
ti-chi-bot Mar 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 155 additions & 3 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,156 @@ type TiFlashReplicaStatus struct {
IsPartition bool
}

// TiFlashTick is type for backoff threshold.
// It is a float64 so that a threshold can be multiplied by a fractional
// growth rate (the default PollTiFlashBackoffRate is 1.5); the threshold is
// truncated to an int only when it is compared against a tick counter.
type TiFlashTick float64

// PollTiFlashBackoffElement records backoff for each TiFlash Table.
// When `Counter` reached `Threshold`, `Threshold` shall grow.
type PollTiFlashBackoffElement struct {
Counter int
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
Threshold TiFlashTick
TotalCounter int
}

// NewPollTiFlashBackoffElement returns a fresh backoff element for a TiFlash table.
// Both counters start at zero and the threshold starts at the package-level
// PollTiFlashBackoffMinTick (not at the MinTick of any particular context).
func NewPollTiFlashBackoffElement() *PollTiFlashBackoffElement {
	// Counter and TotalCounter keep their zero values.
	elem := new(PollTiFlashBackoffElement)
	elem.Threshold = PollTiFlashBackoffMinTick
	return elem
}

// PollTiFlashBackoffContext is a collection of all backoff states.
type PollTiFlashBackoffContext struct {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
MinTick TiFlashTick
MaxTick TiFlashTick
// Capacity limits tables a backoff pool can handle, in order to limit handling of big tables.
Capacity int
Rate TiFlashTick
elements map[int64]*PollTiFlashBackoffElement
}

// NewPollTiFlashBackoffContext creates an instance of PollTiFlashBackoffContext.
//
// Parameters:
//   - MinTick: lower bound of a threshold; must be at least 1.
//   - MaxTick: upper bound of a threshold; must not be less than MinTick
//     (being equal to MinTick is accepted).
//   - Capacity: maximum number of tables the pool tracks; must not be negative.
//   - Rate: growth factor of a threshold; must be strictly greater than 1.
//
// It returns a context with an empty element pool, or a nil context and an
// error describing the first invalid argument.
func NewPollTiFlashBackoffContext(MinTick, MaxTick TiFlashTick, Capacity int, Rate TiFlashTick) (*PollTiFlashBackoffContext, error) {
	// The float checks are written as "not valid" rather than "invalid" on
	// purpose: every comparison with NaN is false, so this form rejects NaN
	// arguments instead of letting them slip through.
	if !(MaxTick >= MinTick) {
		return nil, fmt.Errorf("`MaxTick` should not be less than `MinTick`")
	}
	if !(MinTick >= 1) {
		return nil, fmt.Errorf("`MinTick` should not be less than 1")
	}
	if Capacity < 0 {
		return nil, fmt.Errorf("negative `Capacity`")
	}
	if !(Rate > 1) {
		return nil, fmt.Errorf("`Rate` should always be larger than 1")
	}
	return &PollTiFlashBackoffContext{
		MinTick:  MinTick,
		MaxTick:  MaxTick,
		Capacity: Capacity,
		elements: make(map[int64]*PollTiFlashBackoffElement),
		Rate:     Rate,
	}, nil
}

// TiFlashManagementContext is the context for TiFlash Replica Management
// It bundles the mutable state carried across rounds of the TiFlash polling routine.
type TiFlashManagementContext struct {
// TiFlashStores holds the stat of each known TiFlash store.
// NOTE(review): the int64 key is presumably the store ID — confirm against the code that fills it.
TiFlashStores map[int64]helper.StoreStat
// HandlePdCounter is a round counter.
// NOTE(review): presumably compared against PullTiFlashPdTick — confirm against the poll loop.
HandlePdCounter uint64
// UpdateTiFlashStoreCounter is a round counter.
// NOTE(review): presumably compared against UpdateTiFlashStoreTick — confirm against the poll loop.
UpdateTiFlashStoreCounter uint64
// UpdateMap is a set of int64 IDs.
// NOTE(review): its exact meaning is not visible here — confirm against its users.
UpdateMap map[int64]bool
// Backoff tracks per-table backoff state so that unavailable tables are not
// re-checked on every polling round.
Backoff *PollTiFlashBackoffContext
}

// Tick will first check increase Counter.
func (b *PollTiFlashBackoffContext) Tick(ID int64) (bool, bool, int) {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
e, ok := b.Get(ID)
if !ok {
return false, false, 0
}
growed := e.MaybeGrow(b)
e.Counter += 1
e.TotalCounter += 1
zimulala marked this conversation as resolved.
Show resolved Hide resolved
return growed, true, e.TotalCounter
}

func (e *PollTiFlashBackoffElement) limit() int {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
return int(e.Threshold)
}

// NeedGrow returns if we need to grow
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
func (e *PollTiFlashBackoffElement) NeedGrow() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove this function? It only used in one place and only one line.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to export this for testing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

return e.Counter >= e.limit()
}

func (e *PollTiFlashBackoffElement) doGrow(b *PollTiFlashBackoffContext) {
defer func() {
e.Counter = 0
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
}()
if e.Threshold < b.MinTick {
e.Threshold = b.MinTick
}
if e.Threshold*b.Rate > b.MaxTick {
e.Threshold = b.MaxTick
} else {
e.Threshold *= b.Rate
}
}

// MaybeGrow grows threshold and reset counter when need
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
func (e *PollTiFlashBackoffElement) MaybeGrow(b *PollTiFlashBackoffContext) bool {
if !e.NeedGrow() {
return false
}
e.doGrow(b)
return true
}

// Remove will reset table from backoff
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
func (b *PollTiFlashBackoffContext) Remove(ID int64) bool {
_, ok := b.elements[ID]
delete(b.elements, ID)
return ok
}

// Get looks up the backoff element tracked for table `ID`.
// It returns the element and true when the table is in the pool, or nil and
// false otherwise. The returned pointer aliases the pool's own element.
// Only exported for test.
func (b *PollTiFlashBackoffContext) Get(ID int64) (*PollTiFlashBackoffElement, bool) {
	if elem, found := b.elements[ID]; found {
		return elem, true
	}
	return nil, false
}

// Put will record table into backoff pool, if there is enough room, or returns false.
func (b *PollTiFlashBackoffContext) Put(ID int64) (*PollTiFlashBackoffElement, bool) {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
e, ok := b.elements[ID]
if ok {
return e, true
} else if b.Len() < b.Capacity {
b.elements[ID] = NewPollTiFlashBackoffElement()
return b.elements[ID], true
}
return nil, false
}

// Len returns the number of tables currently tracked by the backoff pool.
// Put compares this value against Capacity to decide whether a new table can
// be admitted.
func (b *PollTiFlashBackoffContext) Len() int {
return len(b.elements)
}

// NewTiFlashManagementContext creates an instance for TiFlashManagementContext.
func NewTiFlashManagementContext() *TiFlashManagementContext {
func NewTiFlashManagementContext() (*TiFlashManagementContext, error) {
c, err := NewPollTiFlashBackoffContext(PollTiFlashBackoffMinTick, PollTiFlashBackoffMaxTick, PollTiFlashBackoffCapacity, PollTiFlashBackoffRate)
if err != nil {
return nil, err
}
return &TiFlashManagementContext{
HandlePdCounter: 0,
UpdateTiFlashStoreCounter: 0,
TiFlashStores: make(map[int64]helper.StoreStat),
UpdateMap: make(map[int64]bool),
}
Backoff: c,
}, nil
}

var (
Expand All @@ -82,6 +216,14 @@ var (
PullTiFlashPdTick = atomicutil.NewUint64(30 * 5)
// UpdateTiFlashStoreTick indicates the number of intervals before we fully update TiFlash stores.
UpdateTiFlashStoreTick = atomicutil.NewUint64(5)
// PollTiFlashBackoffMaxTick is the max tick before we try to update TiFlash replica availability for one table.
PollTiFlashBackoffMaxTick TiFlashTick = 10
// PollTiFlashBackoffMinTick is the min tick before we try to update TiFlash replica availability for one table.
PollTiFlashBackoffMinTick TiFlashTick = 1
// PollTiFlashBackoffCapacity is the cache size of backoff struct.
PollTiFlashBackoffCapacity int = 1000
// PollTiFlashBackoffRate is growth rate of exponential backoff threshold.
PollTiFlashBackoffRate TiFlashTick = 1.5
)

func getTiflashHTTPAddr(host string, statusAddr string) (string, error) {
Expand Down Expand Up @@ -258,6 +400,11 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
// We only check unavailable tables here, so doesn't include blocked add partition case.
if !available {
allReplicaReady = false
growed, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID)
if inqueue && !growed {
logutil.BgLogger().Info("Escape checking available status due to backoff", zap.Int64("tableId", tb.ID))
continue
}

// We don't need to set accelerate schedule for this table, since it is already done in DDL, when
// 1. Add partition
Expand Down Expand Up @@ -287,12 +434,14 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex

if !avail {
logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.Uint64("region need", uint64(regionCount)), zap.Uint64("region have", uint64(flashRegionCount)))
pollTiFlashContext.Backoff.Put(tb.ID)
err := infosync.UpdateTiFlashTableSyncProgress(context.Background(), tb.ID, float64(flashRegionCount)/float64(regionCount))
if err != nil {
return false, err
}
} else {
logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.Uint64("region need", uint64(regionCount)))
pollTiFlashContext.Backoff.Remove(tb.ID)
err := infosync.DeleteTiFlashTableSyncProgress(tb.ID)
if err != nil {
return false, err
Expand Down Expand Up @@ -418,7 +567,10 @@ func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFl
}

func (d *ddl) PollTiFlashRoutine() {
pollTiflashContext := NewTiFlashManagementContext()
pollTiflashContext, err := NewTiFlashManagementContext()
if err != nil {
logutil.BgLogger().Fatal("TiFlashManagement init failed", zap.Error(err))
}
for {
select {
case <-d.ctx.Done():
Expand Down
115 changes: 115 additions & 0 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,121 @@ func TestSetPlacementRuleFail(t *testing.T) {
require.False(t, res)
}

// TestTiFlashBackoffer tests the standalone backoffer, without any TiFlash
// cluster: threshold growth, convergence to the max tick, pool capacity,
// removal, and constructor argument validation.
func TestTiFlashBackoffer(t *testing.T) {
	var maxTick ddl.TiFlashTick = 10
	var rate ddl.TiFlashTick = 1.5
	c := 2
	backoff, err := ddl.NewPollTiFlashBackoffContext(1, maxTick, c, rate)
	require.NoError(t, err)
	mustGet := func(ID int64) *ddl.PollTiFlashBackoffElement {
		e, ok := backoff.Get(ID)
		require.True(t, ok)
		return e
	}
	// mustNotGrow ticks once and checks that only the counters moved.
	mustNotGrow := func(ID int64) {
		e := mustGet(ID)
		ori := e.Threshold
		oriTotal := e.TotalCounter
		oriCounter := e.Counter
		growed, ok, total := backoff.Tick(ID)
		require.True(t, ok)
		require.False(t, growed)
		require.Equal(t, ori, e.Threshold)
		require.Equal(t, oriCounter+1, e.Counter)
		require.Equal(t, oriTotal+1, total)
	}
	// mustGrow ticks once and checks that the threshold grew by `rate` and
	// that the counter restarted (reset to 0, then counted this tick).
	mustGrow := func(ID int64) {
		e := mustGet(ID)
		ori := e.Threshold
		oriTotal := e.TotalCounter
		growed, ok, total := backoff.Tick(ID)
		require.True(t, ok)
		require.True(t, growed)
		require.Equal(t, e.Threshold, rate*ori)
		require.Equal(t, 1, e.Counter)
		require.Equal(t, oriTotal+1, total)
	}
	// Test grow. Trailing comments read `Counter;Threshold`.
	_, ok := backoff.Put(1)
	require.True(t, ok)
	require.False(t, mustGet(1).NeedGrow())
	mustNotGrow(1) // 0;1 -> 1;1
	mustGrow(1)    // 1;1 -> 0;1.5 -> 1;1.5
	mustGrow(1)    // 1;1.5 -> 0;2.25 -> 1;2.25
	mustNotGrow(1) // 1;2.25 -> 2;2.25
	mustGrow(1)    // 2;2.25 -> 0;3.375 -> 1;3.375
	mustNotGrow(1) // 1;3.375 -> 2;3.375
	mustNotGrow(1) // 2;3.375 -> 3;3.375
	mustGrow(1)    // 3;3.375 -> 0;5.0625 -> 1;5.0625
	require.Equal(t, 8, mustGet(1).TotalCounter)

	// Test converge
	backoff.Put(2)
	for i := 0; i < 20; i++ {
		backoff.Tick(2)
	}
	require.Equal(t, maxTick, mustGet(2).Threshold)
	require.Equal(t, 20, mustGet(2).TotalCounter)

	// Test context: the pool is full (capacity 2), so table 3 is rejected
	// and ticking it reports that it is not tracked.
	_, ok = backoff.Put(3)
	require.False(t, ok)
	_, ok, _ = backoff.Tick(3)
	require.False(t, ok)

	require.True(t, backoff.Remove(1))
	require.False(t, backoff.Remove(1))
	require.Equal(t, 1, backoff.Len())

	// Test error context. Every case has exactly one invalid argument, so
	// each validation is exercised on its own.
	// MinTick below 1.
	_, err = ddl.NewPollTiFlashBackoffContext(0.5, 1, 1, rate)
	require.Error(t, err)
	// MaxTick below MinTick.
	_, err = ddl.NewPollTiFlashBackoffContext(10, 1, 1, rate)
	require.Error(t, err)
	// Negative capacity.
	_, err = ddl.NewPollTiFlashBackoffContext(1, 10, -1, rate)
	require.Error(t, err)
	// Rate not larger than 1.
	_, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, 1)
	require.Error(t, err)
	_, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, 0.5)
	require.Error(t, err)
	_, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, -1)
	require.Error(t, err)
}

// Test backoffer in TiFlash.
// The test keeps a table's TiFlash replica unavailable through failpoints for
// several poll intervals, then lifts the failpoints and checks that the
// replica becomes available shortly afterwards despite the accumulated backoff.
func TestTiFlashBackoff(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
tk := testkit.NewTestKit(t, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")
tk.MustExec("create table ddltiflash(z int)")

// Not available for all tables
// Polling is disabled while the failpoints are being enabled and re-enabled afterwards.
// NOTE(review): judging by their names, the two failpoints force the previous and
// the current availability to false — confirm at the failpoint injection sites.
ddl.DisableTiFlashPoll(s.dom.DDL())
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplacePrevAvailableValue", `return(false)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`))
ddl.EnableTiFlashPoll(s.dom.DDL())
tk.MustExec("alter table ddltiflash set tiflash replica 1")

// Thresholds as they grow by the rate 1.5:
// 1, 1.5, 2.25, 3.375, 5.0625
// and the integer limits they are compared against:
// (1), 1, 1, 2, 3, 5
time.Sleep(ddl.PollTiFlashInterval * 5)
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
require.NotNil(t, tb)
// While the failpoints are active the replica must stay unavailable.
require.False(t, tb.Meta().TiFlashReplica.Available)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplacePrevAvailableValue"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue"))

// With the failpoints lifted, the replica is expected to become available
// within three more poll intervals.
time.Sleep(ddl.PollTiFlashInterval * 3)
tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
require.NotNil(t, tb)
require.True(t, tb.Meta().TiFlashReplica.Available)
}

func TestAlterDatabaseErrorGrammar(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func initTiFlashPlacementManager(addrs []string) TiFlashPlacementManager {
m := mockTiFlashPlacementManager{}
return &m
}
logutil.BgLogger().Warn("init TiFlashPlacementManager", zap.Strings("pd addrs", addrs))
return &TiFlashPDPlacementManager{addrs: addrs}
}

Expand Down