Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: avoid distsql request for TableReader/IndexReader/IndexLookup on temporary table #24769

Merged
merged 16 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ type IndexReaderExecutor struct {

// Close clears all resources hold by current object.
func (e *IndexReaderExecutor) Close() error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
Copy link
Contributor

@qw4990 qw4990 May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about implementing a new function in table/table.go for the logic about checking whether the table is a TmpTable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, we do not need to check e.table != nil.
It's special here because a test case construct the IndexReaderExecutor manually and don't set e.table field.

The remain part is e.table.Meta().TempTableType != model.TempTableNone.
Define a one line function for that does not seems beneficial: the change will introduce a new exported function without simplifying the code much.

return nil
}

err := e.result.Close()
e.result = nil
e.ctx.StoreQueryFeedback(e.feedback)
Expand All @@ -204,6 +208,11 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
req.Reset()
return nil
}

err := e.result.Next(ctx, req)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -266,6 +275,11 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.dagPB.CollectExecutionSummaries = &collExec
}
e.kvRanges = kvRanges
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// In a test case IndexReaderExecutor is mocked and e.table is nil.
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
Expand Down Expand Up @@ -381,6 +395,12 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
e.feedback.Invalidate()
return err
}

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
if e.table.Meta().TempTableType == model.TempTableGlobal {
return nil
}

err = e.open(ctx)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -639,6 +659,10 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

if !e.workerStarted || e.finished == nil {
return nil
}
Expand All @@ -659,6 +683,11 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta().TempTableType == model.TempTableGlobal {
req.Reset()
return nil
}

if !e.workerStarted {
if err := e.startWorkers(ctx, req.RequiredRows()); err != nil {
return err
Expand Down
51 changes: 51 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8356,6 +8356,57 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) {
c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue)
}

func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) {
// Test that table reader/index reader/index lookup on the temporary table do not need to visit TiKV.
tk := testkit.NewTestKit(c, s.store)
tk1 := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk1.MustExec("use test")
tk.MustExec("create table normal (id int, a int, index(a))")
tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows")

tk.MustExec("begin")
tk.MustExec("insert into tmp_t values (1, 1)")
tk.MustExec("insert into tmp_t values (2, 2)")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil)
}()

// Make sure the fail point works.
// With that failpoint, all requests to the TiKV is discard.
rs, err := tk1.Exec("select * from normal")
c.Assert(err, IsNil)
blocked := make(chan struct{})
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
_, err := session.ResultSetToStringSlice(ctx, tk1.Se, rs)
blocked <- struct{}{}
c.Assert(err, NotNil)
}()
select {
case <-blocked:
c.Error("The query should block when the failpoint is enabled")
case <-time.After(200 * time.Millisecond):
}
cancelFunc()

// Check the temporary table do not send request to TiKV.
// Table reader
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
tk.HasPlan("select * from tmp_t", "TableReader")
tk.MustQuery("select * from tmp_t").Check(testkit.Rows("1 1", "2 2"))
// Index reader
tk.HasPlan("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t", "IndexReader")
tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t").Check(testkit.Rows("1", "2"))
// Index lookup
tk.HasPlan("select id from tmp_t where a = 1", "IndexLookUp")
tk.MustQuery("select id from tmp_t where a = 1").Check(testkit.Rows("1"))

tk.MustExec("rollback")
}

func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
55 changes: 44 additions & 11 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
}
}
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// Calculate the kv ranges here, UnionScan rely on this kv ranges.
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
kvReq, err := e.buildKVReq(ctx, firstPartRanges)
if err != nil {
return err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
if len(secondPartRanges) != 0 {
kvReq, err = e.buildKVReq(ctx, secondPartRanges)
if err != nil {
return err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
}
return nil
}

firstResult, err := e.buildResp(ctx, firstPartRanges)
if err != nil {
e.feedback.Invalidate()
Expand All @@ -175,6 +194,12 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
req.Reset()
return nil
}

logutil.Eventf(ctx, "table scan table: %s, range: %v", stringutil.MemoizeStr(func() string {
var tableName string
if meta := e.table.Meta(); meta != nil {
Expand All @@ -197,6 +222,10 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error

// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

var err error
if e.resultHandler != nil {
err = e.resultHandler.Close()
Expand All @@ -209,6 +238,20 @@ func (e *TableReaderExecutor) Close() error {
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResult returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
kvReq, err := e.buildKVReq(ctx, ranges)
if err != nil {
return nil, err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
return result, nil
}

func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) {
var builder distsql.RequestBuilder
var reqBuilder *distsql.RequestBuilder
if e.kvRangeBuilder != nil {
Expand All @@ -231,17 +274,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop)
kvReq, err := reqBuilder.Build()
if err != nil {
return nil, err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
return result, nil
return reqBuilder.Build()
}

func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnInfo) []int {
Expand Down