Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan/executor: make semi joins null and empty aware (#9051) #9449

Merged
merged 3 commits into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ set @@session.tidb_opt_insubquery_unfold = 0;
explain select sum(t1.c1 in (select c1 from t2)) from t1;
id count task operator info
StreamAgg_12 1.00 root funcs:sum(col_0)
└─Projection_33 10000.00 root cast(5_aux_0)
└─MergeJoin_26 10000.00 root left outer semi join, left key:test.t1.c1, right key:test.t2.c1
├─TableReader_19 10000.00 root data:TableScan_18
│ └─TableScan_18 10000.00 cop table:t1, range:[-inf,+inf], keep order:true, stats:pseudo
└─IndexReader_21 10000.00 root index:IndexScan_20
└─IndexScan_20 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─Projection_21 10000.00 root cast(5_aux_0)
└─HashLeftJoin_18 10000.00 root left outer semi join, inner:TableReader_17, other cond:eq(test.t1.c1, test.t2.c1)
├─TableReader_20 10000.00 root data:TableScan_19
│ └─TableScan_19 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_17 10000.00 root data:TableScan_16
└─TableScan_16 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
explain select 1 in (select c2 from t2) from t1;
id count task operator info
Projection_6 10000.00 root 5_aux_0
Expand Down Expand Up @@ -217,25 +217,25 @@ subgraph cluster12{
node [style=filled, color=lightgrey]
color=black
label = "root"
"StreamAgg_12" -> "Projection_33"
"Projection_33" -> "MergeJoin_26"
"MergeJoin_26" -> "TableReader_19"
"MergeJoin_26" -> "IndexReader_21"
"StreamAgg_12" -> "Projection_21"
"Projection_21" -> "HashLeftJoin_18"
"HashLeftJoin_18" -> "TableReader_20"
"HashLeftJoin_18" -> "TableReader_17"
}
subgraph cluster18{
subgraph cluster19{
node [style=filled, color=lightgrey]
color=black
label = "cop"
"TableScan_18"
"TableScan_19"
}
subgraph cluster20{
subgraph cluster16{
node [style=filled, color=lightgrey]
color=black
label = "cop"
"IndexScan_20"
"TableScan_16"
}
"TableReader_19" -> "TableScan_18"
"IndexReader_21" -> "IndexScan_20"
"TableReader_20" -> "TableScan_19"
"TableReader_17" -> "TableScan_16"
}

explain format="dot" select 1 in (select c2 from t2) from t1;
Expand Down Expand Up @@ -272,7 +272,7 @@ create table t(a int primary key, b int, c int, index idx(b));
explain select t.c in (select count(*) from t s ignore index(idx), t t1 where s.a = t.a and s.a = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, other cond:eq(test.t.c, count(*))
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand All @@ -285,7 +285,7 @@ Projection_11 10000.00 root 9_aux_0
explain select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.a = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, other cond:eq(test.t.c, count(*))
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand All @@ -297,7 +297,7 @@ Projection_11 10000.00 root 9_aux_0
explain select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.c = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, other cond:eq(test.t.c, count(*))
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand Down
32 changes: 32 additions & 0 deletions cmd/explaintest/r/select.result
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,35 @@ Projection_9 10000.00 root or(and(and(le(col_count, 1), eq(t1.a, col_firstrow)),
└─Projection_27 10000.00 root t2.a, t2.a, cast(isnull(t2.a))
└─TableReader_24 10000.00 root data:TableScan_23
└─TableScan_23 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
drop table if exists t;
create table t(a int, b int);
drop table if exists s;
create table s(a varchar(20), b varchar(20));
explain select a in (select a from s where s.b = t.b) from t;
id count task operator info
Projection_9 10000.00 root 6_aux_0
└─HashLeftJoin_10 10000.00 root left outer semi join, inner:Projection_14, equal:[eq(cast(test.t.b), cast(test.s.b))], other cond:eq(cast(test.t.a), cast(test.s.a))
├─Projection_11 10000.00 root test.t.a, test.t.b, cast(test.t.b)
│ └─TableReader_13 10000.00 root data:TableScan_12
│ └─TableScan_12 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─Projection_14 10000.00 root test.s.a, test.s.b, cast(test.s.b)
└─TableReader_16 10000.00 root data:TableScan_15
└─TableScan_15 10000.00 cop table:s, range:[-inf,+inf], keep order:false, stats:pseudo
explain select a in (select a+b from t t2 where t2.b = t1.b) from t t1;
id count task operator info
Projection_7 10000.00 root 6_aux_0
└─HashLeftJoin_8 10000.00 root left outer semi join, inner:TableReader_12, equal:[eq(t1.b, t2.b)], other cond:eq(t1.a, plus(t2.a, t2.b))
├─TableReader_10 10000.00 root data:TableScan_9
│ └─TableScan_9 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableScan_11
└─TableScan_11 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
drop table t;
create table t(a int not null, b int);
explain select a in (select a from t t2 where t2.b = t1.b) from t t1;
id count task operator info
Projection_7 10000.00 root 6_aux_0
└─HashLeftJoin_8 10000.00 root left outer semi join, inner:TableReader_12, equal:[eq(t1.b, t2.b) eq(t1.a, t2.a)]
├─TableReader_10 10000.00 root data:TableScan_9
│ └─TableScan_9 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableScan_11
└─TableScan_11 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
10 changes: 10 additions & 0 deletions cmd/explaintest/t/select.test
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,13 @@ drop table if exists t;
create table t(a int, b int);
explain select a != any (select a from t t2) from t t1;
explain select a = all (select a from t t2) from t t1;

drop table if exists t;
create table t(a int, b int);
drop table if exists s;
create table s(a varchar(20), b varchar(20));
explain select a in (select a from s where s.b = t.b) from t;
explain select a in (select a+b from t t2 where t2.b = t1.b) from t t1;
drop table t;
create table t(a int not null, b int);
explain select a in (select a from t t2 where t2.b = t1.b) from t t1;
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 5 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type lookUpJoinTask struct {
doneCh chan error
cursor int
hasMatch bool
hasNull bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -234,18 +235,20 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {

outerRow := task.outerResult.GetRow(task.cursor)
if e.innerIter.Current() != e.innerIter.End() {
matched, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk)
matched, isNull, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk)
if err != nil {
return errors.Trace(err)
}
task.hasMatch = task.hasMatch || matched
task.hasNull = task.hasNull || isNull
}
if e.innerIter.Current() == e.innerIter.End() {
if !task.hasMatch {
e.joiner.onMissMatch(outerRow, chk)
e.joiner.onMissMatch(task.hasNull, outerRow, chk)
}
task.cursor++
task.hasMatch = false
task.hasNull = false
}
if chk.NumRows() == e.maxChunkSize {
return nil
Expand Down
23 changes: 13 additions & 10 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB
return true, keyBuf, nil
}
}

keyBuf = keyBuf[:0]
keyBuf, err = codec.HashChunkRow(e.ctx.GetSessionVars().StmtCtx, keyBuf, row, allTypes, keyColIdx)
if err != nil {
Expand Down Expand Up @@ -408,13 +407,13 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
return false, joinResult
}
if hasNull {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
return true, joinResult
}
e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
innerPtrs := e.hashTableValBufs[workerID]
if len(innerPtrs) == 0 {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
return true, joinResult
}
innerRows := make([]chunk.Row, 0, len(innerPtrs))
Expand All @@ -424,14 +423,15 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
innerRows = append(innerRows, matchedInner)
}
iter := chunk.NewIterator4Slice(innerRows)
hasMatch := false
hasMatch, hasNull := false, false
for iter.Begin(); iter.Current() != iter.End(); {
matched, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
matched, isNull, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
hasMatch = hasMatch || matched
hasNull = hasNull || isNull

if joinResult.chk.NumRows() == e.maxChunkSize {
ok := true
Expand All @@ -443,7 +443,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
}
}
if !hasMatch {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(hasNull, outerRow, joinResult.chk)
}
return true, joinResult
}
Expand Down Expand Up @@ -471,7 +471,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
}
for i := range selected {
if !selected[i] { // process unmatched outer rows
e.joiners[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerChk.GetRow(i), joinResult.chk)
} else { // process matched outer rows
ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
if !ok {
Expand Down Expand Up @@ -609,6 +609,7 @@ type NestedLoopApplyExec struct {
innerIter chunk.Iterator
outerRow *chunk.Row
hasMatch bool
hasNull bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -666,7 +667,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if selected {
return &outerRow, nil
} else if e.outer {
e.joiner.onMissMatch(outerRow, chk)
e.joiner.onMissMatch(false, outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
return nil, nil
}
Expand Down Expand Up @@ -714,13 +715,14 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
if e.outerRow != nil && !e.hasMatch {
e.joiner.onMissMatch(*e.outerRow, chk)
e.joiner.onMissMatch(e.hasNull, *e.outerRow, chk)
}
e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk)
if e.outerRow == nil || err != nil {
return errors.Trace(err)
}
e.hasMatch = false
e.hasNull = false

for _, col := range e.outerSchema {
*col.Data = e.outerRow.GetDatum(col.Index, col.RetType)
Expand All @@ -733,8 +735,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
e.innerIter.Begin()
}

matched, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk)
matched, isNull, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk)
e.hasMatch = e.hasMatch || matched
e.hasNull = e.hasNull || isNull

if err != nil || chk.NumRows() == e.maxChunkSize {
return errors.Trace(err)
Expand Down
Loading