diff --git a/executor/distsql.go b/executor/distsql.go index 23fa8bd4ce58e..a4d71f45a6a9e 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -196,6 +196,10 @@ type IndexReaderExecutor struct { // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() error { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + err := e.result.Close() e.result = nil e.ctx.StoreQueryFeedback(e.feedback) @@ -204,6 +208,11 @@ func (e *IndexReaderExecutor) Close() error { // Next implements the Executor Next interface. func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + req.Reset() + return nil + } + err := e.result.Next(ctx, req) if err != nil { e.feedback.Invalidate() @@ -266,6 +275,11 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.dagPB.CollectExecutionSummaries = &collExec } e.kvRanges = kvRanges + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + // In a test case IndexReaderExecutor is mocked and e.table is nil. + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -381,6 +395,12 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { e.feedback.Invalidate() return err } + + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + if e.table.Meta().TempTableType == model.TempTableGlobal { + return nil + } + err = e.open(ctx) if err != nil { e.feedback.Invalidate() @@ -639,6 +659,10 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { + if e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + if !e.workerStarted || e.finished == nil { return nil } @@ -659,6 +683,11 @@ func (e *IndexLookUpExecutor) Close() error { // Next implements Exec Next interface. func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta().TempTableType == model.TempTableGlobal { + req.Reset() + return nil + } + if !e.workerStarted { if err := e.startWorkers(ctx, req.RequiredRows()); err != nil { return err diff --git a/executor/executor_test.go b/executor/executor_test.go index c41b6f2a7773e..8d70d7fa16468 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8356,6 +8356,57 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) { c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) } +func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { + // Test that table reader/index reader/index lookup on the temporary table do not need to visit TiKV. + tk := testkit.NewTestKit(c, s.store) + tk1 := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk1.MustExec("use test") + tk.MustExec("create table normal (id int, a int, index(a))") + tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") + + tk.MustExec("begin") + tk.MustExec("insert into tmp_t values (1, 1)") + tk.MustExec("insert into tmp_t values (2, 2)") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil) + }() + + // Make sure the fail point works. + // With that failpoint, all requests to the TiKV is discard. + rs, err := tk1.Exec("select * from normal") + c.Assert(err, IsNil) + blocked := make(chan struct{}) + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + _, err := session.ResultSetToStringSlice(ctx, tk1.Se, rs) + blocked <- struct{}{} + c.Assert(err, NotNil) + }() + select { + case <-blocked: + c.Error("The query should block when the failpoint is enabled") + case <-time.After(200 * time.Millisecond): + } + cancelFunc() + + // Check the temporary table do not send request to TiKV. + // Table reader + tk.HasPlan("select * from tmp_t", "TableReader") + tk.MustQuery("select * from tmp_t").Check(testkit.Rows("1 1", "2 2")) + // Index reader + tk.HasPlan("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t", "IndexReader") + tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t").Check(testkit.Rows("1", "2")) + // Index lookup + tk.HasPlan("select id from tmp_t where a = 1", "IndexLookUp") + tk.MustQuery("select id from tmp_t where a = 1").Check(testkit.Rows("1")) + + tk.MustExec("rollback") +} + func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/table_reader.go b/executor/table_reader.go index a4459ee920291..fc5e5a2c11096 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -153,6 +153,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { } } firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle) + + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + // Calculate the kv ranges here, UnionScan rely on this kv ranges. + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + kvReq, err := e.buildKVReq(ctx, firstPartRanges) + if err != nil { + return err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + if len(secondPartRanges) != 0 { + kvReq, err = e.buildKVReq(ctx, secondPartRanges) + if err != nil { + return err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + } + return nil + } + firstResult, err := e.buildResp(ctx, firstPartRanges) if err != nil { e.feedback.Invalidate() @@ -175,6 +194,12 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + req.Reset() + return nil + } + logutil.Eventf(ctx, "table scan table: %s, range: %v", stringutil.MemoizeStr(func() string { var tableName string if meta := e.table.Meta(); meta != nil { @@ -197,6 +222,10 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + var err error if e.resultHandler != nil { err = e.resultHandler.Close() @@ -209,6 +238,20 @@ func (e *TableReaderExecutor) Close() error { // buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResult returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { + kvReq, err := e.buildKVReq(ctx, ranges) + if err != nil { + return nil, err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + + result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) + if err != nil { + return nil, err + } + return result, nil +} + +func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) { var builder distsql.RequestBuilder var reqBuilder *distsql.RequestBuilder if e.kvRangeBuilder != nil { @@ -231,17 +274,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetMemTracker(e.memTracker). SetStoreType(e.storeType). SetAllowBatchCop(e.batchCop) - kvReq, err := reqBuilder.Build() - if err != nil { - return nil, err - } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) - - result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) - if err != nil { - return nil, err - } - return result, nil + return reqBuilder.Build() } func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnInfo) []int {