planner,executor: fix cached table query with filter condition (#32590)
close #32422
tiancaiamao authored Mar 9, 2022
1 parent b88d62e commit e4ced47
Showing 9 changed files with 118 additions and 67 deletions.
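In short: before this commit, the executor builder wrapped a cached-table reader in a UnionScan only after the plan was built, using a synthetic PhysicalUnionScan with no conditions; when a read was served from the table cache, a filter pushed down into the reader was lost, so queries such as "select id+1, c from t where c = 4" on a cached table could return wrong results (#32422). This diff instead makes the planner place the UnionScan in the logical plan for cached tables (see the planner/core/logical_plan_builder.go hunk below) and lets buildUnionScanFromReader decide per reader whether to bypass storage and serve rows from the cache. A minimal sketch of the scenario, condensed from the TestIssue32422 added below and assuming the same testkit harness:

package executor_test

import (
	"testing"
	"time"

	"github.com/pingcap/tidb/testkit"
	"github.com/stretchr/testify/require"
)

// Condensed from TestIssue32422 below: a filtered read from a cached table
// must return correct rows once the read is served from the table cache.
func TestCachedTableFilterSketch(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (id int, c int, index(id))")
	tk.MustExec("insert into t values (3,3), (4,4), (5,5)")
	tk.MustExec("alter table t cache")

	// The cache is loaded lazily; retry until a read actually hits it.
	for i := 0; i < 20; i++ {
		tk.MustQuery("select id+1, c from t where c = 4").Check(testkit.Rows("5 4"))
		if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
			return
		}
		time.Sleep(50 * time.Millisecond)
	}
	require.Fail(t, "query was never served from the table cache")
}
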
94 changes: 38 additions & 56 deletions executor/builder.go
@@ -152,7 +152,6 @@ func (b *MockExecutorBuilder) Build(p plannercore.Plan) Executor {
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
var e Executor
switch v := p.(type) {
case nil:
return nil
@@ -267,13 +266,13 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
case *plannercore.Analyze:
return b.buildAnalyze(v)
case *plannercore.PhysicalTableReader:
e = b.buildTableReader(v)
return b.buildTableReader(v)
case *plannercore.PhysicalTableSample:
return b.buildTableSample(v)
case *plannercore.PhysicalIndexReader:
e = b.buildIndexReader(v)
return b.buildIndexReader(v)
case *plannercore.PhysicalIndexLookUpReader:
e = b.buildIndexLookUpReader(v)
return b.buildIndexLookUpReader(v)
case *plannercore.PhysicalWindow:
return b.buildWindow(v)
case *plannercore.PhysicalShuffle:
@@ -304,57 +303,6 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)
return nil
}

if tblExec, ok := e.(dataSourceExecutor); ok {
tbl := tblExec.Table()
tableInfo := tbl.Meta()
// When reading from a cached table, check whether the conditions for reading from the cache are satisfied.
if tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
physicalPlan := p.(plannercore.PhysicalPlan)
return b.buildCachedTableExecutor(tbl, physicalPlan, e)
}
}

return e
}

// buildCachedTableExecutor adds a UnionScan to the original Executor to make the reader read from the table cache.
func (b *executorBuilder) buildCachedTableExecutor(tbl table.Table, p plannercore.PhysicalPlan, e Executor) Executor {
if b.ctx.GetSessionVars().SnapshotTS != 0 || b.ctx.GetSessionVars().StmtCtx.IsStaleness {
return e
}

cachedTable := tbl.(table.CachedTable)
startTS, err := b.getSnapshotTS()
if err != nil {
b.err = errors.Trace(err)
return nil
}

leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
sessionVars := b.ctx.GetSessionVars()
// Use the TS of the transaction to determine whether the cache can be used.
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessionVars.StmtCtx.ReadFromTableCache = true
switch raw := e.(type) {
case *TableReaderExecutor:
raw.dummy = true
case *IndexReaderExecutor:
raw.dummy = true
case *IndexLookUpExecutor:
raw.dummy = true
}
us := plannercore.PhysicalUnionScan{CacheTable: cacheData}.Init(b.ctx, nil, -1)
us.SetChildren(p)
e = b.buildUnionScanFromReader(e, us)
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !sessionVars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
return e
}

func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Executor {
@@ -1118,7 +1066,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
return x
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), reader)}
us.cacheTable = v.CacheTable
// Get the handle column index of the below Plan.
us.belowHandleCols = v.HandleCols
us.mutableRow = chunk.MutRowFromTypes(retTypes(us))
@@ -1134,13 +1081,21 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.collators = append(us.collators, collate.GetCollator(tp.Collate))
}

startTS, err := b.getSnapshotTS()
sessionVars := b.ctx.GetSessionVars()
if err != nil {
b.err = err
return nil
}

switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = x.virtualColumnIndex
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
@@ -1154,6 +1109,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
@@ -1168,6 +1124,7 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care about order for now, so we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
@@ -1181,6 +1138,31 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
return us
}

type bypassDataSourceExecutor interface {
dataSourceExecutor
setDummy()
}

func (us *UnionScanExec) handleCachedTable(b *executorBuilder, x bypassDataSourceExecutor, vars *variable.SessionVars, startTS uint64) {
tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
cachedTable := tbl.(table.CachedTable)
// Determine whether the cache can be used.
leaseDuration := time.Duration(variable.TableCacheLease.Load()) * time.Second
cacheData := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
vars.StmtCtx.ReadFromTableCache = true
x.setDummy()
us.cacheTable = cacheData
} else {
if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !vars.StmtCtx.InExplainStmt {
store := b.ctx.GetStore()
cachedTable.UpdateLockForRead(context.Background(), store, startTS, leaseDuration)
}
}
}
}

// buildMergeJoin builds MergeJoinExec executor.
func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Executor {
leftExec := b.build(v.Children()[0])
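For orientation, the shape of the new hook as a standalone sketch: a reader that can be bypassed implements the bypassDataSourceExecutor interface above (Table plus setDummy), and UnionScanExec flips it to dummy when the cached snapshot is usable, then serves rows from the cache itself. The types below are toy stand-ins, not the real TiDB definitions:

package main

import "fmt"

// Toy stand-ins for the real executor types, to show the shape of the
// bypassDataSourceExecutor hook added in this commit.
type tableMeta struct {
	name        string
	cacheEnable bool
}

// A reader that exposes its table and can be switched into a no-op
// ("dummy") when rows are served from the table cache.
type bypassDataSourceExecutor interface {
	Table() tableMeta
	setDummy()
}

type tableReader struct {
	tbl   tableMeta
	dummy bool // when true, the reader skips storage entirely
}

func (r *tableReader) Table() tableMeta { return r.tbl }
func (r *tableReader) setDummy()        { r.dummy = true }

// handleCachedTable mirrors the commit's logic: if the table cache is
// enabled and the cached snapshot is usable at the query's start TS,
// bypass the reader and let UnionScan serve rows from the cache.
func handleCachedTable(x bypassDataSourceExecutor, cacheUsable bool) {
	if x.Table().cacheEnable && cacheUsable {
		x.setDummy()
	}
}

func main() {
	r := &tableReader{tbl: tableMeta{name: "t", cacheEnable: true}}
	handleCachedTable(r, true)
	fmt.Println("reader bypassed:", r.dummy) // reader bypassed: true
}
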
8 changes: 8 additions & 0 deletions executor/distsql.go
@@ -207,6 +207,10 @@ func (e *IndexReaderExecutor) Table() table.Table {
return e.table
}

func (e *IndexReaderExecutor) setDummy() {
e.dummy = true
}

// Close clears all resources held by the current object.
func (e *IndexReaderExecutor) Close() (err error) {
if e.dummy {
@@ -412,6 +416,10 @@ func (e *IndexLookUpExecutor) Table() table.Table {
return e.table
}

func (e *IndexLookUpExecutor) setDummy() {
e.dummy = true
}

// Open implements the Executor Open interface.
func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
var err error
4 changes: 4 additions & 0 deletions executor/table_reader.go
@@ -118,6 +118,10 @@ func (e *TableReaderExecutor) Table() table.Table {
return e.table
}

func (e *TableReaderExecutor) setDummy() {
e.dummy = true
}

// Open initializes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
63 changes: 63 additions & 0 deletions executor/union_scan_test.go
@@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/tablecodec"
@@ -457,6 +458,68 @@ func TestIssue28073(t *testing.T) {
require.False(t, exist)
}

func TestIssue32422(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")

tk.MustExec("create table t (id int, c int, index(id));")
tk.MustExec("insert into t values (3,3), (4,4), (5,5);")
tk.MustExec("alter table t cache;")

var cacheUsed bool
for i := 0; i < 20; i++ {
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
cacheUsed = true
break
}
time.Sleep(50 * time.Millisecond)
}
require.True(t, cacheUsed)

tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))

// Some extra tests.
// Since cached tables use the UnionScanExec utilities, check what happens when they work together.
// In these cases, the cached data serves as the snapshot, TiKV is skipped, and the txn membuffer works the same way.
tk.MustExec("begin")
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
tk.MustExec("insert into t values (6, 6)")
// Check for the new added data.
tk.HasPlan("select id+1, c from t where c = 6;", "UnionScan")
tk.MustQuery("select id+1, c from t where c = 6;").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
// Check for the old data.
tk.MustQuery("select id+1, c from t where c = 4;").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Point get
tk.HasPlan("select id+1, c from t where id = 6", "PointGet")
tk.MustQuery("select id+1, c from t where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Lookup
tk.HasPlan("select id+1, c from t where id = 6", "IndexLookUp")
tk.MustQuery("select id+1, c from t use index(id) where id = 6").Check(testkit.Rows("7 6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id+1, c from t use index(id) where id = 4").Check(testkit.Rows("5 4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

// Index Reader
tk.HasPlan("select id from t where id = 6", "IndexReader")
tk.MustQuery("select id from t use index(id) where id = 6").Check(testkit.Rows("6"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)
tk.MustQuery("select id from t use index(id) where id = 4").Check(testkit.Rows("4"))
require.True(t, tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache)

tk.MustExec("rollback")
}

func BenchmarkUnionScanRead(b *testing.B) {
store, clean := testkit.CreateMockStore(b)
defer clean()
1 change: 0 additions & 1 deletion planner/core/exhaust_physical_plans.go
@@ -52,7 +52,6 @@ func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty)
us := PhysicalUnionScan{
Conditions: p.conditions,
HandleCols: p.handleCols,
CacheTable: p.cacheTable,
}.Init(p.ctx, p.stats, p.blockOffset, childProp)
return []PhysicalPlan{us}, true, nil
}
7 changes: 4 additions & 3 deletions planner/core/integration_test.go
@@ -6098,9 +6098,10 @@ func TestAggPushToCopForCachedTable(t *testing.T) {

tk.MustQuery("explain format = 'brief' select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows(
"StreamAgg 1.00 root funcs:count(1)->Column#8]\n" +
"[└─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))
"[└─UnionScan 10.00 root eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))

var readFromCacheNoPanic bool
for i := 0; i < 10; i++ {
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
@@ -4233,7 +4233,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

var result LogicalPlan = ds
dirty := tableHasDirtyContent(b.ctx, tableInfo)
if dirty || tableInfo.TempTableType == model.TempTableLocal {
if dirty || tableInfo.TempTableType == model.TempTableLocal || tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
if tableInfo.Partition != nil && b.optFlag&flagPartitionProcessor == 0 {
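The planner half of the fix is the one-line change above: a cached table now takes the same path as a table with dirty transaction content or a local temporary table, so the optimizer itself places a LogicalUnionScan (which carries the query's filter conditions) above the DataSource, instead of the executor builder bolting one on afterwards with nil conditions. A condensed sketch of the updated predicate, with illustrative stand-ins for the parser/model enums:

package main

import "fmt"

// Illustrative stand-ins for the parser/model enums (not the real definitions).
type TempTableType int
type TableCacheStatusType int

const (
	TempTableNone TempTableType = iota
	TempTableLocal
)

const (
	TableCacheStatusDisable TableCacheStatusType = iota
	TableCacheStatusEnable
)

// needUnionScan sketches the updated buildDataSource condition: wrap the
// DataSource in a UnionScan whenever rows may live outside the storage
// snapshot, i.e. the txn has dirty content, the table is a local temporary
// table, or the table cache is enabled.
func needUnionScan(dirty bool, tt TempTableType, cs TableCacheStatusType) bool {
	return dirty || tt == TempTableLocal || cs == TableCacheStatusEnable
}

func main() {
	fmt.Println(needUnionScan(false, TempTableNone, TableCacheStatusEnable)) // true
}
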
4 changes: 0 additions & 4 deletions planner/core/logical_plans.go
@@ -20,7 +20,6 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
@@ -533,9 +532,6 @@ type LogicalUnionScan struct {
conditions []expression.Expression

handleCols HandleCols

// cacheTable not nil means it's reading from cached table.
cacheTable kv.MemBuffer
}

// DataSource represents a tableScan without condition push down.
2 changes: 0 additions & 2 deletions planner/core/physical_plans.go
@@ -1185,8 +1185,6 @@ type PhysicalUnionScan struct {
Conditions []expression.Expression

HandleCols HandleCols

CacheTable kv.MemBuffer
}

// ExtractCorrelatedCols implements PhysicalPlan interface.
