Skip to content

Commit

Permalink
Merge branch 'master' into executor_write_refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Dec 27, 2022
2 parents 1c702db + 3c8f11d commit df63c08
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 40 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.Prealloced(ti.ID)
prealloced := db.preallocedIDs.PreallocedFor(ti)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/restore/prealloc_table_id/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ go_library(
srcs = ["alloc.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id",
visibility = ["//visibility:public"],
deps = ["//br/pkg/metautil"],
deps = [
"//br/pkg/metautil",
"//parser/model",
],
)

go_test(
Expand Down
23 changes: 23 additions & 0 deletions br/pkg/restore/prealloc_table_id/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"

"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/parser/model"
)

const (
Expand Down Expand Up @@ -48,6 +49,14 @@ func New(tables []*metautil.Table) *PreallocIDs {
if t.Info.ID > max && t.Info.ID < insaneTableIDThreshold {
max = t.Info.ID
}

if t.Info.Partition != nil && t.Info.Partition.Definitions != nil {
for _, part := range t.Info.Partition.Definitions {
if part.ID > max && part.ID < insaneTableIDThreshold {
max = part.ID
}
}
}
}
return &PreallocIDs{
end: max + 1,
Expand Down Expand Up @@ -86,3 +95,17 @@ func (p *PreallocIDs) Alloc(m Allocator) error {
func (p *PreallocIDs) Prealloced(tid int64) bool {
return p.allocedFrom <= tid && tid < p.end
}

func (p *PreallocIDs) PreallocedFor(ti *model.TableInfo) bool {
if !p.Prealloced(ti.ID) {
return false
}
if ti.Partition != nil && ti.Partition.Definitions != nil {
for _, part := range ti.Partition.Definitions {
if !p.Prealloced(part.ID) {
return false
}
}
}
return true
}
38 changes: 32 additions & 6 deletions br/pkg/restore/prealloc_table_id/alloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (t *testAllocator) AdvanceGlobalIDs(n int) (int64, error) {
func TestAllocator(t *testing.T) {
type Case struct {
tableIDs []int64
partitions map[int64][]int64
hasAllocatedTo int64
successfullyAllocated []int64
shouldAllocatedTo int64
Expand Down Expand Up @@ -57,26 +58,51 @@ func TestAllocator(t *testing.T) {
successfullyAllocated: []int64{5, 6},
shouldAllocatedTo: 7,
},
{
tableIDs: []int64{1, 2, 5, 6, 7},
hasAllocatedTo: 6,
successfullyAllocated: []int64{6, 7},
shouldAllocatedTo: 13,
partitions: map[int64][]int64{
7: {8, 9, 10, 11, 12},
},
},
{
tableIDs: []int64{1, 2, 5, 6, 7, 13},
hasAllocatedTo: 9,
successfullyAllocated: []int64{13},
shouldAllocatedTo: 14,
partitions: map[int64][]int64{
7: {8, 9, 10, 11, 12},
},
},
}

run := func(t *testing.T, c Case) {
tables := make([]*metautil.Table, 0, len(c.tableIDs))
for _, id := range c.tableIDs {
tables = append(tables, &metautil.Table{
table := metautil.Table{
Info: &model.TableInfo{
ID: id,
ID: id,
Partition: &model.PartitionInfo{},
},
})
}
if c.partitions != nil {
for _, part := range c.partitions[id] {
table.Info.Partition.Definitions = append(table.Info.Partition.Definitions, model.PartitionDefinition{ID: part})
}
}
tables = append(tables, &table)
}

ids := prealloctableid.New(tables)
allocator := testAllocator(c.hasAllocatedTo)
require.NoError(t, ids.Alloc(&allocator))

allocated := make([]int64, 0, len(c.successfullyAllocated))
for _, t := range c.tableIDs {
if ids.Prealloced(t) {
allocated = append(allocated, t)
for _, t := range tables {
if ids.PreallocedFor(t.Info) {
allocated = append(allocated, t.Info.ID)
}
}
require.ElementsMatch(t, allocated, c.successfullyAllocated)
Expand Down
1 change: 0 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func RegisterMetrics() {
prometheus.MustRegister(SyncLoadHistogram)
prometheus.MustRegister(ReadStatsHistogram)
prometheus.MustRegister(JobsGauge)
prometheus.MustRegister(KeepAliveCounter)
prometheus.MustRegister(LoadPrivilegeCounter)
prometheus.MustRegister(InfoCacheCounters)
prometheus.MustRegister(LoadSchemaCounter)
Expand Down
8 changes: 0 additions & 8 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,6 @@ var (
Help: "Counter of system time jumps backward.",
})

KeepAliveCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "monitor",
Name: "keep_alive_total",
Help: "Counter of TiDB keep alive.",
})

PlanCacheCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
11 changes: 1 addition & 10 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,16 +788,7 @@ func setupMetrics() {
systimeErrHandler := func() {
metrics.TimeJumpBackCounter.Inc()
}
callBackCount := 0
successCallBack := func() {
callBackCount++
// It is callback by monitor per second, we increase metrics.KeepAliveCounter per 5s.
if callBackCount >= 5 {
callBackCount = 0
metrics.KeepAliveCounter.Inc()
}
}
go systimemon.StartMonitor(time.Now, systimeErrHandler, successCallBack)
go systimemon.StartMonitor(time.Now, systimeErrHandler)

pushMetric(cfg.Status.MetricsAddr, time.Duration(cfg.Status.MetricsInterval)*time.Second)
}
Expand Down
1 change: 1 addition & 0 deletions util/cpu/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ go_test(
srcs = ["cpu_test.go"],
embed = [":cpu"],
flaky = True,
race = "on",
deps = ["@com_github_stretchr_testify//require"],
)
7 changes: 3 additions & 4 deletions util/cpu/cpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
)

func TestCPUValue(t *testing.T) {
Observer := NewCPUObserver()
Observer.Start()
observer := NewCPUObserver()
exit := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
Expand All @@ -42,11 +41,11 @@ func TestCPUValue(t *testing.T) {
}
}()
}
Observer.Start()
observer.Start()
time.Sleep(5 * time.Second)
require.GreaterOrEqual(t, GetCPUUsage(), 0.0)
require.Less(t, GetCPUUsage(), 1.0)
Observer.Stop()
observer.Stop()
close(exit)
wg.Wait()
}
9 changes: 1 addition & 8 deletions util/systimemon/systime_mon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,16 @@ import (
)

// StartMonitor calls systimeErrHandler if system time jump backward.
func StartMonitor(now func() time.Time, systimeErrHandler func(), successCallback func()) {
func StartMonitor(now func() time.Time, systimeErrHandler func()) {
logutil.BgLogger().Info("start system time monitor")
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
tickCount := 0
for {
last := now().UnixNano()
<-tick.C
if now().UnixNano() < last {
logutil.BgLogger().Error("system time jump backward", zap.Int64("last", last))
systimeErrHandler()
}
// call successCallback per second.
tickCount++
if tickCount >= 10 {
tickCount = 0
successCallback()
}
}
}
2 changes: 1 addition & 1 deletion util/systimemon/systime_mon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSystimeMonitor(t *testing.T) {
return time.Now().Add(-2 * time.Second)
}, func() {
errTriggered.Store(true)
}, func() {})
})

require.Eventually(t, errTriggered.Load, time.Second, 10*time.Millisecond)
}

0 comments on commit df63c08

Please sign in to comment.