From f97a85ea7a214833fb63a5983543f6088ccfef27 Mon Sep 17 00:00:00 2001 From: hanfei Date: Thu, 22 Sep 2016 11:51:32 +0800 Subject: [PATCH 1/8] tmp commit --- kv/kv.go | 1 + plan/expr_to_pb.go | 153 ++++++++---------- plan/expression_rewriter.go | 6 +- plan/match_property.go | 17 +- plan/optimizer.go | 2 +- plan/physical_plan_builder.go | 244 ++++++++++++++++++++--------- plan/physical_plans.go | 75 ++++++--- plan/plan.go | 4 +- plan/plan_test.go | 15 +- plan/planbuilder.go | 2 +- plan/push_limit.go | 287 ---------------------------------- 11 files changed, 315 insertions(+), 491 deletions(-) delete mode 100644 plan/push_limit.go diff --git a/kv/kv.go b/kv/kv.go index 2f298028a4b6e..43a1dba42892a 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -106,6 +106,7 @@ const ( ReqSubTypeBasic = 0 ReqSubTypeDesc = 10000 ReqSubTypeGroupBy = 10001 + ReqSubTypeTopN = 10002 ) // KeyRange represents a range where StartKey <= key < EndKey. diff --git a/plan/expr_to_pb.go b/plan/expr_to_pb.go index 042f9125a04db..cbac53b713b27 100644 --- a/plan/expr_to_pb.go +++ b/plan/expr_to_pb.go @@ -14,7 +14,7 @@ package plan import ( - "github.com/juju/errors" + "github.com/ngaut/log" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -24,12 +24,9 @@ import ( "github.com/pingcap/tipb/go-tipb" ) -func expressionsToPB(exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, remained []expression.Expression, err error) { +func expressionsToPB(exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, remained []expression.Expression) { for _, expr := range exprs { - v, err := exprToPB(client, expr) - if err != nil { - return nil, nil, errors.Trace(err) - } + v := exprToPB(client, expr) if v == nil { remained = append(remained, expr) continue @@ -46,7 +43,7 @@ func expressionsToPB(exprs []expression.Expression, client kv.Client) (pbExpr *t return } -func exprToPB(client kv.Client, expr expression.Expression) (*tipb.Expr, error) { +func exprToPB(client kv.Client, expr expression.Expression) *tipb.Expr { switch x := expr.(type) { case *expression.Constant: return datumToPBExpr(client, x.Value) @@ -55,10 +52,10 @@ func exprToPB(client kv.Client, expr expression.Expression) (*tipb.Expr, error) case *expression.ScalarFunction: return scalarFuncToPBExpr(client, x) } - return nil, nil + return nil } -func datumToPBExpr(client kv.Client, d types.Datum) (*tipb.Expr, error) { +func datumToPBExpr(client kv.Client, d types.Datum) *tipb.Expr { var tp tipb.ExprType var val []byte switch d.Kind() { @@ -89,43 +86,43 @@ func datumToPBExpr(client kv.Client, d types.Datum) (*tipb.Expr, error) { tp = tipb.ExprType_MysqlDecimal val = codec.EncodeDecimal(nil, d) default: - return nil, nil + return nil } if !client.SupportRequestType(kv.ReqTypeSelect, int64(tp)) { - return nil, nil + return nil } - return &tipb.Expr{Tp: tp, Val: val}, nil + return &tipb.Expr{Tp: tp, Val: val} } -func columnToPBExpr(client kv.Client, column *expression.Column) (*tipb.Expr, error) { +func columnToPBExpr(client kv.Client, column *expression.Column) *tipb.Expr { if !client.SupportRequestType(kv.ReqTypeSelect, int64(tipb.ExprType_ColumnRef)) { - return nil, nil + return nil } switch column.GetType().Tp { case mysql.TypeBit, mysql.TypeSet, mysql.TypeEnum, mysql.TypeGeometry, mysql.TypeDecimal: - return nil, nil + return nil } if column.Correlated { - return nil, nil + return nil } id := column.ID // Zero Column ID is not a column from table, can not support for now. if id == 0 { - return nil, nil + return nil } // its value is available to use. if id == -1 { - return nil, nil + return nil } return &tipb.Expr{ Tp: tipb.ExprType_ColumnRef, - Val: codec.EncodeInt(nil, id)}, nil + Val: codec.EncodeInt(nil, id)} } -func scalarFuncToPBExpr(client kv.Client, expr *expression.ScalarFunction) (*tipb.Expr, error) { +func scalarFuncToPBExpr(client kv.Client, expr *expression.ScalarFunction) *tipb.Expr { var tp tipb.ExprType switch expr.FuncName.L { case ast.LT: @@ -154,94 +151,79 @@ func scalarFuncToPBExpr(client kv.Client, expr *expression.ScalarFunction) (*tip // Only patterns like 'abc', '%abc', 'abc%', '%abc%' can be converted to *tipb.Expr for now. escape := expr.Args[2].(*expression.Constant).Value if escape.IsNull() || byte(escape.GetInt64()) != '\\' { - return nil, nil + return nil } pattern, ok := expr.Args[1].(*expression.Constant) if !ok || pattern.Value.Kind() != types.KindString { - return nil, nil + return nil } for i, b := range pattern.Value.GetString() { switch b { case '\\', '_': - return nil, nil + return nil case '%': if i != 0 && i != len(pattern.Value.GetString())-1 { - return nil, nil + return nil } } } tp = tipb.ExprType_Like default: - return nil, nil + return nil } if !client.SupportRequestType(kv.ReqTypeSelect, int64(tp)) { - return nil, nil + return nil } - expr0, err := exprToPB(client, expr.Args[0]) - if err != nil { - return nil, errors.Trace(err) - } + expr0 := exprToPB(client, expr.Args[0]) if expr0 == nil { - return nil, nil - } - expr1, err := exprToPB(client, expr.Args[1]) - if err != nil { - return nil, errors.Trace(err) + return nil } + expr1 := exprToPB(client, expr.Args[1]) if expr1 == nil { - return nil, nil + return nil } return &tipb.Expr{ Tp: tp, - Children: []*tipb.Expr{expr0, expr1}}, nil + Children: []*tipb.Expr{expr0, expr1}} } -func inToPBExpr(client kv.Client, expr *expression.ScalarFunction) (*tipb.Expr, error) { +func inToPBExpr(client kv.Client, expr *expression.ScalarFunction) *tipb.Expr { if !client.SupportRequestType(kv.ReqTypeSelect, int64(tipb.ExprType_In)) { - return nil, nil + return nil } - pbExpr, err := exprToPB(client, expr.Args[0]) - if err != nil { - return nil, errors.Trace(err) - } + pbExpr := exprToPB(client, expr.Args[0]) if pbExpr == nil { - return nil, nil - } - listExpr, err := constListToPB(client, expr.Args[1:]) - if err != nil { - return nil, errors.Trace(err) + return nil } + listExpr := constListToPB(client, expr.Args[1:]) if listExpr == nil { - return nil, nil + return nil } return &tipb.Expr{ Tp: tipb.ExprType_In, - Children: []*tipb.Expr{pbExpr, listExpr}}, nil + Children: []*tipb.Expr{pbExpr, listExpr}} } -func notToPBExpr(client kv.Client, expr *expression.ScalarFunction) (*tipb.Expr, error) { +func notToPBExpr(client kv.Client, expr *expression.ScalarFunction) *tipb.Expr { if !client.SupportRequestType(kv.ReqTypeSelect, int64(tipb.ExprType_Not)) { - return nil, nil + return nil } - child, err := exprToPB(client, expr.Args[0]) - if err != nil { - return nil, errors.Trace(err) - } + child := exprToPB(client, expr.Args[0]) if child == nil { - return nil, nil + return nil } return &tipb.Expr{ Tp: tipb.ExprType_Not, - Children: []*tipb.Expr{child}}, nil + Children: []*tipb.Expr{child}} } -func constListToPB(client kv.Client, list []expression.Expression) (*tipb.Expr, error) { +func constListToPB(client kv.Client, list []expression.Expression) *tipb.Expr { if !client.SupportRequestType(kv.ReqTypeSelect, int64(tipb.ExprType_ValueList)) { - return nil, nil + return nil } // Only list of *expression.Constant can be push down. @@ -249,19 +231,18 @@ func constListToPB(client kv.Client, list []expression.Expression) (*tipb.Expr, for _, expr := range list { v, ok := expr.(*expression.Constant) if !ok { - return nil, nil + return nil } - if d, err := datumToPBExpr(client, v.Value); err != nil { - return nil, errors.Trace(err) - } else if d == nil { - return nil, nil + d := datumToPBExpr(client, v.Value) + if d == nil { + return nil } datums = append(datums, v.Value) } return datumsToValueList(datums) } -func datumsToValueList(datums []types.Datum) (*tipb.Expr, error) { +func datumsToValueList(datums []types.Datum) *tipb.Expr { // Don't push value list that has different datum kind. prevKind := types.KindNull for _, d := range datums { @@ -269,32 +250,39 @@ func datumsToValueList(datums []types.Datum) (*tipb.Expr, error) { prevKind = d.Kind() } if !d.IsNull() && d.Kind() != prevKind { - return nil, nil + return nil } } err := types.SortDatums(datums) if err != nil { - return nil, errors.Trace(err) + log.Error(err.Error()) + return nil } val, err := codec.EncodeValue(nil, datums...) if err != nil { - return nil, errors.Trace(err) + log.Error(err.Error()) + return nil } - return &tipb.Expr{Tp: tipb.ExprType_ValueList, Val: val}, nil + return &tipb.Expr{Tp: tipb.ExprType_ValueList, Val: val} } -func groupByItemToPB(client kv.Client, expr expression.Expression) (*tipb.ByItem, error) { - e, err := exprToPB(client, expr) - if err != nil { - return nil, errors.Trace(err) +func groupByItemToPB(client kv.Client, expr expression.Expression) *tipb.ByItem { + e := exprToPB(client, expr) + if e == nil { + return nil } + return &tipb.ByItem{Expr: e} +} + +func sortByItemToPB(client kv.Client, expr expression.Expression, desc bool) *tipb.ByItem { + e := exprToPB(client, expr) if e == nil { - return nil, nil + return nil } - return &tipb.ByItem{Expr: e}, nil + return &tipb.ByItem{Expr: e, Desc: desc} } -func aggFuncToPBExpr(client kv.Client, aggFunc expression.AggregationFunction) (*tipb.Expr, error) { +func aggFuncToPBExpr(client kv.Client, aggFunc expression.AggregationFunction) *tipb.Expr { var tp tipb.ExprType switch aggFunc.GetName() { case ast.AggFuncCount: @@ -313,19 +301,16 @@ func aggFuncToPBExpr(client kv.Client, aggFunc expression.AggregationFunction) ( tp = tipb.ExprType_Avg } if !client.SupportRequestType(kv.ReqTypeSelect, int64(tp)) { - return nil, nil + return nil } children := make([]*tipb.Expr, 0, len(aggFunc.GetArgs())) for _, arg := range aggFunc.GetArgs() { - pbArg, err := exprToPB(client, arg) - if err != nil { - return nil, errors.Trace(err) - } + pbArg := exprToPB(client, arg) if pbArg == nil { - return nil, nil + return nil } children = append(children, pbArg) } - return &tipb.Expr{Tp: tp, Children: children}, nil + return &tipb.Expr{Tp: tp, Children: children} } diff --git a/plan/expression_rewriter.go b/plan/expression_rewriter.go index b6ee95a39ca4e..e43cbef51f134 100644 --- a/plan/expression_rewriter.go +++ b/plan/expression_rewriter.go @@ -257,8 +257,7 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as er.err = errors.Trace(err) return v, true } - phyPlan := info.p.PushLimit(nil) - d, err := EvalSubquery(phyPlan, er.b.is, er.b.ctx) + d, err := EvalSubquery(info.p, er.b.is, er.b.ctx) if err != nil { er.err = errors.Trace(err) return v, true @@ -376,8 +375,7 @@ func (er *expressionRewriter) handleScalarSubquery(v *ast.SubqueryExpr) (ast.Nod er.err = errors.Trace(err) return v, true } - phyPlan := info.p.PushLimit(nil) - d, err := EvalSubquery(phyPlan, er.b.is, er.b.ctx) + d, err := EvalSubquery(info.p, er.b.is, er.b.ctx) if err != nil { er.err = errors.Trace(err) return v, true diff --git a/plan/match_property.go b/plan/match_property.go index 6e806240a6ab1..2896a5435acbd 100644 --- a/plan/match_property.go +++ b/plan/match_property.go @@ -184,16 +184,15 @@ func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*phys // matchProperty implements PhysicalPlan matchProperty interface. func (p *PhysicalUnionScan) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - if childPlanInfo[0].p == nil { - res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(prop, childPlanInfo...) - np := *p - np.SetChildren(res.p) - res.p = &np - return res - } + limit := prop.limit + res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(removeLimit(prop, false), childPlanInfo...) np := *p - np.SetChildren(childPlanInfo[0].p) - return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost} + np.SetChildren(res.p) + res.p = &np + if limit != nil { + res = addPlanToResponse(limit, res) + } + return res } // matchProperty implements PhysicalPlan matchProperty interface. diff --git a/plan/optimizer.go b/plan/optimizer.go index 1a5535e597f0e..fe73f80c1ee78 100644 --- a/plan/optimizer.go +++ b/plan/optimizer.go @@ -53,7 +53,7 @@ func Optimize(ctx context.Context, node ast.Node, is infoschema.InfoSchema) (Pla if err != nil { return nil, errors.Trace(err) } - p = info.p.PushLimit(nil) + p = info.p log.Debugf("[PLAN] %s", ToString(p)) return p, nil } diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 53d9ca47d2a14..1cc8e3f4f03fe 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -80,18 +80,19 @@ func getRowCountByIndexRange(table *statistics.Table, indexRange *IndexRange, in func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, error) { table := p.Table + client := p.ctx.GetClient() var resultPlan PhysicalPlan ts := &PhysicalTableScan{ - ctx: p.ctx, - Table: p.Table, - Columns: p.Columns, - TableAsName: p.TableAsName, - DBName: p.DBName, + ctx: p.ctx, + Table: p.Table, + Columns: p.Columns, + TableAsName: p.TableAsName, + DBName: p.DBName, + physicalTableSource: physicalTableSource{client: client}, } ts.SetSchema(p.GetSchema()) resultPlan = ts var oldConditions []expression.Expression - var err error if sel, ok := p.GetParentByIndex(0).(*Selection); ok { newSel := *sel conds := make([]expression.Expression, 0, len(sel.Conditions)) @@ -100,7 +101,6 @@ func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, conds = append(conds, cond.DeepCopy()) } ts.AccessCondition, newSel.Conditions = detachTableScanConditions(conds, table) - client := p.ctx.GetClient() if client != nil { var memDB bool switch p.DBName.L { @@ -108,13 +108,10 @@ func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, memDB = true } if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) { - ts.ConditionPBExpr, newSel.Conditions, err = expressionsToPB(newSel.Conditions, client) - } - if err != nil { - return nil, errors.Trace(err) + ts.ConditionPBExpr, newSel.Conditions = expressionsToPB(newSel.Conditions, client) } } - err = buildTableRange(ts) + err := buildTableRange(ts) if err != nil { return nil, errors.Trace(err) } @@ -180,20 +177,21 @@ func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, func (p *DataSource) handleIndexScan(prop *requiredProperty, index *model.IndexInfo) (*physicalPlanInfo, error) { statsTbl := p.statisticTable var resultPlan PhysicalPlan + client := p.ctx.GetClient() is := &PhysicalIndexScan{ - ctx: p.ctx, - Index: index, - Table: p.Table, - Columns: p.Columns, - TableAsName: p.TableAsName, - OutOfOrder: true, - DBName: p.DBName, + ctx: p.ctx, + Index: index, + Table: p.Table, + Columns: p.Columns, + TableAsName: p.TableAsName, + OutOfOrder: true, + DBName: p.DBName, + physicalTableSource: physicalTableSource{client: client}, } is.SetSchema(p.schema) rowCount := uint64(statsTbl.Count) resultPlan = is var oldConditions []expression.Expression - var err error if sel, ok := p.GetParentByIndex(0).(*Selection); ok { rowCount = 0 newSel := *sel @@ -204,7 +202,6 @@ func (p *DataSource) handleIndexScan(prop *requiredProperty, index *model.IndexI } oldConditions = sel.Conditions is.AccessCondition, newSel.Conditions = detachIndexScanConditions(conds, is) - client := p.ctx.GetClient() if client != nil { var memDB bool switch p.DBName.L { @@ -212,13 +209,10 @@ func (p *DataSource) handleIndexScan(prop *requiredProperty, index *model.IndexI memDB = true } if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) { - is.ConditionPBExpr, newSel.Conditions, err = expressionsToPB(newSel.Conditions, client) - } - if err != nil { - return nil, errors.Trace(err) + is.ConditionPBExpr, newSel.Conditions = expressionsToPB(newSel.Conditions, client) } } - err = buildIndexRange(is) + err := buildIndexRange(is) if err != nil { return nil, errors.Trace(err) } @@ -323,6 +317,47 @@ func addPlanToResponse(p PhysicalPlan, planInfo *physicalPlanInfo) *physicalPlan return &physicalPlanInfo{p: np, cost: planInfo.cost, count: planInfo.count} } +func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPlanInfo { + if len(prop.props) != 0 { + items := make([]*ByItems, 0, len(prop.props)) + for _, col := range prop.props { + items = append(items, &ByItems{Expr: col.col, Desc: col.desc}) + } + sort := &Sort{ + ByItems: items, + ExecLimit: prop.limit, + } + info = addPlanToResponse(sort, info) + count := info.count + if prop.limit != nil { + count = prop.limit.Offset + prop.limit.Count + } + info.cost += float64(info.count)*cpuFactor + float64(count)*memoryFactor + } else if prop.limit != nil { + info = addPlanToResponse(prop.limit, info) + } + if prop.limit != nil && prop.limit.Count < info.count { + info.count = prop.limit.Count + } + return info +} + +func removeLimit(prop *requiredProperty, removeAll bool) *requiredProperty { + ret := &requiredProperty{ + props: prop.props, + sortKeyLen: prop.sortKeyLen, + } + if removeAll { + return ret + } + if prop.limit != nil { + ret.limit = &Limit{ + Count: prop.limit.Offset + prop.limit.Count, + } + } + return ret +} + // convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. func (p *Limit) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, error) { info, err := p.getPlanInfo(prop) @@ -332,15 +367,11 @@ func (p *Limit) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, if info != nil { return info, nil } - info, err = p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(prop) + info, err = p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(&requiredProperty{limit: &Limit{Offset: p.Offset, Count: p.Count}}) if err != nil { return nil, errors.Trace(err) } - count := p.Offset + p.Count - info = addPlanToResponse(p, info) - if count < info.count { - info.count = count - } + info = enforceProperty(prop, info) p.storePlanInfo(prop, info) return info, nil } @@ -372,21 +403,24 @@ func (p *Join) handleLeftJoin(prop *requiredProperty, innerJoin bool) (*physical } else { join.JoinType = LeftOuterJoin } + lProp := prop if !allLeft { - prop = &requiredProperty{} + lProp = &requiredProperty{} } - lInfo, err := lChild.convert2PhysicalPlan(prop) + lInfo, err := lChild.convert2PhysicalPlan(removeLimit(lProp, innerJoin)) if err != nil { return nil, errors.Trace(err) } - if !allLeft { - lInfo.cost = math.MaxFloat64 - } rInfo, err := rChild.convert2PhysicalPlan(&requiredProperty{}) if err != nil { return nil, errors.Trace(err) } resultInfo := join.matchProperty(prop, lInfo, rInfo) + if !allLeft { + resultInfo = enforceProperty(prop, resultInfo) + } else if prop.limit != nil { + resultInfo = addPlanToResponse(prop.limit, resultInfo) + } return resultInfo, nil } @@ -420,11 +454,20 @@ func (p *Join) handleRightJoin(prop *requiredProperty, innerJoin bool) (*physica if err != nil { return nil, errors.Trace(err) } - rInfo, err := rChild.convert2PhysicalPlan(prop) + rProp := prop + if !allRight { + rProp = &requiredProperty{} + } + rInfo, err := rChild.convert2PhysicalPlan(removeLimit(rProp, innerJoin)) if err != nil { return nil, errors.Trace(err) } resultInfo := join.matchProperty(prop, lInfo, rInfo) + if !allRight { + resultInfo = enforceProperty(prop, resultInfo) + } else if prop.limit != nil { + resultInfo = addPlanToResponse(prop.limit, resultInfo) + } return resultInfo, nil } @@ -456,10 +499,14 @@ func (p *Join) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, Anti: p.anti, } join.SetSchema(p.schema) + lProp := prop if !allLeft { - prop = &requiredProperty{} + lProp = &requiredProperty{} + } + if p.JoinType == SemiJoin { + lProp = removeLimit(lProp, true) } - lInfo, err := lChild.convert2PhysicalPlan(prop) + lInfo, err := lChild.convert2PhysicalPlan(lProp) if err != nil { return nil, errors.Trace(err) } @@ -474,7 +521,9 @@ func (p *Join) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, resultInfo.count = lInfo.count } if !allLeft { - resultInfo.cost = math.MaxFloat64 + resultInfo = enforceProperty(prop, resultInfo) + } else if prop.limit != nil && p.JoinType == SemiJoin { + resultInfo = addPlanToResponse(prop.limit, resultInfo) } p.storePlanInfo(prop, resultInfo) return resultInfo, nil @@ -564,7 +613,7 @@ func (p *Aggregation) handleStreamAgg(prop *requiredProperty) (*physicalPlanInfo return info, nil } -func (p *Aggregation) handleFinalAgg(x physicalDistSQLPlan, childInfo *physicalPlanInfo) (*physicalPlanInfo, error) { +func (p *Aggregation) handleFinalAgg(x physicalDistSQLPlan, childInfo *physicalPlanInfo) *physicalPlanInfo { agg := &PhysicalAggregation{ AggType: FinalAgg, AggFuncs: p.AggFuncs, @@ -572,19 +621,16 @@ func (p *Aggregation) handleFinalAgg(x physicalDistSQLPlan, childInfo *physicalP } agg.SetSchema(p.schema) agg.HasGby = len(p.GroupByItems) > 0 - schema, err := x.addAggregation(agg, p.ctx) - if err != nil { - return nil, errors.Trace(err) - } + schema := x.addAggregation(agg) if len(schema) == 0 { - return nil, nil + return nil } x.(PhysicalPlan).SetSchema(schema) info := addPlanToResponse(agg, childInfo) info.count = uint64(float64(info.count) * aggFactor) // if we build a final agg, it must be the best plan. info.cost = 0 - return info, nil + return info } func (p *Aggregation) handleHashAgg() (*physicalPlanInfo, error) { @@ -601,10 +647,7 @@ func (p *Aggregation) handleHashAgg() (*physicalPlanInfo, error) { } if !distinct { if x, ok := childInfo.p.(physicalDistSQLPlan); ok { - info, err := p.handleFinalAgg(x, childInfo) - if err != nil { - return nil, errors.Trace(err) - } + info := p.handleFinalAgg(x, childInfo) if info != nil { return info, nil } @@ -636,10 +679,12 @@ func (p *Aggregation) convert2PhysicalPlan(prop *requiredProperty) (*physicalPla if err != nil { return nil, errors.Trace(err) } - if len(prop.props) > 0 { - planInfo.cost = math.MaxFloat64 + enforceProperty(prop, planInfo) + limit := prop.limit + streamInfo, err := p.handleStreamAgg(removeLimit(prop, true)) + if limit != nil { + streamInfo = addPlanToResponse(limit, streamInfo) } - streamInfo, err := p.handleStreamAgg(prop) if streamInfo.cost < planInfo.cost { planInfo = streamInfo } @@ -656,13 +701,12 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, if info != nil { return info, nil } + limit := prop.limit childInfos := make([]*physicalPlanInfo, 0, len(p.children)) var count uint64 for _, child := range p.GetChildren() { - newProp := &requiredProperty{ - props: make([]*columnProp, 0, len(prop.props)), - sortKeyLen: prop.sortKeyLen, - } + newProp := removeLimit(prop, false) + newProp.props = make([]*columnProp, 0, len(prop.props)) for _, c := range prop.props { idx := p.GetSchema().GetIndex(c.col) newProp.props = append(newProp.props, &columnProp{col: child.GetSchema()[idx], desc: c.desc}) @@ -675,6 +719,9 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, childInfos = append(childInfos, info) } info = p.matchProperty(prop, childInfos...) + if limit != nil { + info = addPlanToResponse(limit, info) + } info.count = count p.storePlanInfo(prop, info) return info, nil @@ -682,7 +729,6 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, // convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. func (p *Selection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, error) { - var err error info, err := p.getPlanInfo(prop) if err != nil { return nil, errors.Trace(err) @@ -690,18 +736,59 @@ func (p *Selection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanI if info != nil { return info, nil } - info, err = p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(prop) + // Firstly, we try to push order + info, err = p.handlePushOrder(prop) if err != nil { return nil, errors.Trace(err) } - if _, ok := p.GetChildByIndex(0).(*DataSource); ok { - return info, nil + infoEnforce, err := p.handlePushNothing(prop) + if err != nil { + return nil, errors.Trace(err) + } + if infoEnforce.cost < info.cost { + info = infoEnforce + } + if _, ok := p.GetChildByIndex(0).(*DataSource); !ok { + info = p.matchProperty(prop, info) } - info = p.matchProperty(prop, info) p.storePlanInfo(prop, info) return info, nil } +func (p *Selection) handlePushOrder(prop *requiredProperty) (*physicalPlanInfo, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + limit := prop.limit + info, err := child.convert2PhysicalPlan(removeLimit(prop, true)) + if err != nil { + return nil, errors.Trace(err) + } + if limit != nil { + if np, ok := info.p.(physicalDistSQLPlan); ok { + np.addLimit(limit) + info.count = limit.Count + } else { + info = addPlanToResponse(limit, info) + } + } + return info, nil +} + +func (p *Selection) handlePushNothing(prop *requiredProperty) (*physicalPlanInfo, error) { + child := p.GetChildByIndex(0).(LogicalPlan) + info, err := child.convert2PhysicalPlan(&requiredProperty{}) + if err != nil { + return nil, errors.Trace(err) + } + // TODO: Because we haven't support DAG push down, we can only push down topn by this ugly way. + if prop.limit != nil && len(prop.props) > 0 { + if np, ok := info.p.(physicalDistSQLPlan); ok { + np.addTopN(prop) + } + info = enforceProperty(prop, info) + } + return info, nil +} + // convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. func (p *Projection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, error) { info, err := p.getPlanInfo(prop) @@ -713,7 +800,10 @@ func (p *Projection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlan } newProp := &requiredProperty{ props: make([]*columnProp, 0, len(prop.props)), - sortKeyLen: prop.sortKeyLen} + sortKeyLen: prop.sortKeyLen, + limit: prop.limit} + if len(prop.props) > 0 { + } childSchema := p.GetChildByIndex(0).GetSchema() usedCols := make([]bool, len(childSchema)) canPassSort := true @@ -792,6 +882,9 @@ func (p *Sort) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } } selfProp.sortKeyLen = len(selfProp.props) + if len(prop.props) == 0 && prop.limit != nil { + selfProp.limit = prop.limit + } sortedPlanInfo, err := p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(selfProp) if err != nil { return nil, errors.Trace(err) @@ -802,11 +895,12 @@ func (p *Sort) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } cnt := float64(unSortedPlanInfo.count) sortCost := cnt*math.Log2(cnt)*cpuFactor + memoryFactor*cnt - if len(selfProp.props) == 0 { - sortedPlanInfo = addPlanToResponse(p, unSortedPlanInfo) - } else if sortCost+unSortedPlanInfo.cost < sortedPlanInfo.cost { + if sortCost+unSortedPlanInfo.cost < sortedPlanInfo.cost { sortedPlanInfo.cost = sortCost + unSortedPlanInfo.cost - sortedPlanInfo = addPlanToResponse(p, unSortedPlanInfo) + np := *p + np.ExecLimit = selfProp.limit + sortedPlanInfo = addPlanToResponse(&np, unSortedPlanInfo) + } if !matchProp(prop, selfProp) { sortedPlanInfo.cost = math.MaxFloat64 @@ -835,11 +929,15 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, InnerPlan: innerInfo.p, } np.SetSchema(p.GetSchema()) - info, err = child.convert2PhysicalPlan(prop) + limit := prop.limit + info, err = child.convert2PhysicalPlan(removeLimit(prop, true)) if err != nil { return nil, errors.Trace(err) } info = addPlanToResponse(np, info) + if limit != nil { + info = addPlanToResponse(limit, info) + } p.storePlanInfo(prop, info) return info, nil } @@ -855,12 +953,16 @@ func (p *Distinct) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanIn return info, nil } child := p.GetChildByIndex(0).(LogicalPlan) - info, err = child.convert2PhysicalPlan(prop) + limit := prop.limit + info, err = child.convert2PhysicalPlan(removeLimit(prop, true)) if err != nil { return nil, errors.Trace(err) } info = addPlanToResponse(p, info) info.count = uint64(float64(info.count) * distinctFactor) + if limit != nil { + info = addPlanToResponse(limit, info) + } p.storePlanInfo(prop, info) return info, nil } diff --git a/plan/physical_plans.go b/plan/physical_plans.go index e7c4b7e6fac50..3d0b7b7fbc70a 100644 --- a/plan/physical_plans.go +++ b/plan/physical_plans.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/util/charset" @@ -29,6 +30,11 @@ import ( "github.com/pingcap/tipb/go-tipb" ) +var ( + _ physicalDistSQLPlan = &PhysicalTableScan{} + _ physicalDistSQLPlan = &PhysicalIndexScan{} +) + // PhysicalIndexScan represents an index scan plan. type PhysicalIndexScan struct { basePlan @@ -52,21 +58,26 @@ type PhysicalIndexScan struct { ConditionPBExpr *tipb.Expr TableAsName *model.CIStr - - LimitCount *int64 } type physicalDistSQLPlan interface { - addAggregation(agg *PhysicalAggregation, ctx context.Context) (expression.Schema, error) + addAggregation(agg *PhysicalAggregation) expression.Schema + addTopN(prop *requiredProperty) + addLimit(limit *Limit) } type physicalTableSource struct { + client kv.Client + Aggregated bool AggFields []*types.FieldType AggFuncs []*tipb.Expr GbyItems []*tipb.ByItem ConditionPBExpr *tipb.Expr + + LimitCount *int64 + SortItems []*tipb.ByItem } func (p *physicalTableSource) clear() { @@ -85,30 +96,41 @@ func needValue(af expression.AggregationFunction) bool { af.GetName() == ast.AggFuncMax || af.GetName() == ast.AggFuncMin || af.GetName() == ast.AggFuncGroupConcat } -func (p *physicalTableSource) addAggregation(agg *PhysicalAggregation, ctx context.Context) (expression.Schema, error) { - client := ctx.GetClient() - if client == nil { - return nil, nil +func (p *physicalTableSource) addLimit(l *Limit) { + count := int64(l.Count + l.Offset) + p.LimitCount = &count +} + +func (p *physicalTableSource) addTopN(prop *requiredProperty) { + if p.client == nil || !p.client.SupportRequestType(kv.ReqTypeSelect, kv.ReqSubTypeTopN) { + return + } + if prop.limit != nil { + count := int64(prop.limit.Count + prop.limit.Offset) + p.LimitCount = &count + } + for _, item := range prop.props { + p.SortItems = append(p.SortItems, sortByItemToPB(p.client, item.col, item.desc)) + } +} + +func (p *physicalTableSource) addAggregation(agg *PhysicalAggregation) expression.Schema { + if p.client == nil { + return nil } for _, f := range agg.AggFuncs { - pb, err := aggFuncToPBExpr(client, f) - if err != nil { - return nil, errors.Trace(err) - } + pb := aggFuncToPBExpr(p.client, f) if pb == nil { p.clear() - return nil, nil + return nil } p.AggFuncs = append(p.AggFuncs, pb) } for _, item := range agg.GroupByItems { - pb, err := groupByItemToPB(client, item) - if err != nil { - return nil, errors.Trace(err) - } + pb := groupByItemToPB(p.client, item) if pb == nil { p.clear() - return nil, nil + return nil } p.GbyItems = append(p.GbyItems, pb) } @@ -146,7 +168,7 @@ func (p *physicalTableSource) addAggregation(agg *PhysicalAggregation, ctx conte newAggFuncs[i] = fun } agg.AggFuncs = newAggFuncs - return schema, nil + return schema } // PhysicalTableScan represents a table scan plan. @@ -168,8 +190,6 @@ type PhysicalTableScan struct { TableAsName *model.CIStr - LimitCount *int64 - // If sort data by scanning pkcol, KeepOrder should be true. KeepOrder bool } @@ -506,7 +526,11 @@ func (p *Limit) Copy() PhysicalPlan { // MarshalJSON implements json.Marshaler interface. func (p *Limit) MarshalJSON() ([]byte, error) { - child, err := json.Marshal(p.children[0].(PhysicalPlan)) + var child PhysicalPlan + if len(p.children) > 0 { + child = p.children[0].(PhysicalPlan) + } + childStr, err := json.Marshal(child) if err != nil { return nil, errors.Trace(err) } @@ -514,7 +538,7 @@ func (p *Limit) MarshalJSON() ([]byte, error) { buffer.WriteString(fmt.Sprintf("\"type\": \"Limit\",\n"+ " \"limit\": %d,\n"+ " \"offset\": %d,\n"+ - " \"child\": %s}", p.Count, p.Offset, child)) + " \"child\": %s}", p.Count, p.Offset, childStr)) return buffer.Bytes(), nil } @@ -536,6 +560,10 @@ func (p *Sort) MarshalJSON() ([]byte, error) { if err != nil { return nil, errors.Trace(err) } + limit, err := json.Marshal(p.ExecLimit) + if err != nil { + return nil, errors.Trace(err) + } exprs, err := json.Marshal(p.ByItems) if err != nil { return nil, errors.Trace(err) @@ -543,7 +571,8 @@ func (p *Sort) MarshalJSON() ([]byte, error) { buffer := bytes.NewBufferString("{") buffer.WriteString(fmt.Sprintf("\"type\": \"Sort\",\n"+ " \"exprs\": %s,\n"+ - " \"child\": %s}", exprs, child)) + " \"limit\": %s,\n"+ + " \"child\": %s}", exprs, limit, child)) return buffer.Bytes(), nil } diff --git a/plan/plan.go b/plan/plan.go index 87f7fbaa5e140..90eb4f469c0c9 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -118,6 +118,7 @@ func (c *columnProp) equal(nc *columnProp) bool { type requiredProperty struct { props []*columnProp sortKeyLen int + limit *Limit } // getHashKey encodes a requiredProperty to a unique hash code. @@ -169,9 +170,6 @@ type PhysicalPlan interface { // Copy copies the current plan. Copy() PhysicalPlan - - // PushLimit tries to push down limit as deeply as possible. - PushLimit(l *Limit) PhysicalPlan } type baseLogicalPlan struct { diff --git a/plan/plan_test.go b/plan/plan_test.go index fa3e379c1a0f9..56166b292fa74 100644 --- a/plan/plan_test.go +++ b/plan/plan_test.go @@ -17,6 +17,7 @@ import ( "fmt" "testing" + "github.com/ngaut/log" . "github.com/pingcap/check" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression" @@ -313,8 +314,7 @@ func (s *testPlanSuite) TestJoinReOrder(c *C) { c.Assert(err, IsNil) info, err := lp.convert2PhysicalPlan(&requiredProperty{}) c.Assert(err, IsNil) - p = info.p.PushLimit(nil) - c.Assert(ToString(p), Equals, ca.best, Commentf("for %s", ca.sql)) + c.Assert(ToString(info.p), Equals, ca.best, Commentf("for %s", ca.sql)) } } @@ -371,7 +371,7 @@ func (s *testPlanSuite) TestCBO(c *C) { }, { sql: "select * from t a where a.c < 10000 order by a.a limit 2", - best: "Table(t)->Selection->Limit->Projection", + best: "Index(t.c_d_e)[[-inf,10000)]->Sort + Limit(2) + Offset(0)->Projection", }, { sql: "select * from (select * from t) a left outer join (select * from t) b on 1 order by a.c", @@ -430,8 +430,7 @@ func (s *testPlanSuite) TestCBO(c *C) { c.Assert(err, IsNil) info, err := lp.convert2PhysicalPlan(&requiredProperty{}) c.Assert(err, IsNil) - p = info.p.PushLimit(nil) - c.Assert(ToString(p), Equals, ca.best, Commentf("for %s", ca.sql)) + c.Assert(ToString(info.p), Equals, ca.best, Commentf("for %s", ca.sql)) } } @@ -592,8 +591,7 @@ func (s *testPlanSuite) TestRefine(c *C) { c.Assert(err, IsNil) info, err := p.convert2PhysicalPlan(&requiredProperty{}) c.Assert(err, IsNil) - np := info.p.PushLimit(nil) - c.Assert(ToString(np), Equals, ca.best, Commentf("for %s", ca.sql)) + c.Assert(ToString(info.p), Equals, ca.best, Commentf("for %s", ca.sql)) } } @@ -951,7 +949,8 @@ func (s *testPlanSuite) TestTableScanWithOrder(c *C) { c.Assert(err, IsNil) // Limit->Projection->PhysicalTableScan // Get PhysicalTableScan plan. - cpp, ok := info.p.GetChildByIndex(0).GetChildByIndex(0).(*PhysicalTableScan) + cpp, ok := info.p.GetChildByIndex(0).(*PhysicalTableScan) + log.Warnf("%s", ToString(info.p)) c.Assert(cpp, NotNil) c.Assert(ok, IsTrue) // Make sure KeepOrder is true. diff --git a/plan/planbuilder.go b/plan/planbuilder.go index c68c65cf027c1..d884d1e8a1ac4 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -491,7 +491,7 @@ func (b *planBuilder) buildExplain(explain *ast.ExplainStmt) Plan { b.err = errors.Trace(err) return nil } - targetPlan = info.p.PushLimit(nil) + targetPlan = info.p } p := &Explain{StmtPlan: targetPlan} addChild(p, targetPlan) diff --git a/plan/push_limit.go b/plan/push_limit.go deleted file mode 100644 index c5e4e94b8843c..0000000000000 --- a/plan/push_limit.go +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2016 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package plan - -func insertLimit(p PhysicalPlan, l *Limit) *Limit { - l.SetSchema(p.GetSchema()) - l.SetChildren(p) - p.SetParents(l) - return l -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Limit) PushLimit(l *Limit) PhysicalPlan { - child := p.GetChildByIndex(0).(PhysicalPlan) - newChild := child.PushLimit(p) - if l != nil { - p.Count = l.Count - p.Offset = l.Offset - p.SetChildren(newChild) - newChild.SetParents(p) - return p - } - return newChild -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Sort) PushLimit(l *Limit) PhysicalPlan { - child := p.GetChildByIndex(0).(PhysicalPlan) - newChild := child.PushLimit(nil) - p.ExecLimit = l - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Selection) PushLimit(l *Limit) PhysicalPlan { - child := p.GetChildByIndex(0).(PhysicalPlan) - newChild := child.PushLimit(nil) - if l != nil { - return insertLimit(p, l) - } - p.SetChildren(newChild) - newChild.SetParents(l) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalHashSemiJoin) PushLimit(l *Limit) PhysicalPlan { - lChild := p.GetChildByIndex(0).(PhysicalPlan) - rChild := p.GetChildByIndex(1).(PhysicalPlan) - var newLChild, newRChild PhysicalPlan - if p.WithAux { - newLChild = lChild.PushLimit(l) - newRChild = rChild.PushLimit(nil) - } else { - newLChild = lChild.PushLimit(nil) - newRChild = rChild.PushLimit(nil) - } - p.SetChildren(newLChild, newRChild) - newLChild.SetParents(p) - newRChild.SetParents(p) - if l != nil && !p.WithAux { - return insertLimit(p, l) - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalHashJoin) PushLimit(l *Limit) PhysicalPlan { - lChild := p.GetChildByIndex(0).(PhysicalPlan) - rChild := p.GetChildByIndex(1).(PhysicalPlan) - var newLChild, newRChild PhysicalPlan - if p.JoinType == LeftOuterJoin { - if l != nil { - limit2Push := *l - limit2Push.Count += limit2Push.Offset - limit2Push.Offset = 0 - newLChild = lChild.PushLimit(&limit2Push) - } else { - newLChild = lChild.PushLimit(nil) - } - newRChild = rChild.PushLimit(nil) - } else if p.JoinType == RightOuterJoin { - if l != nil { - limit2Push := *l - limit2Push.Count += limit2Push.Offset - limit2Push.Offset = 0 - newRChild = rChild.PushLimit(&limit2Push) - } else { - newRChild = rChild.PushLimit(nil) - } - newLChild = lChild.PushLimit(nil) - } else { - newLChild = lChild.PushLimit(nil) - newRChild = rChild.PushLimit(nil) - } - p.SetChildren(newLChild, newRChild) - newLChild.SetParents(p) - newRChild.SetParents(p) - if l != nil { - return insertLimit(p, l) - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Union) PushLimit(l *Limit) PhysicalPlan { - for i, child := range p.GetChildren() { - if l != nil { - p.children[i] = child.(PhysicalPlan).PushLimit(&Limit{Count: l.Count + l.Offset}) - } else { - p.children[i] = child.(PhysicalPlan).PushLimit(nil) - } - p.children[i].SetParents(p) - } - if l != nil { - return insertLimit(p, l) - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Projection) PushLimit(l *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Trim) PushLimit(l *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *SelectLock) PushLimit(l *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalApply) PushLimit(l *Limit) PhysicalPlan { - p.InnerPlan = p.InnerPlan.PushLimit(nil) - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalAggregation) PushLimit(l *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) - p.SetChildren(newChild) - newChild.SetParents(p) - if l != nil { - return insertLimit(p, l) - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Distinct) PushLimit(l *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) - p.SetChildren(newChild) - newChild.SetParents(p) - if l != nil { - return insertLimit(p, l) - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *MaxOneRow) PushLimit(_ *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(&Limit{Count: 2}) - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Exists) PushLimit(_ *Limit) PhysicalPlan { - newChild := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(&Limit{Count: 1}) - p.SetChildren(newChild) - newChild.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalIndexScan) PushLimit(l *Limit) PhysicalPlan { - if l != nil { - count := int64(l.Offset + l.Count) - p.LimitCount = &count - if l.Offset != 0 { - return insertLimit(p, l) - } - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalTableScan) PushLimit(l *Limit) PhysicalPlan { - if l != nil { - count := int64(l.Offset + l.Count) - p.LimitCount = &count - if l.Offset != 0 { - return insertLimit(p, l) - } - } - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalDummyScan) PushLimit(l *Limit) PhysicalPlan { - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Insert) PushLimit(_ *Limit) PhysicalPlan { - if len(p.GetChildren()) == 0 { - return p - } - np := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) - p.SetChildren(np) - p.SelectPlan = np - np.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *TableDual) PushLimit(l *Limit) PhysicalPlan { - if l == nil { - return p - } - return insertLimit(p, l) -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Update) PushLimit(_ *Limit) PhysicalPlan { - if len(p.GetChildren()) == 0 { - return p - } - np := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) - p.SetChildren(np) - p.SelectPlan = np - np.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *Delete) PushLimit(_ *Limit) PhysicalPlan { - if len(p.GetChildren()) == 0 { - return p - } - np := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(nil) - p.SetChildren(np) - p.SelectPlan = np - np.SetParents(p) - return p -} - -// PushLimit implements PhysicalPlan PushLimit interface. -func (p *PhysicalUnionScan) PushLimit(l *Limit) PhysicalPlan { - np := p.GetChildByIndex(0).(PhysicalPlan).PushLimit(l) - p.SetChildren(np) - np.SetParents(p) - if l != nil { - return insertLimit(p, l) - } - return p -} From 2a097dbeb641f893b81ac341e01bd9f8761b9dfd Mon Sep 17 00:00:00 2001 From: hanfei Date: Fri, 23 Sep 2016 10:02:46 +0800 Subject: [PATCH 2/8] consider limit during cbo --- executor/builder.go | 8 ++--- executor/explain_test.go | 1 + plan/logical_plan_builder.go | 3 +- plan/logical_plans.go | 4 +-- plan/match_property.go | 21 ++++++++---- plan/physical_plan_builder.go | 61 +++++++++++++++++++---------------- plan/physical_plans.go | 6 ++-- plan/plan_test.go | 10 +++--- plan/planbuilder.go | 12 ++----- plan/plans.go | 1 - 10 files changed, 66 insertions(+), 61 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index aca7af4a5a757..1f4325bedd877 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -262,8 +262,8 @@ func (b *executorBuilder) buildInsert(v *plan.Insert) Executor { Lists: v.Lists, Setlist: v.Setlist, } - if v.SelectPlan != nil { - ivs.SelectExec = b.build(v.SelectPlan) + if len(v.GetChildren()) > 0 { + ivs.SelectExec = b.build(v.GetChildByIndex(0)) } // Get Table ts, ok := v.Table.TableRefs.Left.(*ast.TableSource) @@ -677,7 +677,7 @@ func (b *executorBuilder) buildUnion(v *plan.Union) Executor { } func (b *executorBuilder) buildUpdate(v *plan.Update) Executor { - selExec := b.build(v.SelectPlan) + selExec := b.build(v.GetChildByIndex(0)) return &UpdateExec{ctx: b.ctx, SelectExec: selExec, OrderedList: v.OrderedList} } @@ -688,7 +688,7 @@ func (b *executorBuilder) buildDummyScan(v *plan.PhysicalDummyScan) Executor { } func (b *executorBuilder) buildDelete(v *plan.Delete) Executor { - selExec := b.build(v.SelectPlan) + selExec := b.build(v.GetChildByIndex(0)) return &DeleteExec{ ctx: b.ctx, SelectExec: selExec, diff --git a/executor/explain_test.go b/executor/explain_test.go index 1a30ee3906947..9abb68631c97a 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -82,6 +82,7 @@ func (s *testSuite) TestExplain(c *C) { "Desc": false } ], + "limit": null, "child": { "type": "Projection", "exprs": [ diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index 72fc442a4514f..692b705d63ee6 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -1037,7 +1037,7 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) LogicalPlan { return nil } p = np - updt := &Update{OrderedList: orderedList, SelectPlan: p, baseLogicalPlan: newBaseLogicalPlan(Up, b.allocator)} + updt := &Update{OrderedList: orderedList, baseLogicalPlan: newBaseLogicalPlan(Up, b.allocator)} updt.self = updt updt.initID() addChild(updt, p) @@ -1105,7 +1105,6 @@ func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) LogicalPlan { del := &Delete{ Tables: tables, IsMultiTable: delete.IsMultiTable, - SelectPlan: p, baseLogicalPlan: newBaseLogicalPlan(Del, b.allocator), } del.self = del diff --git a/plan/logical_plans.go b/plan/logical_plans.go index 2200631e20fa5..27884994fdba6 100644 --- a/plan/logical_plans.go +++ b/plan/logical_plans.go @@ -76,6 +76,8 @@ type Selection struct { // but after we converted to CNF(Conjunctive normal form), it can be // split into a list of AND conditions. Conditions []expression.Expression + + aboveTableScan bool } // Apply gets one row from outer executor and gets one row from inner executor according to outer row. @@ -144,7 +146,6 @@ type Sort struct { type Update struct { baseLogicalPlan - SelectPlan Plan OrderedList []*expression.Assignment } @@ -152,7 +153,6 @@ type Update struct { type Delete struct { baseLogicalPlan - SelectPlan Plan Tables []*ast.TableName IsMultiTable bool } diff --git a/plan/match_property.go b/plan/match_property.go index 2896a5435acbd..14d8dd2957d81 100644 --- a/plan/match_property.go +++ b/plan/match_property.go @@ -25,13 +25,16 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy rowCount := float64(infos[0].count) cost := rowCount * netWorkFactor if len(prop.props) == 0 { - return &physicalPlanInfo{p: ts, cost: cost, count: infos[0].count} + return enforceProperty(prop, &physicalPlanInfo{p: ts, cost: cost, count: infos[0].count}) } if len(prop.props) == 1 && ts.pkCol != nil && ts.pkCol == prop.props[0].col { sortedTs := *ts sortedTs.Desc = prop.props[0].desc sortedTs.KeepOrder = true - return &physicalPlanInfo{p: &sortedTs, cost: cost, count: infos[0].count} + return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{ + p: &sortedTs, + cost: cost, + count: infos[0].count}) } return &physicalPlanInfo{p: ts, cost: math.MaxFloat64, count: infos[0].count} } @@ -75,7 +78,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy cost *= 2 } if len(prop.props) == 0 { - return &physicalPlanInfo{p: is, cost: cost, count: infos[0].count} + return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{p: is, cost: cost, count: infos[0].count}) } matchedIdx := 0 matchedList := make([]bool, len(prop.props)) @@ -103,13 +106,19 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy if allAsc { sortedIs := *is sortedIs.OutOfOrder = false - return &physicalPlanInfo{p: &sortedIs, cost: sortedCost, count: infos[0].count} + return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{ + p: &sortedIs, + cost: sortedCost, + count: infos[0].count}) } if allDesc { sortedIs := *is sortedIs.Desc = true sortedIs.OutOfOrder = false - return &physicalPlanInfo{p: &sortedIs, cost: sortedCost, count: infos[0].count} + return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{ + p: &sortedIs, + cost: sortedCost, + count: infos[0].count}) } } return &physicalPlanInfo{p: is, cost: math.MaxFloat64, count: infos[0].count} @@ -168,7 +177,7 @@ func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPla // matchProperty implements PhysicalPlan matchProperty interface. func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - if childPlanInfo[0].p == nil { + if p.aboveTableScan { res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(prop, childPlanInfo...) sel := *p sel.SetChildren(res.p) diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 90fae789a13aa..0ad706c459a99 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -90,6 +90,7 @@ func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, DBName: p.DBName, physicalTableSource: physicalTableSource{client: client}, } + ts.addLimit(prop.limit) ts.SetSchema(p.GetSchema()) resultPlan = ts var oldConditions []expression.Expression @@ -117,6 +118,7 @@ func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, } if len(newSel.Conditions) > 0 { newSel.SetChildren(ts) + newSel.aboveTableScan = true resultPlan = &newSel } } else { @@ -189,6 +191,7 @@ func (p *DataSource) handleIndexScan(prop *requiredProperty, index *model.IndexI DBName: p.DBName, physicalTableSource: physicalTableSource{client: client}, } + is.addLimit(prop.limit) is.SetSchema(p.schema) rowCount := uint64(statsTbl.Count) resultPlan = is @@ -226,6 +229,7 @@ func (p *DataSource) handleIndexScan(prop *requiredProperty, index *model.IndexI } if len(newSel.Conditions) > 0 { newSel.SetChildren(is) + newSel.aboveTableScan = true resultPlan = &newSel } } else { @@ -329,6 +333,7 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl ByItems: items, ExecLimit: prop.limit, } + sort.SetSchema(info.p.GetSchema()) info = addPlanToResponse(sort, info) count := info.count if prop.limit != nil { @@ -336,7 +341,9 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl } info.cost += float64(info.count)*cpuFactor + float64(count)*memoryFactor } else if prop.limit != nil { - info = addPlanToResponse(prop.limit, info) + limit := prop.limit.Copy() + limit.SetSchema(info.p.GetSchema()) + info = addPlanToResponse(limit, info) } if prop.limit != nil && prop.limit.Count < info.count { info.count = prop.limit.Count @@ -420,8 +427,8 @@ func (p *Join) handleLeftJoin(prop *requiredProperty, innerJoin bool) (*physical resultInfo := join.matchProperty(prop, lInfo, rInfo) if !allLeft { resultInfo = enforceProperty(prop, resultInfo) - } else if prop.limit != nil { - resultInfo = addPlanToResponse(prop.limit, resultInfo) + } else { + resultInfo = enforceProperty(&requiredProperty{limit: prop.limit}, resultInfo) } return resultInfo, nil } @@ -467,8 +474,8 @@ func (p *Join) handleRightJoin(prop *requiredProperty, innerJoin bool) (*physica resultInfo := join.matchProperty(prop, lInfo, rInfo) if !allRight { resultInfo = enforceProperty(prop, resultInfo) - } else if prop.limit != nil { - resultInfo = addPlanToResponse(prop.limit, resultInfo) + } else { + resultInfo = enforceProperty(&requiredProperty{limit: prop.limit}, resultInfo) } return resultInfo, nil } @@ -524,8 +531,8 @@ func (p *Join) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } if !allLeft { resultInfo = enforceProperty(prop, resultInfo) - } else if prop.limit != nil && p.JoinType == SemiJoin { - resultInfo = addPlanToResponse(prop.limit, resultInfo) + } else if p.JoinType == SemiJoin { + resultInfo = enforceProperty(&requiredProperty{limit: prop.limit}, resultInfo) } p.storePlanInfo(prop, resultInfo) return resultInfo, nil @@ -677,19 +684,18 @@ func (p *Aggregation) convert2PhysicalPlan(prop *requiredProperty) (*physicalPla if planInfo != nil { return planInfo, nil } - planInfo, err = p.handleHashAgg() - if err != nil { - return nil, errors.Trace(err) - } - enforceProperty(prop, planInfo) limit := prop.limit - streamInfo, err := p.handleStreamAgg(removeLimit(prop, true)) - if limit != nil { - streamInfo = addPlanToResponse(limit, streamInfo) + if len(prop.props) == 0 { + planInfo, err = p.handleHashAgg() + if err != nil { + return nil, errors.Trace(err) + } } - if streamInfo.cost < planInfo.cost { + streamInfo, err := p.handleStreamAgg(removeLimit(prop, true)) + if planInfo == nil || streamInfo.cost < planInfo.cost { planInfo = streamInfo } + planInfo = enforceProperty(&requiredProperty{limit: limit}, planInfo) err = p.storePlanInfo(prop, planInfo) return planInfo, errors.Trace(err) } @@ -721,9 +727,7 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, childInfos = append(childInfos, info) } info = p.matchProperty(prop, childInfos...) - if limit != nil { - info = addPlanToResponse(limit, info) - } + info = enforceProperty(&requiredProperty{limit: limit}, info) info.count = count p.storePlanInfo(prop, info) return info, nil @@ -769,7 +773,7 @@ func (p *Selection) handlePushOrder(prop *requiredProperty) (*physicalPlanInfo, np.addLimit(limit) info.count = limit.Count } else { - info = addPlanToResponse(limit, info) + info = enforceProperty(&requiredProperty{limit: limit}, info) } } return info, nil @@ -787,6 +791,8 @@ func (p *Selection) handlePushNothing(prop *requiredProperty) (*physicalPlanInfo np.addTopN(prop) } info = enforceProperty(prop, info) + } else if len(prop.props) != 0 { + info.cost = math.MaxFloat64 } return info, nil } @@ -897,12 +903,15 @@ func (p *Sort) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } cnt := float64(unSortedPlanInfo.count) sortCost := cnt*math.Log2(cnt)*cpuFactor + memoryFactor*cnt - if sortCost+unSortedPlanInfo.cost < sortedPlanInfo.cost { + if len(selfProp.props) == 0 { + np := p.Copy().(*Sort) + np.ExecLimit = selfProp.limit + sortedPlanInfo = addPlanToResponse(np, sortedPlanInfo) + } else if sortCost+unSortedPlanInfo.cost < sortedPlanInfo.cost { sortedPlanInfo.cost = sortCost + unSortedPlanInfo.cost np := *p np.ExecLimit = selfProp.limit sortedPlanInfo = addPlanToResponse(&np, unSortedPlanInfo) - } if !matchProp(prop, selfProp) { sortedPlanInfo.cost = math.MaxFloat64 @@ -937,9 +946,7 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, return nil, errors.Trace(err) } info = addPlanToResponse(np, info) - if limit != nil { - info = addPlanToResponse(limit, info) - } + info = enforceProperty(&requiredProperty{limit: limit}, info) p.storePlanInfo(prop, info) return info, nil } @@ -962,9 +969,7 @@ func (p *Distinct) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanIn } info = addPlanToResponse(p, info) info.count = uint64(float64(info.count) * distinctFactor) - if limit != nil { - info = addPlanToResponse(limit, info) - } + info = enforceProperty(&requiredProperty{limit: limit}, info) p.storePlanInfo(prop, info) return info, nil } diff --git a/plan/physical_plans.go b/plan/physical_plans.go index 3d0b7b7fbc70a..b4cffcaac79b1 100644 --- a/plan/physical_plans.go +++ b/plan/physical_plans.go @@ -97,8 +97,10 @@ func needValue(af expression.AggregationFunction) bool { } func (p *physicalTableSource) addLimit(l *Limit) { - count := int64(l.Count + l.Offset) - p.LimitCount = &count + if l != nil { + count := int64(l.Count + l.Offset) + p.LimitCount = &count + } } func (p *physicalTableSource) addTopN(prop *requiredProperty) { diff --git a/plan/plan_test.go b/plan/plan_test.go index 2c68007a3740b..82d2bdfe2b345 100644 --- a/plan/plan_test.go +++ b/plan/plan_test.go @@ -15,9 +15,10 @@ package plan import ( "fmt" + "sort" + "strings" "testing" - "github.com/ngaut/log" . "github.com/pingcap/check" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression" @@ -29,8 +30,6 @@ import ( "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" - "sort" - "strings" ) var _ = Suite(&testPlanSuite{}) @@ -395,7 +394,7 @@ func (s *testPlanSuite) TestCBO(c *C) { }, { sql: "select exists(select * from t b where a.a = b.a and b.c = 1) from t a order by a.c limit 3", - best: "SemiJoinWithAux{Index(t.c_d_e)[[,+inf]]->Index(t.c_d_e)[[1,1]]}->Projection->Trim", + best: "SemiJoinWithAux{Index(t.c_d_e)[[,+inf]]->Limit->Index(t.c_d_e)[[1,1]]}->Projection->Trim", }, { sql: "select * from (select t.a from t union select t.d from t where t.c = 1 union select t.c from t) k order by a limit 1", @@ -949,8 +948,7 @@ func (s *testPlanSuite) TestTableScanWithOrder(c *C) { c.Assert(err, IsNil) // Limit->Projection->PhysicalTableScan // Get PhysicalTableScan plan. - cpp, ok := info.p.GetChildByIndex(0).(*PhysicalTableScan) - log.Warnf("%s", ToString(info.p)) + cpp, ok := info.p.GetChildByIndex(0).GetChildByIndex(0).(*PhysicalTableScan) c.Assert(cpp, NotNil) c.Assert(ok, IsTrue) // Make sure KeepOrder is true. diff --git a/plan/planbuilder.go b/plan/planbuilder.go index d884d1e8a1ac4..37ef6265b8f8e 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -39,14 +39,6 @@ const ( SystemInternalError terror.ErrCode = 2 ) -// BuildPlan builds a plan from a node. -// It returns ErrUnsupportedType if ast.Node type is not supported yet. -func BuildPlan(node ast.Node) (Plan, error) { - builder := planBuilder{allocator: new(idAllocator)} - p := builder.build(node) - return p, builder.err -} - // planBuilder builds Plan from an ast.Node. // It just builds the ast node straightforwardly. type planBuilder struct { @@ -442,11 +434,11 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan { insertPlan.initID() insertPlan.self = insertPlan if insert.Select != nil { - insertPlan.SelectPlan = b.build(insert.Select) - addChild(insertPlan, insertPlan.SelectPlan) + selectPlan := b.build(insert.Select) if b.err != nil { return nil } + addChild(insertPlan, selectPlan) } return insertPlan } diff --git a/plan/plans.go b/plan/plans.go index 1dfc3bc11a9e8..e2de3ef3ba3ab 100644 --- a/plan/plans.go +++ b/plan/plans.go @@ -182,7 +182,6 @@ type Insert struct { Lists [][]ast.ExprNode Setlist []*ast.Assignment OnDuplicate []*ast.Assignment - SelectPlan Plan IsReplace bool Priority int From 1cbdafc85b1121edf890174d77d3d50285a001d8 Mon Sep 17 00:00:00 2001 From: hanfei Date: Fri, 23 Sep 2016 15:23:48 +0800 Subject: [PATCH 3/8] tiny change. --- plan/logical_plans.go | 1 + plan/physical_plan_builder.go | 30 ++++++++++++++++-------------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/plan/logical_plans.go b/plan/logical_plans.go index 27884994fdba6..cbad62e03f201 100644 --- a/plan/logical_plans.go +++ b/plan/logical_plans.go @@ -77,6 +77,7 @@ type Selection struct { // split into a list of AND conditions. Conditions []expression.Expression + // aboveTableScan means if this selection's child is a table scan or index scan. aboveTableScan bool } diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 0ad706c459a99..e195cef1b4d5f 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -351,6 +351,10 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl return info } +// removeLimit removes limit from prop. For example, When handling Sort,Limit -> Selection, we can't pass the Limit +// across the selection, because selection decreases the size of data, but we can pass the Sort below the selection. In +// this case, we set removeAll true. When handling Limit(1,1) -> LeftOuterJoin, we can pass the limit across join's left +// path, because the left outer join increases the size of data, but we can't pass offset value. So we set remove All to false. func removeLimit(prop *requiredProperty, removeAll bool) *requiredProperty { ret := &requiredProperty{ props: prop.props, @@ -367,6 +371,10 @@ func removeLimit(prop *requiredProperty, removeAll bool) *requiredProperty { return ret } +func limitProperty(limit *Limit) *requiredProperty { + return &requiredProperty{limit: limit} +} + // convert2PhysicalPlan implements LogicalPlan convert2PhysicalPlan interface. func (p *Limit) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, error) { info, err := p.getPlanInfo(prop) @@ -376,7 +384,7 @@ func (p *Limit) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, if info != nil { return info, nil } - info, err = p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(&requiredProperty{limit: &Limit{Offset: p.Offset, Count: p.Count}}) + info, err = p.GetChildByIndex(0).(LogicalPlan).convert2PhysicalPlan(limitProperty(&Limit{Offset: p.Offset, Count: p.Count})) if err != nil { return nil, errors.Trace(err) } @@ -428,7 +436,7 @@ func (p *Join) handleLeftJoin(prop *requiredProperty, innerJoin bool) (*physical if !allLeft { resultInfo = enforceProperty(prop, resultInfo) } else { - resultInfo = enforceProperty(&requiredProperty{limit: prop.limit}, resultInfo) + resultInfo = enforceProperty(limitProperty(prop.limit), resultInfo) } return resultInfo, nil } @@ -475,7 +483,7 @@ func (p *Join) handleRightJoin(prop *requiredProperty, innerJoin bool) (*physica if !allRight { resultInfo = enforceProperty(prop, resultInfo) } else { - resultInfo = enforceProperty(&requiredProperty{limit: prop.limit}, resultInfo) + resultInfo = enforceProperty(limitProperty(prop.limit), resultInfo) } return resultInfo, nil } @@ -532,7 +540,7 @@ func (p *Join) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, if !allLeft { resultInfo = enforceProperty(prop, resultInfo) } else if p.JoinType == SemiJoin { - resultInfo = enforceProperty(&requiredProperty{limit: prop.limit}, resultInfo) + resultInfo = enforceProperty(limitProperty(prop.limit), resultInfo) } p.storePlanInfo(prop, resultInfo) return resultInfo, nil @@ -695,7 +703,7 @@ func (p *Aggregation) convert2PhysicalPlan(prop *requiredProperty) (*physicalPla if planInfo == nil || streamInfo.cost < planInfo.cost { planInfo = streamInfo } - planInfo = enforceProperty(&requiredProperty{limit: limit}, planInfo) + planInfo = enforceProperty(limitProperty(limit), planInfo) err = p.storePlanInfo(prop, planInfo) return planInfo, errors.Trace(err) } @@ -727,7 +735,7 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, childInfos = append(childInfos, info) } info = p.matchProperty(prop, childInfos...) - info = enforceProperty(&requiredProperty{limit: limit}, info) + info = enforceProperty(limitProperty(limit), info) info.count = count p.storePlanInfo(prop, info) return info, nil @@ -785,11 +793,7 @@ func (p *Selection) handlePushNothing(prop *requiredProperty) (*physicalPlanInfo if err != nil { return nil, errors.Trace(err) } - // TODO: Because we haven't support DAG push down, we can only push down topn by this ugly way. if prop.limit != nil && len(prop.props) > 0 { - if np, ok := info.p.(physicalDistSQLPlan); ok { - np.addTopN(prop) - } info = enforceProperty(prop, info) } else if len(prop.props) != 0 { info.cost = math.MaxFloat64 @@ -810,8 +814,6 @@ func (p *Projection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlan props: make([]*columnProp, 0, len(prop.props)), sortKeyLen: prop.sortKeyLen, limit: prop.limit} - if len(prop.props) > 0 { - } childSchema := p.GetChildByIndex(0).GetSchema() usedCols := make([]bool, len(childSchema)) canPassSort := true @@ -946,7 +948,7 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, return nil, errors.Trace(err) } info = addPlanToResponse(np, info) - info = enforceProperty(&requiredProperty{limit: limit}, info) + info = enforceProperty(limitProperty(limit), info) p.storePlanInfo(prop, info) return info, nil } @@ -969,7 +971,7 @@ func (p *Distinct) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanIn } info = addPlanToResponse(p, info) info.count = uint64(float64(info.count) * distinctFactor) - info = enforceProperty(&requiredProperty{limit: limit}, info) + info = enforceProperty(limitProperty(limit), info) p.storePlanInfo(prop, info) return info, nil } From b2526e9f9969bca41c775736684ee94f2f2f87ac Mon Sep 17 00:00:00 2001 From: hanfei Date: Fri, 23 Sep 2016 18:47:21 +0800 Subject: [PATCH 4/8] fix bug. --- plan/physical_plan_builder.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index e195cef1b4d5f..5962336125ac6 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -931,6 +931,15 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, if info != nil { return info, nil } + allFromOuter := true + for _, col := range prop.props { + if p.InnerPlan.GetSchema().GetIndex(col.col) != -1 { + allFromOuter = false + } + } + if !allFromOuter { + return &physicalPlanInfo{cost: math.MaxFloat64}, err + } child := p.GetChildByIndex(0).(LogicalPlan) innerInfo, err := p.InnerPlan.convert2PhysicalPlan(&requiredProperty{}) if err != nil { From 80977e94d0415c943b3d2a381b12488ef3f239ec Mon Sep 17 00:00:00 2001 From: hanfei Date: Mon, 26 Sep 2016 08:31:29 +0800 Subject: [PATCH 5/8] tiny change. --- plan/logical_plans.go | 4 ++-- plan/match_property.go | 2 +- plan/physical_plan_builder.go | 8 +++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/plan/logical_plans.go b/plan/logical_plans.go index cbad62e03f201..a6bc9e3c6ec73 100644 --- a/plan/logical_plans.go +++ b/plan/logical_plans.go @@ -77,8 +77,8 @@ type Selection struct { // split into a list of AND conditions. Conditions []expression.Expression - // aboveTableScan means if this selection's child is a table scan or index scan. - aboveTableScan bool + // onTable means if this selection's child is a table scan or index scan. + onTable bool } // Apply gets one row from outer executor and gets one row from inner executor according to outer row. diff --git a/plan/match_property.go b/plan/match_property.go index 14d8dd2957d81..2e7468da1d5ff 100644 --- a/plan/match_property.go +++ b/plan/match_property.go @@ -177,7 +177,7 @@ func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPla // matchProperty implements PhysicalPlan matchProperty interface. func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - if p.aboveTableScan { + if p.onTable { res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(prop, childPlanInfo...) sel := *p sel.SetChildren(res.p) diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 5962336125ac6..2ed156d56de88 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -118,7 +118,7 @@ func (p *DataSource) handleTableScan(prop *requiredProperty) (*physicalPlanInfo, } if len(newSel.Conditions) > 0 { newSel.SetChildren(ts) - newSel.aboveTableScan = true + newSel.onTable = true resultPlan = &newSel } } else { @@ -229,7 +229,7 @@ func (p *DataSource) handleIndexScan(prop *requiredProperty, index *model.IndexI } if len(newSel.Conditions) > 0 { newSel.SetChildren(is) - newSel.aboveTableScan = true + newSel.onTable = true resultPlan = &newSel } } else { @@ -323,6 +323,7 @@ func addPlanToResponse(p PhysicalPlan, planInfo *physicalPlanInfo) *physicalPlan return &physicalPlanInfo{p: np, cost: planInfo.cost, count: planInfo.count} } +// enforceProperty add an topN or sort upon current operator. func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPlanInfo { if len(prop.props) != 0 { items := make([]*ByItems, 0, len(prop.props)) @@ -354,7 +355,7 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl // removeLimit removes limit from prop. For example, When handling Sort,Limit -> Selection, we can't pass the Limit // across the selection, because selection decreases the size of data, but we can pass the Sort below the selection. In // this case, we set removeAll true. When handling Limit(1,1) -> LeftOuterJoin, we can pass the limit across join's left -// path, because the left outer join increases the size of data, but we can't pass offset value. So we set remove All to false. +// path, because the left outer join increases the size of data, but we can't pass offset value. So we set removeAll to false. func removeLimit(prop *requiredProperty, removeAll bool) *requiredProperty { ret := &requiredProperty{ props: prop.props, @@ -755,6 +756,7 @@ func (p *Selection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanI if err != nil { return nil, errors.Trace(err) } + // Secondly, we will push nothing and enforce this prop. infoEnforce, err := p.handlePushNothing(prop) if err != nil { return nil, errors.Trace(err) From c78ed5a87dd9ba49a7cf43dc211db8dca688029a Mon Sep 17 00:00:00 2001 From: hanfei Date: Mon, 26 Sep 2016 12:08:21 +0800 Subject: [PATCH 6/8] change removeLimit to removeLimit and convertLimitOffsetToCount --- plan/match_property.go | 2 +- plan/physical_plan_builder.go | 44 +++++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/plan/match_property.go b/plan/match_property.go index 2e7468da1d5ff..a81aa1f114d9b 100644 --- a/plan/match_property.go +++ b/plan/match_property.go @@ -194,7 +194,7 @@ func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*phys // matchProperty implements PhysicalPlan matchProperty interface. func (p *PhysicalUnionScan) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { limit := prop.limit - res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(removeLimit(prop, false), childPlanInfo...) + res := p.GetChildByIndex(0).(PhysicalPlan).matchProperty(convertLimitOffsetToCount(prop), childPlanInfo...) np := *p np.SetChildren(res.p) res.p = &np diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 2ed156d56de88..3f46542d805cf 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -352,17 +352,20 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl return info } -// removeLimit removes limit from prop. For example, When handling Sort,Limit -> Selection, we can't pass the Limit -// across the selection, because selection decreases the size of data, but we can pass the Sort below the selection. In -// this case, we set removeAll true. When handling Limit(1,1) -> LeftOuterJoin, we can pass the limit across join's left -// path, because the left outer join increases the size of data, but we can't pass offset value. So we set removeAll to false. -func removeLimit(prop *requiredProperty, removeAll bool) *requiredProperty { +// removeLimit removes limit from prop. +func removeLimit(prop *requiredProperty) *requiredProperty { ret := &requiredProperty{ props: prop.props, sortKeyLen: prop.sortKeyLen, } - if removeAll { - return ret + return ret +} + +// convertLimitOffsetToCount change limit(offset, count) in prop to limit(0, offset + count) +func convertLimitOffsetToCount(prop *requiredProperty) *requiredProperty { + ret := &requiredProperty{ + props: prop.props, + sortKeyLen: prop.sortKeyLen, } if prop.limit != nil { ret.limit = &Limit{ @@ -425,7 +428,13 @@ func (p *Join) handleLeftJoin(prop *requiredProperty, innerJoin bool) (*physical if !allLeft { lProp = &requiredProperty{} } - lInfo, err := lChild.convert2PhysicalPlan(removeLimit(lProp, innerJoin)) + var lInfo *physicalPlanInfo + var err error + if innerJoin { + lInfo, err = lChild.convert2PhysicalPlan(removeLimit(lProp)) + } else { + lInfo, err = lChild.convert2PhysicalPlan(convertLimitOffsetToCount(lProp)) + } if err != nil { return nil, errors.Trace(err) } @@ -476,7 +485,12 @@ func (p *Join) handleRightJoin(prop *requiredProperty, innerJoin bool) (*physica if !allRight { rProp = &requiredProperty{} } - rInfo, err := rChild.convert2PhysicalPlan(removeLimit(rProp, innerJoin)) + var rInfo *physicalPlanInfo + if innerJoin { + rInfo, err = rChild.convert2PhysicalPlan(removeLimit(rProp)) + } else { + rInfo, err = rChild.convert2PhysicalPlan(convertLimitOffsetToCount(rProp)) + } if err != nil { return nil, errors.Trace(err) } @@ -522,7 +536,7 @@ func (p *Join) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, lProp = &requiredProperty{} } if p.JoinType == SemiJoin { - lProp = removeLimit(lProp, true) + lProp = removeLimit(lProp) } lInfo, err := lChild.convert2PhysicalPlan(lProp) if err != nil { @@ -700,7 +714,7 @@ func (p *Aggregation) convert2PhysicalPlan(prop *requiredProperty) (*physicalPla return nil, errors.Trace(err) } } - streamInfo, err := p.handleStreamAgg(removeLimit(prop, true)) + streamInfo, err := p.handleStreamAgg(removeLimit(prop)) if planInfo == nil || streamInfo.cost < planInfo.cost { planInfo = streamInfo } @@ -722,7 +736,7 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, childInfos := make([]*physicalPlanInfo, 0, len(p.children)) var count uint64 for _, child := range p.GetChildren() { - newProp := removeLimit(prop, false) + newProp := convertLimitOffsetToCount(prop) newProp.props = make([]*columnProp, 0, len(prop.props)) for _, c := range prop.props { idx := p.GetSchema().GetIndex(c.col) @@ -774,7 +788,7 @@ func (p *Selection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanI func (p *Selection) handlePushOrder(prop *requiredProperty) (*physicalPlanInfo, error) { child := p.GetChildByIndex(0).(LogicalPlan) limit := prop.limit - info, err := child.convert2PhysicalPlan(removeLimit(prop, true)) + info, err := child.convert2PhysicalPlan(removeLimit(prop)) if err != nil { return nil, errors.Trace(err) } @@ -954,7 +968,7 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } np.SetSchema(p.GetSchema()) limit := prop.limit - info, err = child.convert2PhysicalPlan(removeLimit(prop, true)) + info, err = child.convert2PhysicalPlan(removeLimit(prop)) if err != nil { return nil, errors.Trace(err) } @@ -976,7 +990,7 @@ func (p *Distinct) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanIn } child := p.GetChildByIndex(0).(LogicalPlan) limit := prop.limit - info, err = child.convert2PhysicalPlan(removeLimit(prop, true)) + info, err = child.convert2PhysicalPlan(removeLimit(prop)) if err != nil { return nil, errors.Trace(err) } From 971084d3842a23016b25535d30192d41517d1136 Mon Sep 17 00:00:00 2001 From: hanfei Date: Mon, 26 Sep 2016 15:08:06 +0800 Subject: [PATCH 7/8] address comment. --- plan/physical_plan_builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 3f46542d805cf..9219e1380eda5 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -323,7 +323,7 @@ func addPlanToResponse(p PhysicalPlan, planInfo *physicalPlanInfo) *physicalPlan return &physicalPlanInfo{p: np, cost: planInfo.cost, count: planInfo.count} } -// enforceProperty add an topN or sort upon current operator. +// enforceProperty add an topN or sort or limit upon current operator. func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPlanInfo { if len(prop.props) != 0 { items := make([]*ByItems, 0, len(prop.props)) From 29b9cb6b5908875207e6fe9552b8e96fb0e2c802 Mon Sep 17 00:00:00 2001 From: hanfei Date: Mon, 26 Sep 2016 17:29:55 +0800 Subject: [PATCH 8/8] address comment. --- plan/expr_to_pb.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/plan/expr_to_pb.go b/plan/expr_to_pb.go index cbac53b713b27..a38a4dd11d994 100644 --- a/plan/expr_to_pb.go +++ b/plan/expr_to_pb.go @@ -109,11 +109,7 @@ func columnToPBExpr(client kv.Client, column *expression.Column) *tipb.Expr { id := column.ID // Zero Column ID is not a column from table, can not support for now. - if id == 0 { - return nil - } - // its value is available to use. - if id == -1 { + if id == 0 || id == -1 { return nil }