From 6b2555d13b47aace4b2bf957f0a84fa2011f5454 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 19 May 2021 22:27:09 +0800 Subject: [PATCH 1/7] executor: avoid distsql request for TableReader/IndexReader/IndexLookup on temporary table --- executor/distsql.go | 28 ++++++++++++++++++++ executor/executor_test.go | 46 ++++++++++++++++++++++++++++++++ executor/table_reader.go | 55 +++++++++++++++++++++++++++++++-------- go.sum | 1 + 4 files changed, 119 insertions(+), 11 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index bd422a0458ef1..bf82acfb09827 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -197,6 +197,10 @@ type IndexReaderExecutor struct { // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() error { + if e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + err := e.result.Close() e.result = nil e.ctx.StoreQueryFeedback(e.feedback) @@ -205,6 +209,11 @@ func (e *IndexReaderExecutor) Close() error { // Next implements the Executor Next interface. func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta().TempTableType != model.TempTableNone { + req.Reset() + return nil + } + err := e.result.Next(ctx, req) if err != nil { e.feedback.Invalidate() @@ -267,6 +276,10 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.dagPB.CollectExecutionSummaries = &collExec } e.kvRanges = kvRanges + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + if e.table.Meta().TempTableType != model.TempTableNone { + return nil + } e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -385,6 +398,12 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { e.feedback.Invalidate() return err } + + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + if e.table.Meta().TempTableType == model.TempTableGlobal { + return nil + } + err = e.open(ctx) if err != nil { e.feedback.Invalidate() @@ -646,6 +665,10 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { + if e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + if !e.workerStarted || e.finished == nil { return nil } @@ -666,6 +689,11 @@ func (e *IndexLookUpExecutor) Close() error { // Next implements Exec Next interface. func (e *IndexLookUpExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta().TempTableType == model.TempTableGlobal { + req.Reset() + return nil + } + if !e.workerStarted { if err := e.startWorkers(ctx, req.RequiredRows()); err != nil { return err diff --git a/executor/executor_test.go b/executor/executor_test.go index 730de6dd70bde..50cd005aa7e6d 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8229,3 +8229,49 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) { rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 'a'").Rows() c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) } + +func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { + // Test that table reader/index reader/index lookup on the temporary table do not need to visit TiKV. + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("create table normal (id int, a int, index(a))") + tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") + + tk.MustExec("begin") + tk.MustExec("insert into tmp_t values (1, 1)") + tk.MustExec("insert into tmp_t values (2, 2)") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil) + }() + + // Make sure the fail point works. + // With that failpoint, all requests to the TiKV is discard. + rs, err := tk.Exec("select * from normal") + c.Assert(err, IsNil) + blocked := make(chan struct{}) + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + _, err := session.ResultSetToStringSlice(ctx, tk.Se, rs) + blocked <- struct{}{} + c.Assert(err, NotNil) + }() + select { + case <-blocked: + c.Error("The query should block when the failpoint is enabled") + case <-time.After(200 * time.Millisecond): + } + cancelFunc() + + // Check the temporary table do not send request to TiKV. + // Table reader + tk.MustQuery("select * from tmp_t").Check(testkit.Rows("1 1", "2 2")) + // Index reader + tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t").Check(testkit.Rows("1", "2")) + // Index lookup + tk.MustQuery("select id from tmp_t where a = 1").Check(testkit.Rows("1")) + + tk.MustExec("rollback") +} diff --git a/executor/table_reader.go b/executor/table_reader.go index 1a76598fb2250..d91ed46e4c819 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -154,6 +154,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { } } firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle) + + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + // Calculate the kv ranges here, UnionScan rely on this kv ranges. + if e.table.Meta().TempTableType != model.TempTableNone { + kvReq, err := e.buildKVReq(ctx, firstPartRanges) + if err != nil { + return err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + if len(secondPartRanges) != 0 { + kvReq, err = e.buildKVReq(ctx, secondPartRanges) + if err != nil { + return err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + } + return nil + } + firstResult, err := e.buildResp(ctx, firstPartRanges) if err != nil { e.feedback.Invalidate() @@ -176,6 +195,12 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + if e.table.Meta().TempTableType != model.TempTableNone { + // Treat temporary table as dummy table, avoid sending distsql request to TiKV. + req.Reset() + return nil + } + logutil.Eventf(ctx, "table scan table: %s, range: %v", stringutil.MemoizeStr(func() string { var tableName string if meta := e.table.Meta(); meta != nil { @@ -198,6 +223,10 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { + if e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + var err error if e.resultHandler != nil { err = e.resultHandler.Close() @@ -210,6 +239,20 @@ func (e *TableReaderExecutor) Close() error { // buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResult returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { + kvReq, err := e.buildKVReq(ctx, ranges) + if err != nil { + return nil, err + } + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + + result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) + if err != nil { + return nil, err + } + return result, nil +} + +func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.Range) (*kv.Request, error) { var builder distsql.RequestBuilder var reqBuilder *distsql.RequestBuilder if e.kvRangeBuilder != nil { @@ -235,17 +278,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if is, ok := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok { reqBuilder.SetFromInfoSchema(is) } - kvReq, err := reqBuilder.Build() - if err != nil { - return nil, err - } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) - - result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) - if err != nil { - return nil, err - } - return result, nil + return reqBuilder.Build() } func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnInfo) []int { diff --git a/go.sum b/go.sum index 14986c3d1f025..e2874f949d8a4 100644 --- a/go.sum +++ b/go.sum @@ -500,6 +500,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A= From 21ed1ab1c5732245d31628cb59ddc9d5614f1c29 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 20 May 2021 15:39:22 +0800 Subject: [PATCH 2/7] fix CI --- executor/distsql.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index bf82acfb09827..d85a2bf52a3c2 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -197,7 +197,7 @@ type IndexReaderExecutor struct { // Close clears all resources hold by current object. func (e *IndexReaderExecutor) Close() error { - if e.table.Meta().TempTableType != model.TempTableNone { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } @@ -209,7 +209,7 @@ func (e *IndexReaderExecutor) Close() error { // Next implements the Executor Next interface. func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table.Meta().TempTableType != model.TempTableNone { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { req.Reset() return nil } @@ -277,7 +277,8 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } e.kvRanges = kvRanges // Treat temporary table as dummy table, avoid sending distsql request to TiKV. - if e.table.Meta().TempTableType != model.TempTableNone { + // In a test case IndexReaderExecutor is mocked and e.table is nil. + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } From 7f9f66ece251b73e9a39f0abd2d7ce917a764e7f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 20 May 2021 16:37:25 +0800 Subject: [PATCH 3/7] fix CI --- executor/table_reader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/table_reader.go b/executor/table_reader.go index d91ed46e4c819..47f3ba7f9a7d9 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -157,7 +157,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Treat temporary table as dummy table, avoid sending distsql request to TiKV. // Calculate the kv ranges here, UnionScan rely on this kv ranges. - if e.table.Meta().TempTableType != model.TempTableNone { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { kvReq, err := e.buildKVReq(ctx, firstPartRanges) if err != nil { return err @@ -195,7 +195,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { // Next fills data into the chunk passed by its caller. // The task was actually done by tableReaderHandler. func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - if e.table.Meta().TempTableType != model.TempTableNone { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { // Treat temporary table as dummy table, avoid sending distsql request to TiKV. req.Reset() return nil @@ -223,7 +223,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { - if e.table.Meta().TempTableType != model.TempTableNone { + if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } From a05ad8ff89ca536be7603d0ef8abd12268d03437 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 25 May 2021 16:32:07 +0800 Subject: [PATCH 4/7] fix CI --- executor/table_reader.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/executor/table_reader.go b/executor/table_reader.go index 88faa796243f8..889f534fceefd 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -274,10 +274,6 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R SetMemTracker(e.memTracker). SetStoreType(e.storeType). SetAllowBatchCop(e.batchCop) - // infoschema maybe null for tests - if is, ok := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok { - reqBuilder.SetFromInfoSchema(is) - } return reqBuilder.Build() } From d947cbe36d13e83eacf71bc2d345eadda6160906 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 25 May 2021 16:36:37 +0800 Subject: [PATCH 5/7] go mod tidy --- go.sum | 1 - 1 file changed, 1 deletion(-) diff --git a/go.sum b/go.sum index b697a51817d5d..664a52bdca20d 100644 --- a/go.sum +++ b/go.sum @@ -497,7 +497,6 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= -github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A= From fdab57a542ed78b6efcbc401602972fd12b12168 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 25 May 2021 20:38:25 +0800 Subject: [PATCH 6/7] address comment --- executor/executor_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index 1326fd7f9321b..fed5a2b1359bb 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8353,10 +8353,13 @@ func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { // Check the temporary table do not send request to TiKV. // Table reader + tk.HasPlan("select * from tmp_t", "TableReader") tk.MustQuery("select * from tmp_t").Check(testkit.Rows("1 1", "2 2")) // Index reader + tk.HasPlan("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t", "IndexReader") tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ a from tmp_t").Check(testkit.Rows("1", "2")) // Index lookup + tk.HasPlan("select id from tmp_t where a = 1", "IndexLookUp") tk.MustQuery("select id from tmp_t where a = 1").Check(testkit.Rows("1")) tk.MustExec("rollback") From daaf04294eddcf3c6024bc142f7364edd1708cb5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 2 Jun 2021 17:56:08 +0800 Subject: [PATCH 7/7] fix a data race --- executor/executor_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 949603575b5e6..8d70d7fa16468 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8359,8 +8359,10 @@ func (s testSerialSuite) TestExprBlackListForEnum(c *C) { func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { // Test that table reader/index reader/index lookup on the temporary table do not need to visit TiKV. tk := testkit.NewTestKit(c, s.store) + tk1 := testkit.NewTestKit(c, s.store) tk.MustExec("use test") + tk1.MustExec("use test") tk.MustExec("create table normal (id int, a int, index(a))") tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") @@ -8375,12 +8377,12 @@ func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { // Make sure the fail point works. // With that failpoint, all requests to the TiKV is discard. - rs, err := tk.Exec("select * from normal") + rs, err := tk1.Exec("select * from normal") c.Assert(err, IsNil) blocked := make(chan struct{}) ctx, cancelFunc := context.WithCancel(context.Background()) go func() { - _, err := session.ResultSetToStringSlice(ctx, tk.Se, rs) + _, err := session.ResultSetToStringSlice(ctx, tk1.Se, rs) blocked <- struct{}{} c.Assert(err, NotNil) }()