Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Nov 28, 2024
1 parent 16936a5 commit 0494095
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 76 deletions.
8 changes: 4 additions & 4 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
// Why do we need to add `is_index=1` in the SQL?
// because it is aligned to the `initStatsTopN` function, which only loads the topn of the index too.
// the other will be loaded by sync load.
sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %? order by table_id"
sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %? order by table_id"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -435,7 +435,7 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %? order by table_id"
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %? and is_index=1 order by table_id"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -629,7 +629,7 @@ func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint6
return errors.Trace(err)
}
} else {
sql := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
sql := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where is_index=1 order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -668,7 +668,7 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? order by table_id, is_index, hist_id, bucket_id"
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? and is_index=1 order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ func TestInitStats(t *testing.T) {
require.NoError(t, h.Update(context.Background(), is))
// Index and pk are loaded.
needed := fmt.Sprintf(`Table:%v RealtimeCount:6
column:1 ndv:6 totColSize:0
column:2 ndv:6 totColSize:6
column:3 ndv:6 totColSize:6
index:1 ndv:6
num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0
num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0
Expand Down
1 change: 0 additions & 1 deletion pkg/statistics/handle/syncload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/table",
"//pkg/types",
"//pkg/util",
"//pkg/util/intest",
Expand Down
46 changes: 23 additions & 23 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
Expand Down Expand Up @@ -314,47 +313,48 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
}

item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
statsTbl, ok := s.statsHandle.Get(item.TableID)

if !ok {
return nil
}
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
tblInfo, ok := s.statsHandle.TableInfoByID(is, item.TableID)
tbl, ok := s.statsHandle.TableInfoByID(is, item.TableID)
if !ok {
return nil
}
isPkIsHandle := tblInfo.Meta().PKIsHandle
if !tbl.ColAndIdxExistenceMap.Checked() {
tbl = tbl.Copy()
for _, col := range tbl.HistColl.GetColSlice() {
if tblInfo.Meta().FindColumnByID(col.ID) == nil {
tbl.HistColl.DelCol(col.ID)
tbl.ColAndIdxExistenceMap.DeleteColAnalyzed(col.ID)
tblInfo := tbl.Meta()
isPkIsHandle := tblInfo.PKIsHandle
if !statsTbl.ColAndIdxExistenceMap.Checked() {
statsTbl = statsTbl.Copy()
for _, col := range statsTbl.HistColl.GetColSlice() {
if tblInfo.FindColumnByID(col.ID) == nil {
statsTbl.HistColl.DelCol(col.ID)
statsTbl.ColAndIdxExistenceMap.DeleteColAnalyzed(col.ID)
}
}
for _, idx := range tbl.HistColl.GetIdxSlice() {
if tblInfo.Meta().FindIndexByID(idx.ID) == nil {
tbl.HistColl.DelIdx(idx.ID)
tbl.ColAndIdxExistenceMap.DeleteIdxAnalyzed(idx.ID)
for _, idx := range statsTbl.HistColl.GetIdxSlice() {
if tblInfo.FindIndexByID(idx.ID) == nil {
statsTbl.HistColl.DelIdx(idx.ID)
statsTbl.ColAndIdxExistenceMap.DeleteIdxAnalyzed(idx.ID)
}
}
tbl.ColAndIdxExistenceMap.SetChecked()
s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
statsTbl.ColAndIdxExistenceMap.SetChecked()
s.statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
}
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
index, loadNeeded := statsTbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
} else {
wrapper.idxInfo = tblInfo.Meta().FindIndexByID(item.ID)
wrapper.idxInfo = tblInfo.FindIndexByID(item.ID)
}
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
col, loadNeeded, analyzed := statsTbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return nil
}
Expand All @@ -363,7 +363,7 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
} else {
// Now, we cannot init the column info in the ColAndIdxExistenceMap when to disable lite-init-stats.
// so we have to get the column info from the domain.
wrapper.colInfo = tblInfo.Meta().GetColumnByID(item.ID)
wrapper.colInfo = tblInfo.GetColumnByID(item.ID)
}
if skipTypes != nil {
_, skip := skipTypes[types.TypeToStr(wrapper.colInfo.FieldType.GetType(), wrapper.colInfo.FieldType.GetCharset())]
Expand All @@ -377,7 +377,7 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
// Otherwise, it will trigger the sync/async load again, even if the column has not been analyzed.
if loadNeeded && !analyzed {
wrapper.col = statistics.EmptyColumn(item.TableID, isPkIsHandle, wrapper.colInfo)
s.updateCachedItem(tblInfo, item, wrapper.col, wrapper.idx, task.Item.FullLoad)
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
return nil
}
}
Expand All @@ -402,7 +402,7 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
}
metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
if needUpdate {
s.updateCachedItem(tblInfo, item, wrapper.col, wrapper.idx, task.Item.FullLoad)
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
}
return nil
}
Expand Down Expand Up @@ -578,7 +578,7 @@ func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, r
}

// updateCachedItem updates the column/index hist to global statsCache.
func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
s.StatsLoad.Lock()
defer s.StatsLoad.Unlock()
// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
Expand Down
30 changes: 15 additions & 15 deletions tests/integrationtest/r/executor/issues.result
Original file line number Diff line number Diff line change
Expand Up @@ -969,18 +969,18 @@ Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 125.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
Limit_8 0.26 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 0.26 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:unInitialized] <memory> <disk>
explain analyze select * from pt where val = 125 limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 204.80 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 204.80 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 204.80 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 204.80 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
Limit_8 0.26 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 0.26 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:unInitialized] <memory> <disk>
explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190
id estRows actRows task access object execution info operator info memory disk
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
Expand All @@ -995,11 +995,11 @@ Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
└─TableFullScan_17 125.00 <actRows> cop[tikv] table:pt NULL keep order:true, stats:partial[val:missing] <memory> <disk>
Limit_11 0.26 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 0.26 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
└─TableFullScan_17 256.00 <actRows> cop[tikv] table:pt NULL keep order:true, stats:partial[val:unInitialized] <memory> <disk>
CREATE TABLE test_55837 (col1 int(4) NOT NULL, col2 bigint(4) NOT NULL, KEY col2_index (col2));
insert into test_55837 values(0,1725292800),(0,1725292800);
select from_unixtime( if(col2 >9999999999, col2/1000, col2), '%Y-%m-%d %H:%i:%s') as result from test_55837;
Expand Down
14 changes: 7 additions & 7 deletions tests/integrationtest/r/executor/partition/issues.result
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,13 @@ join t on c.txt_account_id = t.txn_account_id
and t.broker = '0009'
and c.occur_trade_date = '2022-11-17' and c.serial_id = t.serial_id;
id estRows task access object operator info
IndexJoin_20 0.80 root inner join, inner:TableReader_19, outer key:executor__partition__issues.t.txn_account_id, executor__partition__issues.t.serial_id, inner key:executor__partition__issues.c.txt_account_id, executor__partition__issues.c.serial_id, equal cond:eq(executor__partition__issues.t.serial_id, executor__partition__issues.c.serial_id), eq(executor__partition__issues.t.txn_account_id, executor__partition__issues.c.txt_account_id)
├─TableReader_25(Build) 0.80 root data:Selection_24
│ └─Selection_24 0.80 cop[tikv] eq(executor__partition__issues.t.broker, "0009"), not(isnull(executor__partition__issues.t.serial_id))
│ └─TableFullScan_23 1.00 cop[tikv] table:t keep order:false, stats:partial[serial_id:missing]
└─TableReader_19(Probe) 0.80 root partition:all data:Selection_18
└─Selection_18 0.80 cop[tikv] eq(executor__partition__issues.c.occur_trade_date, 2022-11-17 00:00:00.000000)
└─TableRangeScan_17 0.80 cop[tikv] table:c range: decided by [eq(executor__partition__issues.c.txt_account_id, executor__partition__issues.t.txn_account_id) eq(executor__partition__issues.c.serial_id, executor__partition__issues.t.serial_id) eq(executor__partition__issues.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false
IndexJoin_20 1.00 root inner join, inner:TableReader_19, outer key:executor__partition__issues.t.txn_account_id, executor__partition__issues.t.serial_id, inner key:executor__partition__issues.c.txt_account_id, executor__partition__issues.c.serial_id, equal cond:eq(executor__partition__issues.t.serial_id, executor__partition__issues.c.serial_id), eq(executor__partition__issues.t.txn_account_id, executor__partition__issues.c.txt_account_id)
├─TableReader_25(Build) 1.00 root data:Selection_24
│ └─Selection_24 1.00 cop[tikv] eq(executor__partition__issues.t.broker, "0009"), not(isnull(executor__partition__issues.t.serial_id))
│ └─TableFullScan_23 1.00 cop[tikv] table:t keep order:false, stats:partial[serial_id:unInitialized]
└─TableReader_19(Probe) 1.00 root partition:all data:Selection_18
└─Selection_18 1.00 cop[tikv] eq(executor__partition__issues.c.occur_trade_date, 2022-11-17 00:00:00.000000)
└─TableRangeScan_17 1.00 cop[tikv] table:c range: decided by [eq(executor__partition__issues.c.txt_account_id, executor__partition__issues.t.txn_account_id) eq(executor__partition__issues.c.serial_id, executor__partition__issues.t.serial_id) eq(executor__partition__issues.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false
set @@tidb_opt_advanced_join_hint=default;
set tidb_partition_prune_mode=default;
drop table if exists t;
Expand Down
Loading

0 comments on commit 0494095

Please sign in to comment.