Skip to content

Commit

Permalink
executor: avoid distsql request for TableReader/IndexReader/IndexLook…
Browse files Browse the repository at this point in the history
…up on temporary table (#24769)
  • Loading branch information
tiancaiamao committed Jun 2, 2021
1 parent ad7102c commit 8656b5d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 11 deletions.
29 changes: 29 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ type IndexReaderExecutor struct {

// Close clears all resources hold by current object.
func (e *IndexReaderExecutor) Close() error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

err := e.result.Close()
e.result = nil
e.ctx.StoreQueryFeedback(e.feedback)
Expand All @@ -204,6 +208,11 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
req.Reset()
return nil
}

err := e.result.Next(ctx, req)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -266,6 +275,11 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.dagPB.CollectExecutionSummaries = &collExec
}
e.kvRanges = kvRanges
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// In a test case IndexReaderExecutor is mocked and e.table is nil.
if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
Expand Down Expand Up @@ -381,6 +395,12 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
e.feedback.Invalidate()
return err
}

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
if e.table.Meta().TempTableType == model.TempTableGlobal {
return nil
}

err = e.open(ctx)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -639,6 +659,10 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

if !e.workerStarted || e.finished == nil {
return nil
}
Expand All @@ -659,6 +683,11 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta().TempTableType == model.TempTableGlobal {
req.Reset()
return nil
}

if !e.workerStarted {
if err := e.startWorkers(ctx, req.RequiredRows()); err != nil {
return err
Expand Down
51 changes: 51 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8356,6 +8356,57 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) {
c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue)
}

func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) {
// Test that table reader/index reader/index lookup on the temporary table do not need to visit TiKV.
tk := testkit.NewTestKit(c, s.store)
tk1 := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk1.MustExec("use test")
tk.MustExec("create table normal (id int, a int, index(a))")
tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows")

tk.MustExec("begin")
tk.MustExec("insert into tmp_t values (1, 1)")
tk.MustExec("insert into tmp_t values (2, 2)")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil)
}()

// Make sure the fail point works.
// With that failpoint, all requests to the TiKV is discard.
rs, err := tk1.Exec("select * from normal")
c.Assert(err, IsNil)
blocked := make(chan struct{})
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
_, err := session.ResultSetToStringSlice(ctx, tk1.Se, rs)
blocked <- struct{}{}
c.Assert(err, NotNil)
}()
select {
case <-blocked:
c.Error("The query should block when the failpoint is enabled")
case <-time.After(200 * time.Millisecond):
}
cancelFunc()

// Check the temporary table do not send request to TiKV.
// Table reader
tk.HasPlan("select * from tmp_t", "TableReader")
tk.MustQuery("select * from tmp_t").Check(testkit.Rows("1 1", "2 2"))
// Index reader
tk.HasPlan("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t", "IndexReader")
tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t").Check(testkit.Rows("1", "2"))
// Index lookup
tk.HasPlan("select id from tmp_t where a = 1", "IndexLookUp")
tk.MustQuery("select id from tmp_t where a = 1").Check(testkit.Rows("1"))

tk.MustExec("rollback")
}

func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
55 changes: 44 additions & 11 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
}
}
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// Calculate the kv ranges here, UnionScan rely on this kv ranges.
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
kvReq, err := e.buildKVReq(ctx, firstPartRanges)
if err != nil {
return err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
if len(secondPartRanges) != 0 {
kvReq, err = e.buildKVReq(ctx, secondPartRanges)
if err != nil {
return err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
}
return nil
}

firstResult, err := e.buildResp(ctx, firstPartRanges)
if err != nil {
e.feedback.Invalidate()
Expand All @@ -175,6 +194,12 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
req.Reset()
return nil
}

logutil.Eventf(ctx, "table scan table: %s, range: %v", stringutil.MemoizeStr(func() string {
var tableName string
if meta := e.table.Meta(); meta != nil {
Expand All @@ -197,6 +222,10 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error

// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone {
return nil
}

var err error
if e.resultHandler != nil {
err = e.resultHandler.Close()
Expand All @@ -209,6 +238,20 @@ func (e *TableReaderExecutor) Close() error {
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResult returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
kvReq, err := e.buildKVReq(ctx, ranges)
if err != nil {
return nil, err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
return result, nil
}

func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) {
var builder distsql.RequestBuilder
var reqBuilder *distsql.RequestBuilder
if e.kvRangeBuilder != nil {
Expand All @@ -231,17 +274,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop)
kvReq, err := reqBuilder.Build()
if err != nil {
return nil, err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
return result, nil
return reqBuilder.Build()
}

func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnInfo) []int {
Expand Down

0 comments on commit 8656b5d

Please sign in to comment.