Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove ExtraPidCol and replace it with ExtraPhysTblIDCol #53974

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func (b *executorBuilder) buildCleanupIndex(v *plannercore.CleanupIndex) exec.Ex
sessCtx := e.Ctx().GetSessionVars().StmtCtx
e.handleCols = buildHandleColsForExec(sessCtx, tblInfo, e.columns)
if e.index.Meta().Global {
e.columns = append(e.columns, model.NewExtraPartitionIDColInfo())
e.columns = append(e.columns, model.NewExtraPhysTblIDColInfo())
}
return e
}
Expand Down Expand Up @@ -4935,7 +4935,7 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}
defVal := func(i int, chk *chunk.Chunk) error {
if reqCols[i].ID < 0 {
// model.ExtraHandleID, ExtraPidColID, ExtraPhysTblID... etc
// model.ExtraHandleID, ExtraPhysTblID... etc
// Don't set the default value for that column.
chk.AppendNull(i)
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {

datumRow := make([]types.Datum, 0, len(fields))
for i, field := range fields {
if columns[i].ID == model.ExtraPidColID || columns[i].ID == model.ExtraPhysTblID {
if columns[i].ID == model.ExtraPhysTblID {
continue
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,24 +581,22 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in

func (e *IndexLookUpExecutor) needPartitionHandle(tp getHandleType) (bool, error) {
var col *expression.Column
var needPartitionHandle, hasExtraCol bool
var needPartitionHandle bool
if tp == getHandleFromIndex {
cols := e.idxPlans[0].Schema().Columns
outputOffsets := e.dagPB.OutputOffsets
col = cols[outputOffsets[len(outputOffsets)-1]]
// For indexScan, need partitionHandle when global index or keepOrder with partitionTable
needPartitionHandle = e.index.Global || e.partitionTableMode && e.keepOrder
hasExtraCol = col.ID == model.ExtraPhysTblID || col.ID == model.ExtraPidColID
} else {
cols := e.tblPlans[0].Schema().Columns
outputOffsets := e.tableRequest.OutputOffsets
col = cols[outputOffsets[len(outputOffsets)-1]]

// For TableScan, need partitionHandle in `indexOrder` when e.keepOrder == true or execute `admin check [table|index]` with global index
needPartitionHandle = ((e.index.Global || e.partitionTableMode) && e.keepOrder) || (e.index.Global && e.checkIndexValue != nil)
// no ExtraPidColID here, because TableScan shouldn't contain them.
hasExtraCol = col.ID == model.ExtraPhysTblID
}
hasExtraCol := col.ID == model.ExtraPhysTblID

// There will be two needPartitionHandle != hasExtraCol situations.
// Only `needPartitionHandle` == true and `hasExtraCol` == false are not allowed.
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ func (w *partialTableWorker) needPartitionHandle() (bool, error) {
col := cols[outputOffsets[len(outputOffsets)-1]]

needPartitionHandle := w.partitionTableMode && len(w.byItems) > 0
// no ExtraPidColID here, because a clustered index couldn't be a global index.
hasExtraCol := col.ID == model.ExtraPhysTblID

// There will be two needPartitionHandle != hasExtraCol situations.
Expand Down Expand Up @@ -1698,24 +1697,25 @@ func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *indexM
}

// needPartitionHandle indicates whether we need create a partitionHandle or not.
// If the schema from planner part contains ExtraPidColID or ExtraPhysTblID,
// If the schema from planner part contains ExtraPhysTblID,
// we need create a partitionHandle, otherwise create a normal handle.
// In TableRowIDScan, the partitionHandle will be used to create key ranges.
func (w *partialIndexWorker) needPartitionHandle() (bool, error) {
cols := w.plan[0].Schema().Columns
outputOffsets := w.dagPB.OutputOffsets
col := cols[outputOffsets[len(outputOffsets)-1]]

needPartitionHandle := w.partitionTableMode && len(w.byItems) > 0
hasExtraCol := col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID
is := w.plan[0].(*plannercore.PhysicalIndexScan)
needPartitionHandle := w.partitionTableMode && len(w.byItems) > 0 || is.Index.Global
hasExtraCol := col.ID == model.ExtraPhysTblID

// There will be two needPartitionHandle != hasExtraCol situations.
// Only `needPartitionHandle` == true and `hasExtraCol` == false are not allowed.
// `ExtraPhysTblID` will be used in `SelectLock` when `needPartitionHandle` == false and `hasExtraCol` == true.
if needPartitionHandle && !hasExtraCol {
return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret")
}
return needPartitionHandle || (col.ID == model.ExtraPidColID), nil
return needPartitionHandle, nil
}

func (w *partialIndexWorker) fetchHandles(
Expand Down
29 changes: 29 additions & 0 deletions pkg/executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2407,3 +2407,32 @@ func TestIssue31024(t *testing.T) {

tk2.MustExec("rollback")
}

func TestGlobalIndexWithSelectLock(t *testing.T) {
store := testkit.CreateMockStore(t)

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("set tidb_enable_global_index = true")
tk1.MustExec("use test")
tk1.MustExec("create table t(a int, b int, unique index(b), primary key(a)) partition by hash(a) partitions 5;")
tk1.MustExec("insert into t values (1,1),(2,2),(3,3),(4,4),(5,5);")
tk1.MustExec("begin")
tk1.MustExec("select * from t use index(b) where b = 2 order by b limit 1 for update;")

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

ch := make(chan int, 10)
go func() {
// Check the key is locked.
tk2.MustExec("update t set b = 6 where b = 2")
ch <- 1
}()

time.Sleep(50 * time.Millisecond)
ch <- 0
tk1.MustExec("commit")

require.Equal(t, <-ch, 0)
require.Equal(t, <-ch, 1)
}
6 changes: 3 additions & 3 deletions pkg/expression/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func (expr *ScalarFunction) explainInfo(ctx EvalContext, normalized bool) string
intest.Assert(normalized || ctx != nil)
var buffer bytes.Buffer
fmt.Fprintf(&buffer, "%s(", expr.FuncName.L)
// convert `in(_tidb_pid, -1)` to `in(_tidb_pid, dual)` whether normalized equals to true or false.
// convert `in(_tidb_tid, -1)` to `in(_tidb_tid, dual)` whether normalized equals to true or false.
if expr.FuncName.L == ast.In {
args := expr.GetArgs()
if len(args) == 2 && args[0].ExplainNormalizedInfo() == model.ExtraPartitionIdName.L && args[1].(*Constant).Value.GetInt64() == -1 {
buffer.WriteString(model.ExtraPartitionIdName.L + ", dual)")
if len(args) == 2 && strings.HasSuffix(args[0].ExplainNormalizedInfo(), model.ExtraPhysTblIdName.L) && args[1].(*Constant).Value.GetInt64() == -1 {
buffer.WriteString(args[0].ExplainNormalizedInfo() + ", dual)")
return buffer.String()
}
}
Expand Down
26 changes: 5 additions & 21 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,12 @@ func IsIndexPrefixCovered(tbInfo *TableInfo, index *IndexInfo, cols ...CIStr) bo
// for use of execution phase.
const ExtraHandleID = -1

// ExtraPidColID is the column ID of column which store the partitionID decoded in global index values.
const ExtraPidColID = -2
// Deprecated: Use ExtraPhysTblID instead.
// const ExtraPidColID = -2

// ExtraPhysTblID is the column ID of column that should be filled in with the physical table id.
// Primarily used for table partition dynamic prune mode, to return which partition (physical table id) the row came from.
// Using a dedicated id for this, since in the future ExtraPidColID and ExtraPhysTblID may be used for the same request.
// Must be after ExtraPidColID!
// If used with a global index, the partition ID decoded from the key value will be filled in.
const ExtraPhysTblID = -3

// ExtraRowChecksumID is the column ID of column which holds the row checksum info.
Expand Down Expand Up @@ -435,8 +434,8 @@ const (
// ExtraHandleName is the name of ExtraHandle Column.
var ExtraHandleName = NewCIStr("_tidb_rowid")

// ExtraPartitionIdName is the name of ExtraPartitionId Column.
var ExtraPartitionIdName = NewCIStr("_tidb_pid") //nolint:revive
// Deprecated: Use ExtraPhysTblIdName instead.
// var ExtraPartitionIdName = NewCIStr("_tidb_pid") //nolint:revive

// ExtraPhysTblIdName is the name of ExtraPhysTblID Column.
var ExtraPhysTblIdName = NewCIStr("_tidb_tid") //nolint:revive
Expand Down Expand Up @@ -923,21 +922,6 @@ func NewExtraHandleColInfo() *ColumnInfo {
return colInfo
}

// NewExtraPartitionIDColInfo mocks a column info for extra partition id column.
func NewExtraPartitionIDColInfo() *ColumnInfo {
colInfo := &ColumnInfo{
ID: ExtraPidColID,
Name: ExtraPartitionIdName,
}
colInfo.SetType(mysql.TypeLonglong)
flen, decimal := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong)
colInfo.SetFlen(flen)
colInfo.SetDecimal(decimal)
colInfo.SetCharset(charset.CharsetBin)
colInfo.SetCollate(charset.CollationBin)
return colInfo
}

// NewExtraPhysTblIDColInfo mocks a column info for extra partition id column.
func NewExtraPhysTblIDColInfo() *ColumnInfo {
colInfo := &ColumnInfo{
Expand Down
38 changes: 17 additions & 21 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,15 +1423,6 @@ func (ds *DataSource) FindBestTask(prop *property.PhysicalProperty, planCounter
}
}
if canConvertPointGet {
// If the schema contains ExtraPidColID, do not convert to point get.
// Because the point get executor can not handle the extra partition ID column now.
// I.e. Global Index is used
for _, col := range ds.schema.Columns {
if col.ID == model.ExtraPidColID {
canConvertPointGet = false
break
}
}
if path != nil && path.Index != nil && path.Index.Global {
// Don't convert to point get during ddl
// TODO: Revisit truncate partition and global index
Expand Down Expand Up @@ -2144,6 +2135,15 @@ func (is *PhysicalIndexScan) initSchema(idxExprCols []*expression.Column, isDoub
}
}

var extraPhysTblCol *expression.Column
// If `dataSouceSchema` contains `model.ExtraPhysTblID`, we should add it into `indexScan.schema`
for _, col := range is.dataSourceSchema.Columns {
if col.ID == model.ExtraPhysTblID {
extraPhysTblCol = col.Clone().(*expression.Column)
break
}
}

if isDoubleRead || is.Index.Global {
// If it's double read case, the first index must return handle. So we should add extra handle column
// if there isn't a handle column.
Expand All @@ -2157,23 +2157,19 @@ func (is *PhysicalIndexScan) initSchema(idxExprCols []*expression.Column, isDoub
})
}
}
// If it's global index, handle and PidColID columns has to be added, so that needed pids can be filtered.
if is.Index.Global {
// If it's global index, handle and PhysTblID columns has to be added, so that needed pids can be filtered.
if is.Index.Global && extraPhysTblCol == nil {
indexCols = append(indexCols, &expression.Column{
RetType: types.NewFieldType(mysql.TypeLonglong),
ID: model.ExtraPidColID,
ID: model.ExtraPhysTblID,
UniqueID: is.SCtx().GetSessionVars().AllocPlanColumnID(),
OrigName: model.ExtraPartitionIdName.O,
OrigName: model.ExtraPhysTblIdName.O,
})
}
}

// If `dataSouceSchema` contains `model.ExtraPhysTblID`, we should add it into `indexScan.schema`
for _, col := range is.dataSourceSchema.Columns {
if col.ID == model.ExtraPhysTblID {
indexCols = append(indexCols, col.Clone().(*expression.Column))
break
}
if extraPhysTblCol != nil {
indexCols = append(indexCols, extraPhysTblCol)
}

is.SetSchema(expression.NewSchema(indexCols...))
Expand All @@ -2185,14 +2181,14 @@ func (is *PhysicalIndexScan) addSelectionConditionForGlobalIndex(p *DataSource,
}
args := make([]expression.Expression, 0, len(p.partitionNames)+1)
for _, col := range is.schema.Columns {
if col.ID == model.ExtraPidColID {
if col.ID == model.ExtraPhysTblID {
args = append(args, col.Clone())
break
}
}

if len(args) != 1 {
return nil, errors.Errorf("Can't find column %s in schema %s", model.ExtraPartitionIdName.O, is.schema)
return nil, errors.Errorf("Can't find column %s in schema %s", model.ExtraPhysTblIdName.O, is.schema)
}

// For SQL like 'select x from t partition(p0, p1) use index(idx)',
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3892,7 +3892,7 @@ func unfoldWildStar(field *ast.SelectField, outputName types.NameSlice, column [
}
if (dbName.L == "" || dbName.L == name.DBName.L) &&
(tblName.L == "" || tblName.L == name.TblName.L) &&
col.ID != model.ExtraHandleID && col.ID != model.ExtraPidColID && col.ID != model.ExtraPhysTblID {
col.ID != model.ExtraHandleID && col.ID != model.ExtraPhysTblID {
colName := &ast.ColumnNameExpr{
Name: &ast.ColumnName{
Schema: name.DBName,
Expand Down
2 changes: 0 additions & 2 deletions pkg/planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,6 @@ func (p *PhysicalIndexScan) ToPB(_ *base.BuildPBContext, _ kv.StoreType) (*tipb.
columns = append(columns, model.NewExtraHandleColInfo())
} else if col.ID == model.ExtraPhysTblID {
columns = append(columns, model.NewExtraPhysTblIDColInfo())
} else if col.ID == model.ExtraPidColID {
columns = append(columns, model.NewExtraPartitionIDColInfo())
} else {
columns = append(columns, FindColumnInfoByID(tableColumns, col.ID))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (p *LogicalUnionScan) PruneColumns(parentUsedCols []*expression.Column, opt
parentUsedCols = append(parentUsedCols, p.handleCols.GetCol(i))
}
for _, col := range p.Schema().Columns {
if col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID {
if col.ID == model.ExtraPhysTblID {
parentUsedCols = append(parentUsedCols, col)
}
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,15 +471,6 @@ func (*partitionProcessor) reconstructTableColNames(ds *DataSource) ([]*types.Fi
})
continue
}
if colExpr.ID == model.ExtraPidColID {
names = append(names, &types.FieldName{
DBName: ds.DBName,
TblName: ds.tableInfo.Name,
ColName: model.ExtraPartitionIdName,
OrigColName: model.ExtraPartitionIdName,
})
continue
}
if colExpr.ID == model.ExtraPhysTblID {
names = append(names, &types.FieldName{
DBName: ds.DBName,
Expand Down
11 changes: 3 additions & 8 deletions pkg/store/mockstore/unistore/cophandler/closure_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,6 @@ func (e *closureExecutor) initIdxScanCtx(idxScan *tipb.IndexScan) {
lastColumn = e.columnInfos[e.idxScanCtx.columnLen-1]
}

// Here it is required that ExtraPidColID
// is after all other columns except ExtraPhysTblID
if lastColumn.GetColumnId() == model.ExtraPidColID {
e.idxScanCtx.columnLen--
lastColumn = e.columnInfos[e.idxScanCtx.columnLen-1]
}

if len(e.idxScanCtx.primaryColumnIds) == 0 {
if lastColumn.GetPkHandle() {
if mysql.HasUnsignedFlag(uint(lastColumn.GetFlag())) {
Expand Down Expand Up @@ -932,7 +925,9 @@ func (e *closureExecutor) indexScanProcessCore(key, value []byte) error {
}
// Add ExtraPhysTblID if requested
// Assumes it is always last!
if e.columnInfos[len(e.columnInfos)-1].ColumnId == model.ExtraPhysTblID {
// If we need pid, it already filled by above loop. Because `DecodeIndexKV` func will return pid in `values`.
// The following if statement is to fill in the tid when we needed it.
if e.columnInfos[len(e.columnInfos)-1].ColumnId == model.ExtraPhysTblID && len(e.columnInfos) >= len(values) {
tblID := tablecodec.DecodeTableID(key)
chk.AppendInt64(len(e.columnInfos)-1, tblID)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/store/mockstore/unistore/cophandler/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ func (b *mppExecBuilder) buildIdxScan(pb *tipb.IndexScan) (*indexScanExec, error
*physTblIDColIdx = numIdxCols
lastCol = pb.Columns[numIdxCols-1]
}
if lastCol.GetColumnId() == model.ExtraPidColID {
numIdxCols--
lastCol = pb.Columns[numIdxCols-1]
}

hdlStatus := tablecodec.HandleDefault
if len(primaryColIds) == 0 {
Expand Down
5 changes: 4 additions & 1 deletion pkg/store/mockstore/unistore/cophandler/mpp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ func (e *indexScanExec) Process(key, value []byte) error {
}
}
}
if e.physTblIDColIdx != nil {

// If we need pid, it already filled by above loop. Because `DecodeIndexKV` func will return pid in `values`.
// The following if statement is to fill in the tid when we needed it.
if e.physTblIDColIdx != nil && *e.physTblIDColIdx >= len(values) {
tblID := tablecodec.DecodeTableID(key)
e.chk.AppendInt64(*e.physTblIDColIdx, tblID)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integrationtest/r/globalindex/aggregate.result
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ id estRows task access object operator info
HashAgg 1.00 root NULL funcs:count(Column#9)->Column#4, funcs:max(Column#10)->Column#5, funcs:min(Column#11)->Column#6
└─IndexReader 1.00 root partition:p0 index:HashAgg
└─HashAgg 1.00 cop[tikv] NULL funcs:count(1)->Column#9, funcs:max(globalindex__aggregate.p.id)->Column#10, funcs:min(globalindex__aggregate.p.id)->Column#11
└─Selection 10000.00 cop[tikv] NULL in(_tidb_pid, pid0)
└─Selection 10000.00 cop[tikv] NULL in(_tidb_tid, tid0)
└─IndexFullScan 10000.00 cop[tikv] table:p, index:idx(id) keep order:false, stats:pseudo
select count(*), max(id), min(id) from p partition(p0) use index(idx);
count(*) max(id) min(id)
Expand All @@ -41,7 +41,7 @@ explain format='brief' select avg(id), max(id), min(id) from p partition(p0) use
id estRows task access object operator info
HashAgg 8000.00 root NULL group by:globalindex__aggregate.p.c, funcs:avg(Column#9, Column#10)->Column#4, funcs:max(Column#11)->Column#5, funcs:min(Column#12)->Column#6
└─IndexLookUp 8000.00 root partition:p0 NULL
├─Selection(Build) 10000.00 cop[tikv] NULL in(_tidb_pid, pid0)
├─Selection(Build) 10000.00 cop[tikv] NULL in(_tidb_tid, tid0)
│ └─IndexFullScan 10000.00 cop[tikv] table:p, index:idx(id) keep order:false, stats:pseudo
└─HashAgg(Probe) 8000.00 cop[tikv] NULL group by:globalindex__aggregate.p.c, funcs:count(globalindex__aggregate.p.id)->Column#9, funcs:sum(globalindex__aggregate.p.id)->Column#10, funcs:max(globalindex__aggregate.p.id)->Column#11, funcs:min(globalindex__aggregate.p.id)->Column#12
└─TableRowIDScan 10000.00 cop[tikv] table:p keep order:false, stats:pseudo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ explain format='brief' select * from t partition(p0) use index(idx) where lower(
id estRows task access object operator info
Projection 3333.33 root NULL globalindex__expression_index.t.a, globalindex__expression_index.t.b
└─IndexLookUp 3333.33 root partition:p0 NULL
├─Selection(Build) 3333.33 cop[tikv] NULL in(_tidb_pid, pid0)
├─Selection(Build) 3333.33 cop[tikv] NULL in(_tidb_tid, tid0)
│ └─IndexRangeScan 3333.33 cop[tikv] table:t, index:idx(lower(`b`)) range:("c",+inf], keep order:false, stats:pseudo
└─TableRowIDScan(Probe) 3333.33 cop[tikv] table:t keep order:false, stats:pseudo
select * from t partition(p0) use index(idx) where lower(b) > 'c';
Expand Down
Loading