Skip to content

Commit

Permalink
Merge branch 'master' into disable_gc_aware
Browse files Browse the repository at this point in the history
  • Loading branch information
chrysan authored Dec 20, 2022
2 parents b63d6fd + 017901d commit ffc2b26
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 118 deletions.
85 changes: 38 additions & 47 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3511,17 +3511,39 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta
return nextRange, nil
}

func keyColumnsIncludeAllPartitionColumns(keyColumns []int, pe *tables.PartitionExpr) bool {
tmp := make(map[int]struct{}, len(keyColumns))
for _, offset := range keyColumns {
tmp[offset] = struct{}{}
func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []int {
keyColOffsets := make([]int, len(keyColIDs))
for i, colID := range keyColIDs {
offset := -1
for j, col := range pt.Cols() {
if colID == col.ID {
offset = j
break
}
}
if offset == -1 {
return nil
}
keyColOffsets[i] = offset
}

pe, err := pt.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil
}

offsetMap := make(map[int]struct{})
for _, offset := range keyColOffsets {
offsetMap[offset] = struct{}{}
}
for _, offset := range pe.ColumnOffset {
if _, ok := tmp[offset]; !ok {
return false
if _, ok := offsetMap[offset]; !ok {
return nil
}
}
return true
return keyColOffsets
}

func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table, schema *expression.Schema, partitionInfo *plannercore.PartitionInfo,
Expand All @@ -3536,45 +3558,16 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table
return nil, false, nil, err
}

// check whether can runtime prune.
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
}
pe, err := tbl.(partitionExpr).PartitionExpr()
if err != nil {
return nil, false, nil, err
}

// recalculate key column offsets
if len(lookUpContent) == 0 {
return nil, false, nil, nil
}
if lookUpContent[0].keyColIDs == nil {
return nil, false, nil, plannercore.ErrInternal.GenWithStack("cannot get column IDs when dynamic pruning")
}
keyColOffsets := make([]int, len(lookUpContent[0].keyColIDs))
for i, colID := range lookUpContent[0].keyColIDs {
offset := -1
for j, col := range partitionTbl.Cols() {
if colID == col.ID {
offset = j
break
}
}
if offset == -1 {
return nil, false, nil, plannercore.ErrInternal.GenWithStack("invalid column offset when dynamic pruning")
}
keyColOffsets[i] = offset
}

offsetMap := make(map[int]bool)
for _, offset := range keyColOffsets {
offsetMap[offset] = true
}
for _, offset := range pe.ColumnOffset {
if _, ok := offsetMap[offset]; !ok {
return condPruneResult, false, nil, nil
}
keyColOffsets := getPartitionKeyColOffsets(lookUpContent[0].keyColIDs, partitionTbl)
if len(keyColOffsets) == 0 {
return condPruneResult, false, nil, nil
}

locateKey := make([]types.Datum, len(partitionTbl.Cols()))
Expand Down Expand Up @@ -4149,12 +4142,6 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
partitionInfo := &v.PartitionInfo
usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
Expand All @@ -4165,8 +4152,12 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
usedPartitions[p.GetPhysicalID()] = p
}
var kvRanges []kv.KeyRange
var keyColOffsets []int
if len(lookUpContents) > 0 {
keyColOffsets = getPartitionKeyColOffsets(lookUpContents[0].keyColIDs, pt)
}
if v.IsCommonHandle {
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
// lookUpContentsByPID groups lookUpContents by pid(partition) so that kv ranges for same partition can be merged.
Expand Down Expand Up @@ -4212,7 +4203,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte

handles, lookUpContents := dedupHandles(lookUpContents)

if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
for _, content := range lookUpContents {
Expand Down
20 changes: 11 additions & 9 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,17 +428,19 @@ PARTITIONS 1`)

// Why does the t2.prefiller need be at least 2^32 ? If smaller the bug will not appear!?!
tk.MustExec("insert into t2 values ( pow(2,32), 1, 1), ( pow(2,32)+1, 2, 0)")
tk.MustExec(`analyze table t1`)
tk.MustExec(`analyze table t2`)

// Why must it be = 1 and not 2?
tk.MustQuery("explain select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows("" +
"IndexJoin_15 10.00 root inner join, inner:TableReader_14, outer key:test.t2.prefiller, inner key:test.t1.id, equal cond:eq(test.t2.prefiller, test.t1.id)]\n" +
"[├─HashAgg_25(Build) 8.00 root group by:test.t2.prefiller, funcs:firstrow(test.t2.prefiller)->test.t2.prefiller]\n" +
"[│ └─TableReader_26 8.00 root data:HashAgg_20]\n" +
"[│ └─HashAgg_20 8.00 cop[tikv] group by:test.t2.prefiller, ]\n" +
"[│ └─Selection_24 10.00 cop[tikv] eq(test.t2.postfiller, 1)]\n" +
"[│ └─TableFullScan_23 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo]\n" +
"[└─TableReader_14(Probe) 8.00 root partition:all data:TableRangeScan_13]\n" +
"[ └─TableRangeScan_13 8.00 cop[tikv] table:t1 range: decided by [eq(test.t1.id, test.t2.prefiller)], keep order:false, stats:pseudo"))
tk.MustQuery("explain format='brief' select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows(""+
`IndexJoin 1.25 root inner join, inner:TableReader, outer key:test.t2.prefiller, inner key:test.t1.id, equal cond:eq(test.t2.prefiller, test.t1.id)`,
`├─HashAgg(Build) 1.00 root group by:test.t2.prefiller, funcs:firstrow(test.t2.prefiller)->test.t2.prefiller`,
`│ └─TableReader 1.00 root data:HashAgg`,
`│ └─HashAgg 1.00 cop[tikv] group by:test.t2.prefiller, `,
`│ └─Selection 1.00 cop[tikv] eq(test.t2.postfiller, 1)`,
`│ └─TableFullScan 2.00 cop[tikv] table:t2 keep order:false`,
`└─TableReader(Probe) 1.00 root partition:all data:TableRangeScan`,
` └─TableRangeScan 1.00 cop[tikv] table:t1 range: decided by [eq(test.t1.id, test.t2.prefiller)], keep order:false, stats:pseudo`))
tk.MustQuery("show warnings").Check(testkit.Rows())
// without fix it fails with: "runtime error: index out of range [0] with length 0"
tk.MustQuery("select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows())
Expand Down
3 changes: 2 additions & 1 deletion executor/oomtest/oom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ func (h *oomCapture) Write(entry zapcore.Entry, fields []zapcore.Field) error {
return nil
}
// They are just common background task and not related to the oom.
if entry.Message == "SetTiFlashGroupConfig" {
if entry.Message == "SetTiFlashGroupConfig" ||
entry.Message == "record table item load status failed due to not finding item" {
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3831,3 +3831,72 @@ func TestIssue21732(t *testing.T) {
})
}
}

func TestIssue39999(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)

tk.MustExec(`create schema test39999`)
tk.MustExec(`use test39999`)
tk.MustExec(`drop table if exists c, t`)
tk.MustExec("CREATE TABLE `c` (" +
"`serial_id` varchar(24)," +
"`occur_trade_date` date," +
"`txt_account_id` varchar(24)," +
"`capital_sub_class` varchar(10)," +
"`occur_amount` decimal(16,2)," +
"`broker` varchar(10)," +
"PRIMARY KEY (`txt_account_id`,`occur_trade_date`,`serial_id`) /*T![clustered_index] CLUSTERED */," +
"KEY `idx_serial_id` (`serial_id`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci " +
"PARTITION BY RANGE COLUMNS(`serial_id`) (" +
"PARTITION `p202209` VALUES LESS THAN ('20221001')," +
"PARTITION `p202210` VALUES LESS THAN ('20221101')," +
"PARTITION `p202211` VALUES LESS THAN ('20221201')" +
")")

tk.MustExec("CREATE TABLE `t` ( " +
"`txn_account_id` varchar(24), " +
"`account_id` varchar(32), " +
"`broker` varchar(10), " +
"PRIMARY KEY (`txn_account_id`) /*T![clustered_index] CLUSTERED */ " +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci")

tk.MustExec("INSERT INTO `c` (serial_id, txt_account_id, capital_sub_class, occur_trade_date, occur_amount, broker) VALUES ('2022111700196920','04482786','CUST','2022-11-17',-2.01,'0009')")
tk.MustExec("INSERT INTO `t` VALUES ('04482786','1142927','0009')")

tk.MustExec(`set tidb_partition_prune_mode='dynamic'`)
tk.MustExec(`analyze table c`)
tk.MustExec(`analyze table t`)
query := `select
/*+ inl_join(c) */
c.occur_amount
from
c
join t on c.txt_account_id = t.txn_account_id
and t.broker = '0009'
and c.occur_trade_date = '2022-11-17'`
tk.MustQuery("explain " + query).Check(testkit.Rows(""+
"IndexJoin_22 1.00 root inner join, inner:TableReader_21, outer key:test39999.t.txn_account_id, inner key:test39999.c.txt_account_id, equal cond:eq(test39999.t.txn_account_id, test39999.c.txt_account_id)",
"├─TableReader_27(Build) 1.00 root data:Selection_26",
"│ └─Selection_26 1.00 cop[tikv] eq(test39999.t.broker, \"0009\")",
"│ └─TableFullScan_25 1.00 cop[tikv] table:t keep order:false",
"└─TableReader_21(Probe) 1.00 root partition:all data:Selection_20",
" └─Selection_20 1.00 cop[tikv] eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)",
" └─TableRangeScan_19 1.00 cop[tikv] table:c range: decided by [eq(test39999.c.txt_account_id, test39999.t.txn_account_id) eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false"))
tk.MustQuery(query).Check(testkit.Rows("-2.01"))

// Add the missing partition key part.
tk.MustExec(`alter table t add column serial_id varchar(24) default '2022111700196920'`)
query += ` and c.serial_id = t.serial_id`
tk.MustQuery(query).Check(testkit.Rows("-2.01"))
tk.MustQuery("explain " + query).Check(testkit.Rows(""+
`IndexJoin_20 0.80 root inner join, inner:TableReader_19, outer key:test39999.t.txn_account_id, test39999.t.serial_id, inner key:test39999.c.txt_account_id, test39999.c.serial_id, equal cond:eq(test39999.t.serial_id, test39999.c.serial_id), eq(test39999.t.txn_account_id, test39999.c.txt_account_id)`,
`├─TableReader_25(Build) 0.80 root data:Selection_24`,
`│ └─Selection_24 0.80 cop[tikv] eq(test39999.t.broker, "0009"), not(isnull(test39999.t.serial_id))`,
`│ └─TableFullScan_23 1.00 cop[tikv] table:t keep order:false`,
`└─TableReader_19(Probe) 0.80 root partition:all data:Selection_18`,
` └─Selection_18 0.80 cop[tikv] eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)`,
` └─TableRangeScan_17 0.80 cop[tikv] table:c range: decided by [eq(test39999.c.txt_account_id, test39999.t.txn_account_id) eq(test39999.c.serial_id, test39999.t.serial_id) eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false`))
}
23 changes: 16 additions & 7 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,12 +1358,15 @@ func canScalarFuncPushDown(scalarFunc *ScalarFunction, pc PbConverter, storeType
panic(errors.Errorf("unspecified PbCode: %T", scalarFunc.Function))
})
}
storageName := storeType.Name()
if storeType == kv.UnSpecified {
storageName = "storage layer"
}
warnErr := errors.New("Scalar function '" + scalarFunc.FuncName.L + "'(signature: " + scalarFunc.Function.PbCode().String() + ", return type: " + scalarFunc.RetType.CompactStr() + ") is not supported to push down to " + storageName + " now.")
if pc.sc.InExplainStmt {
storageName := storeType.Name()
if storeType == kv.UnSpecified {
storageName = "storage layer"
}
pc.sc.AppendWarning(errors.New("Scalar function '" + scalarFunc.FuncName.L + "'(signature: " + scalarFunc.Function.PbCode().String() + ", return type: " + scalarFunc.RetType.CompactStr() + ") is not supported to push down to " + storageName + " now."))
pc.sc.AppendWarning(warnErr)
} else {
pc.sc.AppendExtraWarning(warnErr)
}
return false
}
Expand Down Expand Up @@ -1393,14 +1396,20 @@ func canExprPushDown(expr Expression, pc PbConverter, storeType kv.StoreType, ca
if expr.GetType().GetType() == mysql.TypeEnum && canEnumPush {
break
}
warnErr := errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains unsupported calculation of type '" + types.TypeStr(expr.GetType().GetType()) + "'.")
if pc.sc.InExplainStmt {
pc.sc.AppendWarning(errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains unsupported calculation of type '" + types.TypeStr(expr.GetType().GetType()) + "'."))
pc.sc.AppendWarning(warnErr)
} else {
pc.sc.AppendExtraWarning(warnErr)
}
return false
case mysql.TypeNewDecimal:
if !expr.GetType().IsDecimalValid() {
warnErr := errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains invalid decimal('" + strconv.Itoa(expr.GetType().GetFlen()) + "','" + strconv.Itoa(expr.GetType().GetDecimal()) + "').")
if pc.sc.InExplainStmt {
pc.sc.AppendWarning(errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains invalid decimal('" + strconv.Itoa(expr.GetType().GetFlen()) + "','" + strconv.Itoa(expr.GetType().GetDecimal()) + "')."))
pc.sc.AppendWarning(warnErr)
} else {
pc.sc.AppendExtraWarning(warnErr)
}
return false
}
Expand Down
Loading

0 comments on commit ffc2b26

Please sign in to comment.