Skip to content

Commit

Permalink
Merge branch 'master' into s13-add-warning-to-slow-query-part1
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Dec 20, 2022
2 parents 81701ab + 4a72171 commit 284a7c7
Show file tree
Hide file tree
Showing 27 changed files with 433 additions and 132 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C
var genCols []genCol
for i, col := range cols {
if col.GeneratedExpr != nil {
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names)
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names, false)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6164,7 +6164,7 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
// For same reason, decide whether index is global here.
indexColumns, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications)
indexColumns, _, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -6274,7 +6274,7 @@ func BuildHiddenColumnInfo(ctx sessionctx.Context, indexPartSpecifications []*as
if err != nil {
return nil, errors.Trace(err)
}
expr, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, idxPart.Expr)
expr, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, idxPart.Expr, true)
if err != nil {
// TODO: refine the error message.
return nil, err
Expand Down Expand Up @@ -6389,7 +6389,7 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
// For same reason, decide whether index is global here.
indexColumns, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications)
indexColumns, _, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications)
if err != nil {
return errors.Trace(err)
}
Expand Down
38 changes: 34 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,20 @@ func cleanMDLInfo(pool *sessionPool, jobID int64, ec *clientv3.Client) {
}

// checkMDLInfo checks if metadata lock info exists. It means the schema is locked by some TiDBs if exists.
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, error) {
sql := fmt.Sprintf("select * from mysql.tidb_mdl_info where job_id = %d", jobID)
func checkMDLInfo(jobID int64, pool *sessionPool) (bool, int64, error) {
sql := fmt.Sprintf("select version from mysql.tidb_mdl_info where job_id = %d", jobID)
sctx, _ := pool.get()
defer pool.put(sctx)
sess := newSession(sctx)
rows, err := sess.execute(context.Background(), sql, "check-mdl-info")
if err != nil {
return false, err
return false, 0, err
}
return len(rows) > 0, nil
if len(rows) == 0 {
return false, 0, nil
}
ver := rows[0].GetInt64(0)
return true, ver, nil
}

func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
Expand Down Expand Up @@ -1377,6 +1381,32 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
zap.String("job", job.String()))
}

// waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64) error {
failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
} else {
mockDDLErrOnce = -1
}
}
})

timeStart := time.Now()
// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return err
}
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSynced handles the following situation:
// If the job enters a new state, and the worker crashs when it's in the process of waiting for 2 * lease time,
// Then the worker restarts quickly, we may run the job immediately again,
Expand Down
24 changes: 18 additions & 6 deletions ddl/generated_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,14 @@ func checkModifyGeneratedColumn(sctx sessionctx.Context, tbl table.Table, oldCol
}

type illegalFunctionChecker struct {
hasIllegalFunc bool
hasAggFunc bool
hasRowVal bool // hasRowVal checks whether the functional index refers to a row value
hasWindowFunc bool
hasNotGAFunc4ExprIdx bool
otherErr error
hasIllegalFunc bool
hasAggFunc bool
hasRowVal bool // hasRowVal checks whether the functional index refers to a row value
hasWindowFunc bool
hasNotGAFunc4ExprIdx bool
hasCastArrayFunc bool
disallowCastArrayFunc bool
otherErr error
}

func (c *illegalFunctionChecker) Enter(inNode ast.Node) (outNode ast.Node, skipChildren bool) {
Expand Down Expand Up @@ -308,7 +310,14 @@ func (c *illegalFunctionChecker) Enter(inNode ast.Node) (outNode ast.Node, skipC
case *ast.WindowFuncExpr:
c.hasWindowFunc = true
return inNode, true
case *ast.FuncCastExpr:
c.hasCastArrayFunc = c.hasCastArrayFunc || node.Tp.IsArray()
if c.disallowCastArrayFunc && node.Tp.IsArray() {
c.otherErr = expression.ErrNotSupportedYet.GenWithStackByArgs("Use of CAST( .. AS .. ARRAY) outside of functional index in CREATE(non-SELECT)/ALTER TABLE or in general expressions")
return inNode, true
}
}
c.disallowCastArrayFunc = true
return inNode, false
}

Expand Down Expand Up @@ -355,6 +364,9 @@ func checkIllegalFn4Generated(name string, genType int, expr ast.ExprNode) error
if genType == typeIndex && c.hasNotGAFunc4ExprIdx && !config.GetGlobalConfig().Experimental.AllowsExpressionIndex {
return dbterror.ErrUnsupportedExpressionIndex
}
if genType == typeColumn && c.hasCastArrayFunc {
return expression.ErrNotSupportedYet.GenWithStackByArgs("Use of CAST( .. AS .. ARRAY) outside of functional index in CREATE(non-SELECT)/ALTER TABLE or in general expressions")
}
return nil
}

Expand Down
21 changes: 12 additions & 9 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,28 @@ var (
telemetryAddIndexIngestUsage = metrics.TelemetryAddIndexIngestCnt
)

func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, error) {
func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, bool, error) {
// Build offsets.
idxParts := make([]*model.IndexColumn, 0, len(indexPartSpecifications))
var col *model.ColumnInfo
var mvIndex bool
maxIndexLength := config.GetGlobalConfig().MaxIndexLength
// The sum of length of all index columns.
sumLength := 0
for _, ip := range indexPartSpecifications {
col = model.FindColumnInfo(columns, ip.Column.Name.L)
if col == nil {
return nil, dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ip.Column.Name)
return nil, false, dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ip.Column.Name)
}

if err := checkIndexColumn(ctx, col, ip.Length); err != nil {
return nil, err
return nil, false, err
}
mvIndex = mvIndex || col.FieldType.IsArray()
indexColLen := ip.Length
indexColumnLength, err := getIndexColumnLength(col, ip.Length)
if err != nil {
return nil, err
return nil, false, err
}
sumLength += indexColumnLength

Expand All @@ -92,12 +94,12 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde
// The multiple column index and the unique index in which the length sum exceeds the maximum size
// will return an error instead produce a warning.
if ctx == nil || ctx.GetSessionVars().StrictSQLMode || mysql.HasUniKeyFlag(col.GetFlag()) || len(indexPartSpecifications) > 1 {
return nil, dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength)
return nil, false, dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength)
}
// truncate index length and produce warning message in non-restrict sql mode.
colLenPerUint, err := getIndexColumnLength(col, 1)
if err != nil {
return nil, err
return nil, false, err
}
indexColLen = maxIndexLength / colLenPerUint
// produce warning message
Expand All @@ -111,7 +113,7 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde
})
}

return idxParts, nil
return idxParts, mvIndex, nil
}

// CheckPKOnGeneratedColumn checks the specification of PK is valid.
Expand Down Expand Up @@ -154,7 +156,7 @@ func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumn
}

// JSON column cannot index.
if col.FieldType.GetType() == mysql.TypeJSON {
if col.FieldType.GetType() == mysql.TypeJSON && !col.FieldType.IsArray() {
if col.Hidden {
return dbterror.ErrFunctionalIndexOnJSONOrGeometryFunction
}
Expand Down Expand Up @@ -263,7 +265,7 @@ func BuildIndexInfo(
return nil, errors.Trace(err)
}

idxColumns, err := buildIndexColumns(ctx, allTableColumns, indexPartSpecifications)
idxColumns, mvIndex, err := buildIndexColumns(ctx, allTableColumns, indexPartSpecifications)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -276,6 +278,7 @@ func BuildIndexInfo(
Primary: isPrimary,
Unique: isUnique,
Global: isGlobal,
MVIndex: mvIndex,
}

if indexOption != nil {
Expand Down
6 changes: 2 additions & 4 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if variable.EnableMDL.Load() {
exist, err := checkMDLInfo(job.ID, d.sessPool)
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
logutil.BgLogger().Warn("[ddl] check MDL info failed", zap.Error(err), zap.String("job", job.String()))
// Release the worker resource.
Expand All @@ -246,10 +246,8 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
} else if exist {
// Release the worker resource.
pool.put(wk)
err = waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
err = waitSchemaSyncedForMDL(d.ddlCtx, job, version)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
time.Sleep(time.Second)
return
}
d.once.Store(false)
Expand Down
2 changes: 1 addition & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,7 @@ func checkPartitionFuncType(ctx sessionctx.Context, expr ast.ExprNode, tblInfo *
return nil
}

e, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, expr)
e, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, expr, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/design/2020-08-04-global-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ In TiDB, operators in the partitioned table will be translated to UnionAll in th

## Compatibility

MySQL does not support global index, which means this feature may cause some compatibility issues. We add an option `enable_global_index` in `config.Config` to control it. The default value of this option is `false`, so TiDB will keep consistent with MySQL, unless the user open global index feature manually.
MySQL does not support global index, which means this feature may cause some compatibility issues. We add an option `enable-global-index` in `config.Config` to control it. The default value of this option is `false`, so TiDB will keep consistent with MySQL, unless the user open global index feature manually.

## Implementation

Expand Down
85 changes: 38 additions & 47 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3511,17 +3511,39 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta
return nextRange, nil
}

func keyColumnsIncludeAllPartitionColumns(keyColumns []int, pe *tables.PartitionExpr) bool {
tmp := make(map[int]struct{}, len(keyColumns))
for _, offset := range keyColumns {
tmp[offset] = struct{}{}
func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []int {
keyColOffsets := make([]int, len(keyColIDs))
for i, colID := range keyColIDs {
offset := -1
for j, col := range pt.Cols() {
if colID == col.ID {
offset = j
break
}
}
if offset == -1 {
return nil
}
keyColOffsets[i] = offset
}

pe, err := pt.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil
}

offsetMap := make(map[int]struct{})
for _, offset := range keyColOffsets {
offsetMap[offset] = struct{}{}
}
for _, offset := range pe.ColumnOffset {
if _, ok := tmp[offset]; !ok {
return false
if _, ok := offsetMap[offset]; !ok {
return nil
}
}
return true
return keyColOffsets
}

func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table, schema *expression.Schema, partitionInfo *plannercore.PartitionInfo,
Expand All @@ -3536,45 +3558,16 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table
return nil, false, nil, err
}

// check whether can runtime prune.
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
}
pe, err := tbl.(partitionExpr).PartitionExpr()
if err != nil {
return nil, false, nil, err
}

// recalculate key column offsets
if len(lookUpContent) == 0 {
return nil, false, nil, nil
}
if lookUpContent[0].keyColIDs == nil {
return nil, false, nil, plannercore.ErrInternal.GenWithStack("cannot get column IDs when dynamic pruning")
}
keyColOffsets := make([]int, len(lookUpContent[0].keyColIDs))
for i, colID := range lookUpContent[0].keyColIDs {
offset := -1
for j, col := range partitionTbl.Cols() {
if colID == col.ID {
offset = j
break
}
}
if offset == -1 {
return nil, false, nil, plannercore.ErrInternal.GenWithStack("invalid column offset when dynamic pruning")
}
keyColOffsets[i] = offset
}

offsetMap := make(map[int]bool)
for _, offset := range keyColOffsets {
offsetMap[offset] = true
}
for _, offset := range pe.ColumnOffset {
if _, ok := offsetMap[offset]; !ok {
return condPruneResult, false, nil, nil
}
keyColOffsets := getPartitionKeyColOffsets(lookUpContent[0].keyColIDs, partitionTbl)
if len(keyColOffsets) == 0 {
return condPruneResult, false, nil, nil
}

locateKey := make([]types.Datum, len(partitionTbl.Cols()))
Expand Down Expand Up @@ -4149,12 +4142,6 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
partitionInfo := &v.PartitionInfo
usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
Expand All @@ -4165,8 +4152,12 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
usedPartitions[p.GetPhysicalID()] = p
}
var kvRanges []kv.KeyRange
var keyColOffsets []int
if len(lookUpContents) > 0 {
keyColOffsets = getPartitionKeyColOffsets(lookUpContents[0].keyColIDs, pt)
}
if v.IsCommonHandle {
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
// lookUpContentsByPID groups lookUpContents by pid(partition) so that kv ranges for same partition can be merged.
Expand Down Expand Up @@ -4212,7 +4203,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte

handles, lookUpContents := dedupHandles(lookUpContents)

if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
for _, content := range lookUpContents {
Expand Down
Loading

0 comments on commit 284a7c7

Please sign in to comment.