Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan,executor: support IndexJoin over UnionScan (#7877) #11358

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -1223,31 +1223,30 @@ id count task operator info
Projection_25 100.00 root tpch.supplier.s_name, 17_col_0
└─TopN_28 100.00 root 17_col_0:desc, tpch.supplier.s_name:asc, offset:0, count:100
└─HashAgg_31 320000.00 root group by:tpch.supplier.s_name, funcs:count(1), firstrow(tpch.supplier.s_name)
└─Selection_32 3786715.90 root not(16_aux_0)
└─IndexJoin_38 4733394.87 root left outer semi join, inner:IndexLookUp_37, outer key:tpch.l1.l_orderkey, inner key:tpch.l3.l_orderkey, other cond:ne(tpch.l3.l_suppkey, tpch.l1.l_suppkey)
├─IndexJoin_82 4733394.87 root semi join, inner:IndexLookUp_81, outer key:tpch.l1.l_orderkey, inner key:tpch.l2.l_orderkey, other cond:ne(tpch.l2.l_suppkey, tpch.l1.l_suppkey), ne(tpch.l2.l_suppkey, tpch.supplier.s_suppkey)
│ ├─HashLeftJoin_88 5916743.59 root inner join, inner:TableReader_117, equal:[eq(tpch.supplier.s_nationkey, tpch.nation.n_nationkey)]
│ │ ├─HashLeftJoin_93 147918589.81 root inner join, inner:TableReader_114, equal:[eq(tpch.l1.l_suppkey, tpch.supplier.s_suppkey)]
│ │ │ ├─IndexJoin_100 147918589.81 root inner join, inner:IndexLookUp_99, outer key:tpch.orders.o_orderkey, inner key:tpch.l1.l_orderkey
│ │ │ │ ├─TableReader_109 36517371.00 root data:Selection_108
│ │ │ │ │ └─Selection_108 36517371.00 cop eq(tpch.orders.o_orderstatus, "F")
│ │ │ │ │ └─TableScan_107 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
│ │ │ │ └─IndexLookUp_99 240004648.80 root
│ │ │ │ ├─IndexScan_96 1.00 cop table:l1, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.orders.o_orderkey], keep order:false
│ │ │ │ └─Selection_98 240004648.80 cop gt(tpch.l1.l_receiptdate, tpch.l1.l_commitdate)
│ │ │ │ └─TableScan_97 1.00 cop table:lineitem, keep order:false
│ │ │ └─TableReader_114 500000.00 root data:TableScan_113
│ │ │ └─TableScan_113 500000.00 cop table:supplier, range:[-inf,+inf], keep order:false
│ │ └─TableReader_117 1.00 root data:Selection_116
│ │ └─Selection_116 1.00 cop eq(tpch.nation.n_name, "EGYPT")
│ │ └─TableScan_115 25.00 cop table:nation, range:[-inf,+inf], keep order:false
│ └─IndexLookUp_81 1.00 root
│ ├─IndexScan_79 1.00 cop table:l2, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.l1.l_orderkey], keep order:false
│ └─TableScan_80 1.00 cop table:lineitem, keep order:false
└─IndexLookUp_37 240004648.80 root
├─IndexScan_34 1.00 cop table:l3, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.l1.l_orderkey], keep order:false
└─Selection_36 240004648.80 cop gt(tpch.l3.l_receiptdate, tpch.l3.l_commitdate)
└─TableScan_35 1.00 cop table:lineitem, keep order:false
└─IndexJoin_37 3786715.90 root anti semi join, inner:IndexLookUp_36, outer key:tpch.l1.l_orderkey, inner key:tpch.l3.l_orderkey, other cond:ne(tpch.l3.l_suppkey, tpch.l1.l_suppkey), ne(tpch.l3.l_suppkey, tpch.supplier.s_suppkey)
├─IndexJoin_81 4733394.87 root semi join, inner:IndexLookUp_80, outer key:tpch.l1.l_orderkey, inner key:tpch.l2.l_orderkey, other cond:ne(tpch.l2.l_suppkey, tpch.l1.l_suppkey), ne(tpch.l2.l_suppkey, tpch.supplier.s_suppkey)
│ ├─HashLeftJoin_87 5916743.59 root inner join, inner:TableReader_116, equal:[eq(tpch.supplier.s_nationkey, tpch.nation.n_nationkey)]
│ │ ├─HashLeftJoin_92 147918589.81 root inner join, inner:TableReader_113, equal:[eq(tpch.l1.l_suppkey, tpch.supplier.s_suppkey)]
│ │ │ ├─IndexJoin_99 147918589.81 root inner join, inner:IndexLookUp_98, outer key:tpch.orders.o_orderkey, inner key:tpch.l1.l_orderkey
│ │ │ │ ├─TableReader_108 36517371.00 root data:Selection_107
│ │ │ │ │ └─Selection_107 36517371.00 cop eq(tpch.orders.o_orderstatus, "F")
│ │ │ │ │ └─TableScan_106 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
│ │ │ │ └─IndexLookUp_98 240004648.80 root
│ │ │ │ ├─IndexScan_95 1.00 cop table:l1, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.orders.o_orderkey], keep order:false
│ │ │ │ └─Selection_97 240004648.80 cop gt(tpch.l1.l_receiptdate, tpch.l1.l_commitdate)
│ │ │ │ └─TableScan_96 1.00 cop table:lineitem, keep order:false
│ │ │ └─TableReader_113 500000.00 root data:TableScan_112
│ │ │ └─TableScan_112 500000.00 cop table:supplier, range:[-inf,+inf], keep order:false
│ │ └─TableReader_116 1.00 root data:Selection_115
│ │ └─Selection_115 1.00 cop eq(tpch.nation.n_name, "EGYPT")
│ │ └─TableScan_114 25.00 cop table:nation, range:[-inf,+inf], keep order:false
│ └─IndexLookUp_80 1.00 root
│ ├─IndexScan_78 1.00 cop table:l2, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.l1.l_orderkey], keep order:false
│ └─TableScan_79 1.00 cop table:lineitem, keep order:false
└─IndexLookUp_36 240004648.80 root
├─IndexScan_33 1.00 cop table:l3, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.l1.l_orderkey], keep order:false
└─Selection_35 240004648.80 cop gt(tpch.l3.l_receiptdate, tpch.l3.l_commitdate)
└─TableScan_34 1.00 cop table:lineitem, keep order:false
/*
Q22 Global Sales Opportunity Query
The Global Sales Opportunity Query identifies geographies where there are customers who may be likely to make a
Expand Down Expand Up @@ -1299,11 +1298,10 @@ Sort_32 1.00 root custsale.cntrycode:asc
└─Projection_34 1.00 root custsale.cntrycode, 28_col_0, 28_col_1
└─HashAgg_37 1.00 root group by:custsale.cntrycode, funcs:count(1), sum(tpch.custsale.c_acctbal), firstrow(custsale.cntrycode)
└─Projection_38 0.00 root substring(tpch.customer.c_phone, 1, 2), tpch.customer.c_acctbal
└─Selection_39 0.00 root not(26_aux_0)
└─HashLeftJoin_40 0.00 root left outer semi join, inner:TableReader_46, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
├─Selection_41 0.00 root in(substring(tpch.customer.c_phone, 1, 2), "20", "40", "22", "30", "39", "42", "21")
│ └─TableReader_44 0.00 root data:Selection_43
│ └─Selection_43 0.00 cop gt(tpch.customer.c_acctbal, NULL)
│ └─TableScan_42 7500000.00 cop table:customer, range:[-inf,+inf], keep order:false
└─TableReader_46 75000000.00 root data:TableScan_45
└─TableScan_45 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
└─HashLeftJoin_39 0.00 root anti semi join, inner:TableReader_45, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
├─Selection_40 0.00 root in(substring(tpch.customer.c_phone, 1, 2), "20", "40", "22", "30", "39", "42", "21")
│ └─TableReader_43 0.00 root data:Selection_42
│ └─Selection_42 0.00 cop gt(tpch.customer.c_acctbal, NULL)
│ └─TableScan_41 7500000.00 cop table:customer, range:[-inf,+inf], keep order:false
└─TableReader_45 75000000.00 root data:TableScan_44
└─TableScan_44 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,8 +1640,10 @@ func modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollat
if !charset.ValidCharsetAndCollation(toCharset, toCollate) {
return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset, toCollate)
}
if toCharset == charset.CharsetUTF8MB4 && origCharset == charset.CharsetUTF8 {
// TiDB only allow utf8 to be changed to utf8mb4.
if (origCharset == charset.CharsetUTF8 && toCharset == charset.CharsetUTF8MB4) ||
(origCharset == charset.CharsetUTF8 && toCharset == charset.CharsetUTF8) ||
(origCharset == charset.CharsetUTF8MB4 && toCharset == charset.CharsetUTF8MB4) {
// TiDB only allows changing utf8 to utf8mb4, or changing the collation when the charset stays utf8/utf8mb4.
return nil
}

Expand Down
59 changes: 48 additions & 11 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,24 +705,43 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
src := b.build(v.Children()[0])
reader := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)}
us, err := b.buildUnionScanFromReader(reader, v)
if err != nil {
b.err = err
return nil
}
return us
}

// buildUnionScanFromReader builds union scan executor from child executor.
// Note that this function may be called by inner workers of index lookup join concurrently.
// Be careful to avoid data race.
func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) (Executor, error) {
var err error
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}
// Get the handle column index of the child plan.
// We can guarantee that there must be only one col in the map.
for _, cols := range v.Children()[0].Schema().TblID2Handle {
us.belowHandleIndex = cols[0].Index
}
switch x := src.(type) {
switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
// Union scan can only be in a write transaction, so DirtyDB should have a non-nil value now, thus
// GetDirtyDB() is safe here. If this table has been modified in the transaction, a non-nil DirtyTable
// can be found in DirtyDB now, so GetDirtyTable is safe; if this table has not been modified in the
// transaction, an empty DirtyTable would be inserted into DirtyDB, and it does not matter when multiple
// goroutines write an empty DirtyTable to DirtyDB for this table concurrently. Thus we don't use a lock
// to synchronize here.
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
b.err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows()
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -736,7 +755,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
b.err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows()
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -750,16 +769,16 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
b.err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows()
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return src
return reader, nil
}
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
if err != nil {
err = errors.Trace(err)
return nil, err
}
return us
return us, nil
}

// buildMergeJoin builds MergeJoinExec executor.
Expand Down Expand Up @@ -1864,10 +1883,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}

// buildUnionScanForIndexJoin builds the inner-side executor of an index join
// when the inner plan is a PhysicalUnionScan.
//
// It first builds the reader for the union scan's child plan from the given
// lookup values and ranges, then wraps that reader with
// buildUnionScanFromReader. When the result is a *UnionScanExec, its
// snapshotChunkBuffer is initialised from newFirstChunk before returning.
//
// The first error reported by either build step is returned unchanged.
func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
	values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
	childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder}
	reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
	if err != nil {
		return nil, err
	}
	e, err := builder.buildUnionScanFromReader(reader, v)
	if err != nil {
		return nil, err
	}
	// buildUnionScanFromReader hands the reader back untouched for reader
	// types it does not recognise, so the result is not guaranteed to be a
	// *UnionScanExec. Use the checked assertion to avoid a panic and pass
	// such an executor through as-is.
	us, ok := e.(*UnionScanExec)
	if !ok {
		return e, nil
	}
	us.snapshotChunkBuffer = us.newFirstChunk()
	return us, nil
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
Expand Down
80 changes: 80 additions & 0 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,83 @@ func (s *testSuite) TestIndexJoinOverflow(c *C) {
tk.MustExec(`create table t2(a int unsigned, index idx(a));`)
tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t1 join t2 on t1.a = t2.a;`).Check(testkit.Rows())
}

// TestIndexJoinUnionScan verifies index joins whose inner side is a UnionScan
// inside an open transaction with uncommitted inserts. For each query it
// checks the EXPLAIN output first and then the query result.
func (s *testSuite) TestIndexJoinUnionScan(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)

	// Schema, committed data, then uncommitted rows inside a transaction.
	for _, stmt := range []string{
		"create table t1(id int primary key, a int)",
		"create table t2(id int primary key, a int, b int, key idx_a(a))",
		"insert into t2 values (1,1,1),(4,2,4)",
		"begin",
		"insert into t1 values(2,2)",
		"insert into t2 values(2,2,2), (3,3,3)",
	} {
		tk.MustExec(stmt)
	}

	scenarios := []struct {
		query string
		plan  []string
		rows  []string
	}{
		{
			// TableScan below UnionScan
			query: "select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id",
			plan: []string{
				"IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id",
				"├─UnionScan_12 10000.00 root ",
				"│ └─TableReader_14 10000.00 root data:TableScan_13",
				"│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
				"└─UnionScan_10 1.00 root ",
				" └─TableReader_9 1.00 root data:TableScan_8",
				" └─TableScan_8 1.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo",
			},
			rows: []string{
				"2 2 2 2 2",
			},
		},
		{
			// IndexLookUp below UnionScan
			query: "select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a",
			plan: []string{
				"IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a",
				"├─UnionScan_13 10000.00 root ",
				"│ └─TableReader_15 10000.00 root data:TableScan_14",
				"│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
				"└─UnionScan_11 10.00 root ",
				" └─IndexLookUp_10 10.00 root ",
				" ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
				" └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo",
			},
			rows: []string{
				"2 2 2 2 2",
				"2 2 4 2 4",
			},
		},
		{
			// IndexScan below UnionScan
			query: "select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a",
			plan: []string{
				"Projection_7 12500.00 root test.t1.a, test.t2.a",
				"└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a",
				" ├─UnionScan_12 10000.00 root ",
				" │ └─TableReader_14 10000.00 root data:TableScan_13",
				" │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
				" └─UnionScan_10 10.00 root ",
				" └─IndexReader_9 10.00 root index:IndexScan_8",
				" └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
			},
			rows: []string{
				"2 2",
				"2 2",
			},
		},
	}
	for _, sc := range scenarios {
		tk.MustQuery("explain " + sc.query).Check(testkit.Rows(sc.plan...))
		tk.MustQuery(sc.query).Check(testkit.Rows(sc.rows...))
	}

	tk.MustExec("rollback")
}

// TestBatchIndexJoinUnionScan runs an index join over UnionScan with the
// session's max chunk size and index join batch size set to 1 and the index
// lookup join concurrency set to 4, inside a transaction whose rows are all
// uncommitted.
// NOTE(review): presumably this forces several inner workers to build union
// scan executors concurrently — confirm against the index lookup join
// implementation.
func (s *testSuite) TestBatchIndexJoinUnionScan(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)
	tk.MustExec("create table t1(id int primary key, a int)")
	tk.MustExec("create table t2(id int primary key, a int, key idx_a(a))")
	tk.MustExec("set @@session.tidb_max_chunk_size=1")
	tk.MustExec("set @@session.tidb_index_join_batch_size=1")
	tk.MustExec("set @@session.tidb_index_lookup_join_concurrency=4")
	tk.MustExec("begin")
	tk.MustExec("insert into t1 values(1,1),(2,1),(3,1),(4,1)")
	tk.MustExec("insert into t2 values(1,1)")
	tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
		"StreamAgg_13 1.00 root funcs:count(1)",
		"└─IndexJoin_24 12500.00 root inner join, inner:UnionScan_23, outer key:test.t1.a, inner key:test.t2.a",
		" ├─UnionScan_18 10000.00 root ",
		" │ └─TableReader_20 10000.00 root data:TableScan_19",
		" │ └─TableScan_19 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
		" └─UnionScan_23 10.00 root ",
		" └─IndexReader_22 10.00 root index:IndexScan_21",
		" └─IndexScan_21 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
	))
	// Execute the query whose plan was asserted above (join on t2.a), so the
	// checked plan is the one that actually runs. All four t1 rows have a = 1
	// and match the single t2 row with a = 1.
	tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
		"4",
	))
	// Original check: the same join keyed on t2.id (the one t2 row has id = 1).
	tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
		"4",
	))
	tk.MustExec("rollback")
}
Loading