Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 committed Jul 17, 2020
1 parent 323355d commit adc2d04
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
2 changes: 1 addition & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
if prop.IsFlashOnlyProp() && ((p.preferJoinType&preferBCJoin) == 0 && p.preferJoinType > 0) {
return nil, false
}
joins := make([]PhysicalPlan, 0, 5)
joins := make([]PhysicalPlan, 0, 8)
if p.ctx.GetSessionVars().AllowBCJ {
broadCastJoins := p.tryToGetBroadCastJoin(prop)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand Down
40 changes: 33 additions & 7 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor.
resp.RegionError = err
return resp
}
dagCtx, e, dagReq, err := h.buildDAGExecutor(req)
dagCtx, e, dagReq, err := h.buildDAGExecutor(req, false)
if err != nil {
resp.OtherError = err.Error()
return resp
Expand Down Expand Up @@ -92,7 +92,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor.
return buildResp(selResp, execDetails, err)
}

func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, executor, *tipb.DAGRequest, error) {
func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request, batchCop bool) (*dagContext, executor, *tipb.DAGRequest, error) {
if len(req.Ranges) == 0 {
return nil, nil, nil, errors.New("request range is null")
}
Expand All @@ -118,7 +118,12 @@ func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, ex
startTS: req.StartTs,
evalCtx: &evalContext{sc: sc},
}
e, err := h.buildDAG(ctx, dagReq.Executors)
var e executor
if batchCop {
e, err = h.buildDAGForTiFlash(ctx, dagReq.RootExecutor)
} else {
e, err = h.buildDAG(ctx, dagReq.Executors)
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
Expand All @@ -133,7 +138,7 @@ func constructTimeZone(name string, offset int) (*time.Location, error) {
}

func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Request) (tikvpb.Tikv_CoprocessorStreamClient, error) {
dagCtx, e, dagReq, err := h.buildDAGExecutor(req)
dagCtx, e, dagReq, err := h.buildDAGExecutor(req, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -146,36 +151,57 @@ func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Reque
}, nil
}

func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, error) {
func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, *tipb.Executor, error) {
var currExec executor
var err error
var childExec *tipb.Executor
switch curr.GetTp() {
case tipb.ExecType_TypeTableScan:
currExec, err = h.buildTableScan(ctx, curr)
case tipb.ExecType_TypeIndexScan:
currExec, err = h.buildIndexScan(ctx, curr)
case tipb.ExecType_TypeSelection:
currExec, err = h.buildSelection(ctx, curr)
childExec = curr.Selection.Child
case tipb.ExecType_TypeAggregation:
currExec, err = h.buildHashAgg(ctx, curr)
childExec = curr.Aggregation.Child
case tipb.ExecType_TypeStreamAgg:
currExec, err = h.buildStreamAgg(ctx, curr)
childExec = curr.Aggregation.Child
case tipb.ExecType_TypeTopN:
currExec, err = h.buildTopN(ctx, curr)
childExec = curr.TopN.Child
case tipb.ExecType_TypeLimit:
currExec = &limitExec{limit: curr.Limit.GetLimit(), execDetail: new(execDetail)}
childExec = curr.Limit.Child
default:
// TODO: Support other types.
err = errors.Errorf("this exec type %v doesn't support yet.", curr.GetTp())
}

return currExec, errors.Trace(err)
return currExec, childExec, errors.Trace(err)
}

func (h *rpcHandler) buildDAGForTiFlash(ctx *dagContext, farther *tipb.Executor) (executor, error) {
curr, child, err := h.buildExec(ctx, farther)
if err != nil {
return nil, errors.Trace(err)
}
if child != nil {
childExec, err := h.buildDAGForTiFlash(ctx, child)
if err != nil {
return nil, errors.Trace(err)
}
curr.SetSrcExec(childExec)
}
return curr, nil
}

func (h *rpcHandler) buildDAG(ctx *dagContext, executors []*tipb.Executor) (executor, error) {
var src executor
for i := 0; i < len(executors); i++ {
curr, err := h.buildExec(ctx, executors[i])
curr, _, err := h.buildExec(ctx, executors[i])
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func (h *rpcHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor
StartTs: req.StartTs,
Ranges: ri.Ranges,
}
_, exec, dagReq, err := h.buildDAGExecutor(&cop)
_, exec, dagReq, err := h.buildDAGExecutor(&cop, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit adc2d04

Please sign in to comment.