Merge branch 'master' into align-stats-healthy
qw4990 authored Feb 3, 2023
2 parents ed077e3 + eb11fb2 commit 30e1bdd
Showing 7 changed files with 121 additions and 41 deletions.
21 changes: 17 additions & 4 deletions ddl/ingest/disk_root.go
@@ -15,6 +15,8 @@
package ingest

import (
"sync"

lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
@@ -37,6 +39,7 @@ type diskRootImpl struct {
currentUsage uint64
maxQuota uint64
bcCtx *backendCtxManager
mu sync.RWMutex
}

// NewDiskRootImpl creates a new DiskRoot.
@@ -49,22 +52,32 @@ func NewDiskRootImpl(path string, bcCtx *backendCtxManager) DiskRoot {

// CurrentUsage implements DiskRoot interface.
func (d *diskRootImpl) CurrentUsage() uint64 {
return d.currentUsage
d.mu.RLock()
usage := d.currentUsage
d.mu.RUnlock()
return usage
}

// MaxQuota implements DiskRoot interface.
func (d *diskRootImpl) MaxQuota() uint64 {
return d.maxQuota
d.mu.RLock()
quota := d.maxQuota
d.mu.RUnlock()
return quota
}

// UpdateUsageAndQuota implements DiskRoot interface.
func (d *diskRootImpl) UpdateUsageAndQuota() error {
d.currentUsage = d.bcCtx.TotalDiskUsage()
totalDiskUsage := d.bcCtx.TotalDiskUsage()
sz, err := lcom.GetStorageSize(d.path)
if err != nil {
logutil.BgLogger().Error(LitErrGetStorageQuota, zap.Error(err))
return err
}
d.maxQuota = mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity)))
maxQuota := mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity)))
d.mu.Lock()
d.currentUsage = totalDiskUsage
d.maxQuota = maxQuota
d.mu.Unlock()
return nil
}
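Note on the disk_root.go change: UpdateUsageAndQuota now computes the disk usage and storage size before taking the lock, so the write lock only covers the two assignments and readers of CurrentUsage/MaxQuota never wait on I/O. A minimal sketch of the same pattern (the quotaTracker type is hypothetical, not part of this patch); running it with `go run -race` passes, whereas dropping the mutex would be flagged as a data race:

package main

import (
	"fmt"
	"sync"
)

// quotaTracker is a stand-in for diskRootImpl that shows the locking pattern
// introduced above: readers take the shared lock, the single writer takes the
// exclusive lock and updates both fields together, so a reader never sees a
// usage value from one update paired with a quota from another.
type quotaTracker struct {
	mu           sync.RWMutex
	currentUsage uint64
	maxQuota     uint64
}

// Snapshot returns a consistent (usage, quota) pair.
func (q *quotaTracker) Snapshot() (usage, quota uint64) {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.currentUsage, q.maxQuota
}

// Update holds the write lock only for the two assignments.
func (q *quotaTracker) Update(usage, quota uint64) {
	q.mu.Lock()
	q.currentUsage = usage
	q.maxQuota = quota
	q.mu.Unlock()
}

func main() {
	q := &quotaTracker{}
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(n uint64) {
			defer wg.Done()
			q.Update(n, 2*n)             // concurrent writer
			usage, quota := q.Snapshot() // concurrent reader
			fmt.Println(usage, quota)
		}(uint64(i))
	}
	wg.Wait()
}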
4 changes: 2 additions & 2 deletions planner/core/indexmerge_path.go
@@ -668,7 +668,7 @@ func (ds *DataSource) buildPartialPaths4MVIndex(accessFilters []expression.Expre
case ast.JSONContains: // (json_contains(a->'$.zip', '[1, 2, 3]')
isIntersection = true
virColVals, ok = jsonArrayExpr2Exprs(ds.ctx, sf.GetArgs()[1], jsonType)
if !ok {
if !ok || len(virColVals) == 0 { // json_contains(JSON, '[]') is TRUE
return nil, false, false, nil
}
case ast.JSONOverlaps: // (json_overlaps(a->'$.zip', '[1, 2, 3]')
@@ -682,7 +682,7 @@ func (ds *DataSource) buildPartialPaths4MVIndex(accessFilters []expression.Expre
}
var ok bool
virColVals, ok = jsonArrayExpr2Exprs(ds.ctx, sf.GetArgs()[1-jsonPathIdx], jsonType)
if !ok {
if !ok || len(virColVals) == 0 { // forbid empty array for safety
return nil, false, false, nil
}
default:
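Why buildPartialPaths4MVIndex now rejects an empty candidate array: json_contains(j, '[]') is TRUE for every row whose j is a JSON array, but the IndexMerge plan is assembled as one partial path per candidate value, and with zero values the union (or intersection) of paths selects nothing. A toy sketch of the mismatch, with plain integer slices standing in for JSON values and hypothetical helper names:

package main

import "fmt"

// containsAll mimics JSON_CONTAINS for flat integer arrays: every element of
// candidate must appear in target, so an empty candidate is contained in any
// array and the predicate is TRUE for every (non-NULL) row.
func containsAll(target, candidate []int) bool {
	for _, c := range candidate {
		found := false
		for _, t := range target {
			if t == c {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// unionOfPartialPaths mimics what an IndexMerge built from the candidate
// values would return: the union of rows matched per value. With zero
// candidate values the union is empty -- the opposite answer.
func unionOfPartialPaths(rows [][]int, candidate []int) map[int]bool {
	matched := map[int]bool{}
	for _, c := range candidate {
		for i, row := range rows {
			for _, v := range row {
				if v == c {
					matched[i] = true
				}
			}
		}
	}
	return matched
}

func main() {
	rows := [][]int{{1}, {1, 2}, {}} // like '[1]', '[1, 2]', '[]' in the test below
	empty := []int{}
	for i, row := range rows {
		fmt.Printf("json_contains(row %d, '[]') = %v\n", i, containsAll(row, empty))
	}
	fmt.Printf("IndexMerge over zero values returns: %v\n", unionOfPartialPaths(rows, empty))
}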
38 changes: 36 additions & 2 deletions planner/core/indexmerge_path_test.go
@@ -302,6 +302,31 @@ func TestMVIndexFullScan(t *testing.T) {
tk.MustGetErrMsg(`select /*+ use_index(t, kj) */ count(*) from t`, "[planner:1815]Internal : Can't find a proper physical plan for this query")
}

func TestMVIndexEmptyArray(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`create table t(j json, index kj((cast(j as signed array))))`)
tk.MustExec(`insert into t values ('[1]')`)
tk.MustExec(`insert into t values ('[1, 2]')`)
tk.MustExec(`insert into t values ('[]')`)
tk.MustExec(`insert into t values (NULL)`)

for _, cond := range []string{
"json_contains(j, '[]')",
"json_contains(j, '[1]')",
"json_contains(j, '[1, 2]')",
"json_contains(j, '[1, 10]')",
"json_overlaps(j, '[]')",
"json_overlaps(j, '[1]')",
"json_overlaps(j, '[1, 2]')",
"json_overlaps(j, '[1, 10]')",
} {
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t) */ * from t where %v", cond)).Sort().Check(
tk.MustQuery(fmt.Sprintf("select /*+ ignore_index(t, kj) */ * from t where %v", cond)).Sort().Rows())
}
}

func TestMVIndexRandom(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
@@ -364,14 +389,23 @@ func randMVIndexCond(condType int, valOpts randMVIndexValOpts) string {
case 0: // member_of
return fmt.Sprintf(`(%v member of (j))`, randMVIndexValue(valOpts))
case 1: // json_contains
return fmt.Sprintf(`json_contains(j, '[%v, %v]')`, randMVIndexValue(valOpts), randMVIndexValue(valOpts))
return fmt.Sprintf(`json_contains(j, '%v')`, randArray(valOpts))
case 2: // json_overlaps
return fmt.Sprintf(`json_overlaps(j, '[%v, %v]')`, randMVIndexValue(valOpts), randMVIndexValue(valOpts))
return fmt.Sprintf(`json_overlaps(j, '%v')`, randArray(valOpts))
default: // others
return fmt.Sprintf(`a < %v`, rand.Intn(valOpts.distinct))
}
}

func randArray(opts randMVIndexValOpts) string {
n := rand.Intn(5) // n can be 0
var vals []string
for i := 0; i < n; i++ {
vals = append(vals, randMVIndexValue(opts))
}
return "[" + strings.Join(vals, ", ") + "]"
}

type randMVIndexValOpts struct {
valType string // INT, UNSIGNED, STR, DATE
maxStrLen int
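The fuzz helper above now feeds json_contains/json_overlaps arrays of random length 0–4 instead of always two elements, so the empty-array case is exercised roughly one draw in five. A standalone sketch of the generator's behavior, simplified to integer elements:

package main

import (
	"fmt"
	"math/rand"
	"strings"
)

// simplifiedRandArray stands in for randArray: length 0..4, integer elements.
// The zero-length draw yields "[]", exactly the case the planner change above
// must handle.
func simplifiedRandArray(r *rand.Rand) string {
	n := r.Intn(5)
	vals := make([]string, 0, n)
	for i := 0; i < n; i++ {
		vals = append(vals, fmt.Sprint(r.Intn(100)))
	}
	return "[" + strings.Join(vals, ", ") + "]"
}

func main() {
	r := rand.New(rand.NewSource(1))
	for i := 0; i < 5; i++ {
		fmt.Println(simplifiedRandArray(r))
	}
}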
4 changes: 2 additions & 2 deletions ttl/cache/task.go
@@ -58,8 +58,8 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{})
}

// PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
func PeekWaitingTTLTask(hbExpire time.Time) (string, []interface{}) {
return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC", []interface{}{hbExpire.Format("2006-01-02 15:04:05")}
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
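PeekWaitingTTLTask no longer takes a limit: it returns one statement selecting every 'waiting' task plus any 'running' task whose owner heartbeat is older than the cutoff, ordered by creation time, and the caller decides how many to act on. The cutoff is bound through the %? placeholder as a MySQL DATETIME string built with Go's reference-time layout; a standalone illustration (the timestamp is made up):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Go layouts are written against the reference instant
	// Mon Jan 2 15:04:05 2006, so this layout renders a MySQL-style
	// DATETIME such as "2023-02-03 08:30:00".
	hbExpire := time.Date(2023, 2, 3, 8, 30, 0, 0, time.UTC)
	fmt.Println(hbExpire.Format("2006-01-02 15:04:05"))
}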
2 changes: 1 addition & 1 deletion ttl/ttlworker/job_manager_integration_test.go
@@ -99,7 +99,7 @@ func TestParallelLockNewJob(t *testing.T) {
successCounter.Add(1)
successJob = job
} else {
logutil.BgLogger().Error("lock new job with error", zap.Error(err))
logutil.BgLogger().Info("lock new job with error", zap.Error(err))
}
wg.Done()
}()
56 changes: 27 additions & 29 deletions ttl/ttlworker/task_manager.go
@@ -263,45 +263,43 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
return
}

for len(idleScanWorkers) > 0 {
tasks, err := m.peekWaitingScanTasks(se, len(idleScanWorkers), now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
return
}
tasks, err := m.peekWaitingScanTasks(se, now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
return
}

if len(tasks) == 0 {
break
for _, t := range tasks {
logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))

task, err := m.lockScanTask(se, t, now)
if err != nil {
// If other nodes lock the task, it will return an error. It's expected
// so the log level is only `info`
logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
continue
}

for _, t := range tasks {
logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
idleWorker := idleScanWorkers[0]
idleScanWorkers = idleScanWorkers[1:]

task, err := m.lockScanTask(se, t, now)
if err != nil {
// If other nodes lock the task, it will return an error. It's expected
// so the log level is only `info`
logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
continue
}

idleWorker := idleScanWorkers[0]
idleScanWorkers = idleScanWorkers[1:]
err = idleWorker.Schedule(task.ttlScanTask)
if err != nil {
logger.Warn("fail to schedule task", zap.Error(err))
continue
}

err = idleWorker.Schedule(task.ttlScanTask)
if err != nil {
logger.Warn("fail to schedule task", zap.Error(err))
continue
}
logger.Info("scheduled ttl task")
m.runningTasks = append(m.runningTasks, task)

logger.Info("scheduled ttl task")
m.runningTasks = append(m.runningTasks, task)
if len(idleScanWorkers) == 0 {
return
}
}
}

func (m *taskManager) peekWaitingScanTasks(se session.Session, limit int, now time.Time) ([]*cache.TTLTask, error) {
sql, args := cache.PeekWaitingTTLTask(limit, now.Add(-2*ttlTaskHeartBeatTickerInterval))
func (m *taskManager) peekWaitingScanTasks(se session.Session, now time.Time) ([]*cache.TTLTask, error) {
sql, args := cache.PeekWaitingTTLTask(now.Add(-2 * ttlTaskHeartBeatTickerInterval))
rows, err := se.ExecuteSQL(m.ctx, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
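The rewritten rescheduleTasks peeks the waiting tasks once, then walks the list: tasks that cannot be locked (typically because another node grabbed them first) are skipped, each locked task consumes one idle scan worker, and the loop stops as soon as the workers run out. A minimal sketch of that dispatch flow, with hypothetical types:

package main

import "fmt"

type ttlTask struct{ id int }
type scanWorker struct{ name string }

// dispatch sketches the new flow: the candidate tasks are fetched once, each
// successfully "locked" task consumes one idle worker, tasks locked by other
// nodes are skipped, and dispatch stops when no idle workers remain.
func dispatch(tasks []ttlTask, idle []scanWorker, tryLock func(ttlTask) bool) []string {
	var scheduled []string
	for _, t := range tasks {
		if len(idle) == 0 {
			break // mirrors the early return once workers are exhausted
		}
		if !tryLock(t) {
			continue // another node owns the task; keep the worker for the next one
		}
		w := idle[0]
		idle = idle[1:]
		scheduled = append(scheduled, fmt.Sprintf("%s->task%d", w.name, t.id))
	}
	return scheduled
}

func main() {
	tasks := []ttlTask{{1}, {2}, {3}, {4}}
	workers := []scanWorker{{"w1"}, {"w2"}}
	// Pretend task 2 is already locked by another node.
	fmt.Println(dispatch(tasks, workers, func(t ttlTask) bool { return t.id != 2 }))
	// Output: [w1->task1 w2->task3]
}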
37 changes: 36 additions & 1 deletion ttl/ttlworker/task_manager_integration_test.go
@@ -97,7 +97,7 @@ func TestParallelLockNewTask(t *testing.T) {
if err == nil {
successCounter.Add(1)
} else {
logutil.BgLogger().Error("lock new task with error", zap.Error(err))
logutil.BgLogger().Info("lock new task with error", zap.Error(err))
}
wg.Done()
}()
@@ -230,3 +230,38 @@ func TestTaskMetrics(t *testing.T) {
require.NoError(t, metrics.RunningTaskCnt.Write(out))
require.Equal(t, float64(1), out.GetGauge().GetValue())
}

func TestRescheduleWithError(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
// insert a wrong scan task with random table id
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", 613, 1)
tk.MustExec(sql)

isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))
now := time.Now()

// schedule in a task manager
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
notify := make(chan struct{})
go func() {
m.RescheduleTasks(sessionFactory(), now)
notify <- struct{}{}
}()
timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

select {
case <-timeout.Done():
require.Fail(t, "reschedule didn't finish in time")
case <-notify:
}
tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("waiting"))
}
