Skip to content

Commit

Permalink
planner: fix ExtractCorrelatedCols method in PhysicalTableScan (#51205)
Browse files Browse the repository at this point in the history
close #51204
  • Loading branch information
Lloyd-Pottiger authored Feb 21, 2024
1 parent 72517a9 commit a0e0969
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
58 changes: 58 additions & 0 deletions pkg/planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tipb/go-tipb"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -437,3 +438,60 @@ func TestPhysicalPlanMemoryTrace(t *testing.T) {
pp.MPPPartitionCols = append(pp.MPPPartitionCols, &property.MPPPartitionColumn{})
require.Greater(t, pp.MemoryUsage(), size)
}

func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int, client_type tinyint, client_no char(18), taxpayer_no varchar(50), status tinyint, update_time datetime)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("create table t2 (id int, company_no char(18), name varchar(200), tax_registry_no varchar(30))")
tk.MustExec("insert into t1(id, taxpayer_no, client_no, client_type, status, update_time) values (1, 'TAX001', 'Z9005', 1, 1, '2024-02-18 10:00:00'), (2, 'TAX002', 'Z9005', 1, 0, '2024-02-18 09:00:00'), (3, 'TAX003', 'Z9005', 2, 1, '2024-02-18 08:00:00'), (4, 'TAX004', 'Z9006', 1, 1, '2024-02-18 12:00:00')")
tk.MustExec("insert into t2(id, company_no, name, tax_registry_no) values (1, 'Z9005', 'AA', 'aaa'), (2, 'Z9006', 'BB', 'bbb'), (3, 'Z9007', 'CC', 'ccc')")

sql := "select company_no, ifnull((select /*+ read_from_storage(tiflash[test.t1]) */ taxpayer_no from test.t1 where client_no = c.company_no and client_type = 1 and status = 1 order by update_time desc limit 1), tax_registry_no) as tax_registry_no from test.t2 c where company_no = 'Z9005' limit 1"
tk.MustExec(sql)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
p, ok := info.Plan.(core.Plan)
require.True(t, ok)

var findTableScan func(p core.Plan) *core.PhysicalTableScan
findTableScan = func(p core.Plan) *core.PhysicalTableScan {
if p == nil {
return nil
}
switch v := p.(type) {
case *core.PhysicalTableScan:
if v.Table.Name.L == "t1" {
return v
}
return nil
case *core.PhysicalTableReader:
return findTableScan(v.TablePlans[0])
default:
physicayPlan := p.(core.PhysicalPlan)
for _, child := range physicayPlan.Children() {
if ts := findTableScan(child); ts != nil {
return ts
}
}
return nil
}
}
ts := findTableScan(p)
require.NotNil(t, ts)

pb, err := ts.ToPB(tk.Session(), kv.TiFlash)
require.NoError(t, err)
// make sure the pushed down filter condition is correct
require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions))
require.Equal(t, tipb.ExprType_ColumnRef, pb.TblScan.PushedDownFilterConditions[0].Children[0].Tp)
// make sure the correlated columns are extracted correctly
correlated := ts.ExtractCorrelatedCols()
require.Equal(t, 1, len(correlated))
require.Equal(t, "test.t2.company_no", correlated[0].String())
}
8 changes: 6 additions & 2 deletions pkg/planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,7 @@ func (ts *PhysicalTableScan) Clone() (PhysicalPlan, error) {
clonedScan.physicalSchemaProducer = *prod
clonedScan.AccessCondition = util.CloneExprs(ts.AccessCondition)
clonedScan.filterCondition = util.CloneExprs(ts.filterCondition)
clonedScan.LateMaterializationFilterCondition = util.CloneExprs(ts.LateMaterializationFilterCondition)
if ts.Table != nil {
clonedScan.Table = ts.Table.Clone()
}
Expand All @@ -935,11 +936,11 @@ func (ts *PhysicalTableScan) Clone() (PhysicalPlan, error) {

// ExtractCorrelatedCols implements PhysicalPlan interface.
func (ts *PhysicalTableScan) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := make([]*expression.CorrelatedColumn, 0, len(ts.AccessCondition)+len(ts.filterCondition))
corCols := make([]*expression.CorrelatedColumn, 0, len(ts.AccessCondition)+len(ts.LateMaterializationFilterCondition))
for _, expr := range ts.AccessCondition {
corCols = append(corCols, expression.ExtractCorColumns(expr)...)
}
for _, expr := range ts.filterCondition {
for _, expr := range ts.LateMaterializationFilterCondition {
corCols = append(corCols, expression.ExtractCorColumns(expr)...)
}
return corCols
Expand Down Expand Up @@ -1049,6 +1050,9 @@ func (ts *PhysicalTableScan) MemoryUsage() (sum int64) {
for _, cond := range ts.filterCondition {
sum += cond.MemoryUsage()
}
for _, cond := range ts.LateMaterializationFilterCondition {
sum += cond.MemoryUsage()
}
for _, rang := range ts.Ranges {
sum += rang.MemUsage()
}
Expand Down

0 comments on commit a0e0969

Please sign in to comment.