From 8b1694873c1defa0ca97e7b8d496970c88c2a98f Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 19 Jun 2019 12:53:41 +0800 Subject: [PATCH 1/7] tikv: avoid switch peer when batchRequest be cancelled (#10822) --- store/tikv/client.go | 12 ++++++------ store/tikv/client_test.go | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index d4df0be5f4100..f8c0feb9f4d3e 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -35,10 +35,8 @@ import ( "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" "google.golang.org/grpc" - gcodes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - gstatus "google.golang.org/grpc/status" ) // MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than @@ -618,8 +616,9 @@ func sendBatchRequest( select { case connArray.batchCommandsCh <- entry: case <-ctx1.Done(): - logutil.Logger(context.Background()).Warn("send request is timeout", zap.String("to", addr)) - return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout")) + logutil.Logger(context.Background()).Warn("send request is cancelled", + zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) + return nil, errors.Trace(ctx1.Err()) } select { @@ -630,8 +629,9 @@ func sendBatchRequest( return tikvrpc.FromBatchCommandsResponse(res), nil case <-ctx1.Done(): atomic.StoreInt32(&entry.canceled, 1) - logutil.Logger(context.Background()).Warn("send request is canceled", zap.String("to", addr)) - return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout")) + logutil.Logger(context.Background()).Warn("wait response is cancelled", + zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) + return nil, errors.Trace(ctx1.Err()) } } diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index e52498726bef1..4d4d2fa1e7d38 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -14,9 +14,12 @@ package tikv import ( + "context" "testing" + "time" . 
"github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" ) @@ -77,3 +80,16 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { newEntryPtr := &entries[0] c.Assert(entryPtr, Equals, newEntryPtr) } + +func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) { + req := new(tikvpb.BatchCommandsRequest_Request) + a := &connArray{batchCommandsCh: make(chan *batchCommandsEntry, 1)} + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + _, err := sendBatchRequest(ctx, "", a, req, 2*time.Second) + c.Assert(errors.Cause(err), Equals, context.Canceled) + + _, err = sendBatchRequest(context.Background(), "", a, req, 0) + c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) +} From eed6a9efa6f0ec15b4c7225d90edb451a7191bd2 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Wed, 19 Jun 2019 13:47:41 +0800 Subject: [PATCH 2/7] session: init expensiveQueryHandle immediately after domain be initiated (#10845) --- session/bootstrap_test.go | 13 +++++++++++++ session/session.go | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 8004b7923ef3e..333326f1353ff 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -296,3 +296,16 @@ func (s *testBootstrapSuite) TestOldPasswordUpgrade(c *C) { c.Assert(err, IsNil) c.Assert(newpwd, Equals, "*0D3CED9BEC10A777AEC23CCC353A8C08A633045E") } + +func (s *testBootstrapSuite) TestBootstrapInitExpensiveQueryHandle(c *C) { + defer testleak.AfterTest(c)() + store := newStore(c, s.dbName) + defer store.Close() + se, err := createSession(store) + c.Assert(err, IsNil) + dom := domain.GetDomain(se) + c.Assert(dom, NotNil) + defer dom.Close() + dom.InitExpensiveQueryHandle() + c.Assert(dom.ExpensiveQueryHandle(), NotNil) +} diff --git a/session/session.go b/session/session.go index f8be82fe08fe3..56e30f70d3c7b 100644 --- a/session/session.go +++ b/session/session.go @@ -1507,6 +1507,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { timeutil.SetSystemTZ(tz) dom := domain.GetDomain(se) + dom.InitExpensiveQueryHandle() if !config.GetGlobalConfig().Security.SkipGrantTable { err = dom.LoadPrivilegeLoop(se) @@ -1543,7 +1544,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err != nil { return nil, err } - dom.InitExpensiveQueryHandle() if raw, ok := store.(tikv.EtcdBackend); ok { err = raw.StartGCWorker() if err != nil { From b3c40b168e2047f459d87c955ef0f7ba437abb5d Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Wed, 19 Jun 2019 15:01:16 +0800 Subject: [PATCH 3/7] Notify affected regions after sending UnsafeDestroyRange (#10069) --- go.mod | 2 +- go.sum | 4 +- store/tikv/delete_range.go | 91 +++++++++++++++++++++----------- store/tikv/delete_range_test.go | 31 ++++++----- store/tikv/gcworker/gc_worker.go | 29 +++++++--- 5 files changed, 100 insertions(+), 57 deletions(-) diff --git a/go.mod b/go.mod index 310fd1a26595a..b0a09357089ba 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20190422094118-d8535965f59b github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 // indirect github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190528074401-b942b3f4108f + github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 
github.com/pingcap/parser v0.0.0-20190612052718-3b36f86d9b7b github.com/pingcap/pd v2.1.11+incompatible diff --git a/go.sum b/go.sum index 3ef098e116d20..76f7a1045a7bd 100644 --- a/go.sum +++ b/go.sum @@ -129,8 +129,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2 github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190528074401-b942b3f4108f h1:EXZvZmZl+n4PGSRD8fykjHGzXS8QarWYx7KgIBBa7rg= -github.com/pingcap/kvproto v0.0.0-20190528074401-b942b3f4108f/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 h1:92t0y430CJF0tN1lvUhP5fhnYTFmssATJqwxQtvixYU= +github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20190612052718-3b36f86d9b7b h1:4/+CIoFd4AMLZbjDpqhoa9IByT/lVcg+13/W/UgNVXM= diff --git a/store/tikv/delete_range.go b/store/tikv/delete_range.go index 5d75fc5230a09..0154035f70211 100644 --- a/store/tikv/delete_range.go +++ b/store/tikv/delete_range.go @@ -16,9 +16,9 @@ package tikv import ( "bytes" "context" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) @@ -27,40 +27,74 @@ import ( // if the task was canceled or not. type DeleteRangeTask struct { completedRegions int - canceled bool store Storage - ctx context.Context startKey []byte endKey []byte + notifyOnly bool + concurrency int } -// NewDeleteRangeTask creates a DeleteRangeTask. Deleting will not be performed right away. -// WARNING: Currently, this API may leave some waste key-value pairs uncleaned in TiKV. Be careful while using it. -func NewDeleteRangeTask(ctx context.Context, store Storage, startKey []byte, endKey []byte) *DeleteRangeTask { +// NewDeleteRangeTask creates a DeleteRangeTask. Deleting will be performed when `Execute` method is invoked. +// Be careful while using this API. This API doesn't keep recent MVCC versions, but will delete all versions of all keys +// in the range immediately. Also notice that frequent invocation to this API may cause performance problems to TiKV. +func NewDeleteRangeTask(store Storage, startKey []byte, endKey []byte, concurrency int) *DeleteRangeTask { return &DeleteRangeTask{ completedRegions: 0, - canceled: false, store: store, - ctx: ctx, startKey: startKey, endKey: endKey, + notifyOnly: false, + concurrency: concurrency, + } +} + +// NewNotifyDeleteRangeTask creates a task that sends delete range requests to all regions in the range, but with the +// flag `notifyOnly` set. TiKV will not actually delete the range after receiving request, but it will be replicated via +// raft. This is used to notify the involved regions before sending UnsafeDestroyRange requests. 
+func NewNotifyDeleteRangeTask(store Storage, startKey []byte, endKey []byte, concurrency int) *DeleteRangeTask { + task := NewDeleteRangeTask(store, startKey, endKey, concurrency) + task.notifyOnly = true + return task +} + +// getRunnerName returns a name for RangeTaskRunner. +func (t *DeleteRangeTask) getRunnerName() string { + if t.notifyOnly { + return "delete-range-notify" } + return "delete-range" } // Execute performs the delete range operation. -func (t *DeleteRangeTask) Execute() error { - startKey, rangeEndKey := t.startKey, t.endKey +func (t *DeleteRangeTask) Execute(ctx context.Context) error { + runnerName := t.getRunnerName() + + runner := NewRangeTaskRunner(runnerName, t.store, t.concurrency, t.sendReqOnRange) + err := runner.RunOnRange(ctx, t.startKey, t.endKey) + t.completedRegions = int(runner.CompletedRegions()) + + return err +} + +// Execute performs the delete range operation. +func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (int, error) { + startKey, rangeEndKey := r.StartKey, r.EndKey + completedRegions := 0 for { select { - case <-t.ctx.Done(): - t.canceled = true - return nil + case <-ctx.Done(): + return completedRegions, errors.Trace(ctx.Err()) default: } - bo := NewBackoffer(t.ctx, deleteRangeOneRegionMaxBackoff) + + if bytes.Compare(startKey, rangeEndKey) >= 0 { + break + } + + bo := NewBackoffer(ctx, deleteRangeOneRegionMaxBackoff) loc, err := t.store.GetRegionCache().LocateKey(bo, startKey) if err != nil { - return errors.Trace(err) + return completedRegions, errors.Trace(err) } // Delete to the end of the region, except if it's the last region overlapping the range @@ -73,49 +107,42 @@ func (t *DeleteRangeTask) Execute() error { req := &tikvrpc.Request{ Type: tikvrpc.CmdDeleteRange, DeleteRange: &kvrpcpb.DeleteRangeRequest{ - StartKey: startKey, - EndKey: endKey, + StartKey: startKey, + EndKey: endKey, + NotifyOnly: t.notifyOnly, }, } resp, err := t.store.SendReq(bo, req, loc.Region, ReadTimeoutMedium) if err != nil { - return errors.Trace(err) + return completedRegions, errors.Trace(err) } regionErr, err := resp.GetRegionError() if err != nil { - return errors.Trace(err) + return completedRegions, errors.Trace(err) } if regionErr != nil { err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return errors.Trace(err) + return completedRegions, errors.Trace(err) } continue } deleteRangeResp := resp.DeleteRange if deleteRangeResp == nil { - return errors.Trace(ErrBodyMissing) + return completedRegions, errors.Trace(ErrBodyMissing) } if err := deleteRangeResp.GetError(); err != "" { - return errors.Errorf("unexpected delete range err: %v", err) - } - t.completedRegions++ - if bytes.Equal(endKey, rangeEndKey) { - break + return completedRegions, errors.Errorf("unexpected delete range err: %v", err) } + completedRegions++ startKey = endKey } - return nil + return completedRegions, nil } // CompletedRegions returns the number of regions that are affected by this delete range task func (t *DeleteRangeTask) CompletedRegions() int { return t.completedRegions } - -// IsCanceled returns true if the delete range operation was canceled on the half way -func (t *DeleteRangeTask) IsCanceled() bool { - return t.canceled -} diff --git a/store/tikv/delete_range_test.go b/store/tikv/delete_range_test.go index 1feb88c7eaefc..cbb9206917b9e 100644 --- a/store/tikv/delete_range_test.go +++ b/store/tikv/delete_range_test.go @@ -33,7 +33,7 @@ var _ = Suite(&testDeleteRangeSuite{}) func (s *testDeleteRangeSuite) SetUpTest(c 
*C) { s.cluster = mocktikv.NewCluster() - mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c")) + mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("b"), []byte("c"), []byte("d")) client, pdClient, err := mocktikv.NewTiKVAndPDClient(s.cluster, nil, "") c.Assert(err, IsNil) @@ -81,12 +81,13 @@ func (s *testDeleteRangeSuite) checkData(c *C, expectedData map[string]string) { c.Assert(data, DeepEquals, expectedData) } -func (s *testDeleteRangeSuite) deleteRange(c *C, startKey []byte, endKey []byte) { - ctx := context.Background() - task := NewDeleteRangeTask(ctx, s.store, startKey, endKey) +func (s *testDeleteRangeSuite) deleteRange(c *C, startKey []byte, endKey []byte) int { + task := NewDeleteRangeTask(s.store, startKey, endKey, 1) - err := task.Execute() + err := task.Execute(context.Background()) c.Assert(err, IsNil) + + return task.CompletedRegions() } // deleteRangeFromMap deletes all keys in a given range from a map @@ -100,10 +101,11 @@ func deleteRangeFromMap(m map[string]string, startKey []byte, endKey []byte) { } // mustDeleteRange does delete range on both the map and the storage, and assert they are equal after deleting -func (s *testDeleteRangeSuite) mustDeleteRange(c *C, startKey []byte, endKey []byte, expected map[string]string) { - s.deleteRange(c, startKey, endKey) +func (s *testDeleteRangeSuite) mustDeleteRange(c *C, startKey []byte, endKey []byte, expected map[string]string, regions int) { + completedRegions := s.deleteRange(c, startKey, endKey) deleteRangeFromMap(expected, startKey, endKey) s.checkData(c, expected) + c.Assert(completedRegions, Equals, regions) } func (s *testDeleteRangeSuite) TestDeleteRange(c *C) { @@ -119,7 +121,8 @@ func (s *testDeleteRangeSuite) TestDeleteRange(c *C) { key := []byte{byte(i), byte(j)} value := []byte{byte(rand.Intn(256)), byte(rand.Intn(256))} testData[string(key)] = string(value) - txn.Set(key, value) + err := txn.Set(key, value) + c.Assert(err, IsNil) } } @@ -128,10 +131,10 @@ func (s *testDeleteRangeSuite) TestDeleteRange(c *C) { s.checkData(c, testData) - s.mustDeleteRange(c, []byte("b"), []byte("c0"), testData) - s.mustDeleteRange(c, []byte("c11"), []byte("c12"), testData) - s.mustDeleteRange(c, []byte("d0"), []byte("d0"), testData) - s.mustDeleteRange(c, []byte("d0\x00"), []byte("d1\x00"), testData) - s.mustDeleteRange(c, []byte("c5"), []byte("d5"), testData) - s.mustDeleteRange(c, []byte("a"), []byte("z"), testData) + s.mustDeleteRange(c, []byte("b"), []byte("c0"), testData, 2) + s.mustDeleteRange(c, []byte("c11"), []byte("c12"), testData, 1) + s.mustDeleteRange(c, []byte("d0"), []byte("d0"), testData, 0) + s.mustDeleteRange(c, []byte("d0\x00"), []byte("d1\x00"), testData, 1) + s.mustDeleteRange(c, []byte("c5"), []byte("d5"), testData, 2) + s.mustDeleteRange(c, []byte("a"), []byte("z"), testData, 4) } diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index f410998acb926..f2a6d9df82a11 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -452,7 +452,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i w.done <- errors.Trace(err) return } - err = w.deleteRanges(ctx, safePoint) + err = w.deleteRanges(ctx, safePoint, concurrency) if err != nil { logutil.Logger(ctx).Error("[gc worker] delete range returns an error", zap.String("uuid", w.uuid), @@ -461,7 +461,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i w.done <- errors.Trace(err) return } - err = 
w.redoDeleteRanges(ctx, safePoint) + err = w.redoDeleteRanges(ctx, safePoint, concurrency) if err != nil { logutil.Logger(ctx).Error("[gc worker] redo-delete range returns an error", zap.String("uuid", w.uuid), @@ -508,7 +508,8 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i } // deleteRanges processes all delete range records whose ts < safePoint in table `gc_delete_range` -func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error { +// `concurrency` specifies the concurrency to send NotifyDeleteRange. +func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64, concurrency int) error { metrics.GCWorkerCounter.WithLabelValues("delete_range").Inc() se := createSession(w.store) @@ -525,7 +526,7 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error { for _, r := range ranges { startKey, endKey := r.Range() - err = w.sendUnsafeDestroyRangeRequest(ctx, startKey, endKey) + err = w.doUnsafeDestroyRangeRequest(ctx, startKey, endKey, concurrency) if err != nil { return errors.Trace(err) } @@ -546,7 +547,8 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error { } // redoDeleteRanges checks all deleted ranges whose ts is at least `lifetime + 24h` ago. See TiKV RFC #2. -func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64) error { +// `concurrency` specifies the concurrency to send NotifyDeleteRange. +func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64, concurrency int) error { metrics.GCWorkerCounter.WithLabelValues("redo_delete_range").Inc() // We check delete range records that are deleted about 24 hours ago. @@ -566,7 +568,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64) error for _, r := range ranges { startKey, endKey := r.Range() - err = w.sendUnsafeDestroyRangeRequest(ctx, startKey, endKey) + err = w.doUnsafeDestroyRangeRequest(ctx, startKey, endKey, concurrency) if err != nil { return errors.Trace(err) } @@ -586,7 +588,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64) error return nil } -func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte) error { +func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte, concurrency int) error { // Get all stores every time deleting a region. So the store list is less probably to be stale. stores, err := w.getUpStores(ctx) if err != nil { @@ -625,6 +627,17 @@ func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey [ wg.Wait() + // Notify all affected regions in the range that UnsafeDestroyRange occurs. + notifyTask := tikv.NewNotifyDeleteRangeTask(w.store, startKey, endKey, concurrency) + err = notifyTask.Execute(ctx) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] failed notifying regions affected by UnsafeDestroyRange", + zap.String("uuid", w.uuid), + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + zap.Error(err)) + } + return errors.Trace(err) } @@ -1315,5 +1328,5 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) { // DeleteRanges calls deleteRanges internally, just for test. 
func (w *MockGCWorker) DeleteRanges(ctx context.Context, safePoint uint64) error { logutil.Logger(ctx).Error("deleteRanges is called") - return w.worker.deleteRanges(ctx, safePoint) + return w.worker.deleteRanges(ctx, safePoint, 1) } From f6be085bc6f08e5365b9e571f4ef39de90b0278b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 19 Jun 2019 15:55:28 +0800 Subject: [PATCH 4/7] executor: tiny refactor the Executor interface (#10846) Introduce a base() method to get the baseExecutor, so we don't need to add method from baseExecutor to the Executor interface any more Before: ``` type Executor interface { Open(context.Context) error Next(ctx context.Context, req *chunk.RecordBatch) error Close() error Schema() *expression.Schema retTypes() []*types.FieldType newFirstChunk() *chunk.Chunk } ``` After: ``` type Executor interface { base() *baseExecutor Open(context.Context) error Next(ctx context.Context, req *chunk.RecordBatch) error Close() error Schema() *expression.Schema } ``` --- executor/adapter.go | 6 +-- executor/admin.go | 2 +- executor/aggregate.go | 10 ++--- executor/benchmark_test.go | 8 ++-- executor/builder.go | 20 ++++----- executor/delete.go | 8 ++-- executor/distsql.go | 4 +- executor/executor.go | 39 +++++++++-------- executor/executor_pkg_test.go | 2 +- executor/executor_required_rows_test.go | 24 +++++----- executor/explain.go | 2 +- executor/index_lookup_join.go | 8 ++-- executor/insert_common.go | 4 +- executor/join.go | 18 ++++---- executor/merge_join.go | 4 +- executor/pkg_test.go | 12 ++--- executor/point_get.go | 46 ++++++-------------- executor/projection.go | 6 +-- executor/radix_hash_join.go | 2 +- executor/show.go | 2 +- executor/sort.go | 12 ++--- executor/table_reader.go | 2 +- executor/table_readers_required_rows_test.go | 4 +- executor/union_scan.go | 8 ++-- executor/update.go | 4 +- executor/window.go | 2 +- 26 files changed, 122 insertions(+), 137 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index aaed323245541..7f3c7417d91d4 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -126,7 +126,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error { // NewRecordBatch create a recordBatch base on top-level executor's newFirstChunk(). 
func (a *recordSet) NewRecordBatch() *chunk.RecordBatch { - return chunk.NewRecordBatch(a.executor.newFirstChunk()) + return chunk.NewRecordBatch(newFirstChunk(a.executor)) } func (a *recordSet) Close() error { @@ -307,7 +307,7 @@ func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) er } func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch { - return chunk.NewRecordBatch(c.e.newFirstChunk()) + return chunk.NewRecordBatch(newFirstChunk(c.e)) } func (c *chunkRowRecordSet) Close() error { @@ -385,7 +385,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex a.logAudit() }() - err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk())) + err = e.Next(ctx, chunk.NewRecordBatch(newFirstChunk(e))) if err != nil { return nil, err } diff --git a/executor/admin.go b/executor/admin.go index e6721dd976f89..f4944ba17f916 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error { FieldType: *colTypeForHandle, }) - e.srcChunk = e.newFirstChunk() + e.srcChunk = newFirstChunk(e) dagPB, err := e.buildDAGPB() if err != nil { return err diff --git a/executor/aggregate.go b/executor/aggregate.go index e8fa560797dab..dda7d395599b3 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -239,7 +239,7 @@ func (e *HashAggExec) initForUnparallelExec() { e.partialResultMap = make(aggPartialResultMapper) e.groupKeyBuffer = make([]byte, 0, 8) e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer)) - e.childResult = e.children[0].newFirstChunk() + e.childResult = newFirstChunk(e.children[0]) } func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { @@ -275,12 +275,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { partialResultsMap: make(aggPartialResultMapper), groupByItems: e.GroupByItems, groupValDatums: make([]types.Datum, 0, len(e.GroupByItems)), - chk: e.children[0].newFirstChunk(), + chk: newFirstChunk(e.children[0]), } e.partialWorkers[i] = w e.inputCh <- &HashAggInput{ - chk: e.children[0].newFirstChunk(), + chk: newFirstChunk(e.children[0]), giveBackCh: w.inputCh, } } @@ -295,7 +295,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { outputCh: e.finalOutputCh, finalResultHolderCh: e.finalInputCh, rowBuffer: make([]types.Datum, 0, e.Schema().Len()), - mutableRow: chunk.MutRowFromTypes(e.retTypes()), + mutableRow: chunk.MutRowFromTypes(retTypes(e)), } } } @@ -772,7 +772,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } - e.childResult = e.children[0].newFirstChunk() + e.childResult = newFirstChunk(e.children[0]) e.executed = false e.isChildReturnEmpty = true e.inputIter = chunk.NewIterator4Chunk(e.childResult) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index e13e0eaf6f137..4d38af79e887f 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -129,7 +129,7 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.RecordBatch) err func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource { baseExec := newBaseExecutor(opt.ctx, opt.schema, nil) m := &mockDataSource{baseExec, opt, nil, nil, 0} - types := m.retTypes() + types := retTypes(m) colData := make([][]interface{}, len(types)) for i := 0; i < len(types); i++ { colData[i] = m.genColDatums(i) @@ -137,12 +137,12 @@ func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource { m.genData = 
make([]*chunk.Chunk, (m.p.rows+m.initCap-1)/m.initCap) for i := range m.genData { - m.genData[i] = chunk.NewChunkWithCapacity(m.retTypes(), m.ctx.GetSessionVars().MaxChunkSize) + m.genData[i] = chunk.NewChunkWithCapacity(retTypes(m), m.ctx.GetSessionVars().MaxChunkSize) } for i := 0; i < m.p.rows; i++ { idx := i / m.maxChunkSize - retTypes := m.retTypes() + retTypes := retTypes(m) for colIdx := 0; colIdx < len(types); colIdx++ { switch retTypes[colIdx].Tp { case mysql.TypeLong, mysql.TypeLonglong: @@ -259,7 +259,7 @@ func benchmarkAggExecWithCase(b *testing.B, casTest *aggTestCase) { b.StopTimer() // prepare a new agg-executor aggExec := buildAggExecutor(b, casTest, dataSource) tmpCtx := context.Background() - chk := aggExec.newFirstChunk() + chk := newFirstChunk(aggExec) dataSource.prepareChunks() b.StartTimer() diff --git a/executor/builder.go b/executor/builder.go index 9d424d86ea0d9..25e340ff361fe 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -872,8 +872,8 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu v.JoinType == plannercore.RightOuterJoin, defaultValues, v.OtherConditions, - leftExec.retTypes(), - rightExec.retTypes(), + retTypes(leftExec), + retTypes(rightExec), ), isOuterJoin: v.JoinType.IsOuterJoin(), } @@ -946,7 +946,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo } defaultValues := v.DefaultValues - lhsTypes, rhsTypes := leftExec.retTypes(), rightExec.retTypes() + lhsTypes, rhsTypes := retTypes(leftExec), retTypes(rightExec) if v.InnerChildIdx == 0 { if len(v.LeftConditions) > 0 { b.err = errors.Annotate(ErrBuildExecutor, "join's inner condition should be empty") @@ -1020,7 +1020,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) { e.defaultVal = nil } else { - e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1) + e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1) } for _, aggDesc := range v.AggFuncs { if aggDesc.HasDistinct { @@ -1079,7 +1079,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) { e.defaultVal = nil } else { - e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1) + e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1) } for i, aggDesc := range v.AggFuncs { aggFunc := aggfuncs.Build(b.ctx, aggDesc, i) @@ -1220,7 +1220,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) *NestedLoopAp defaultValues = make([]types.Datum, v.Children()[v.InnerChildIdx].Schema().Len()) } tupleJoiner := newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, - defaultValues, otherConditions, leftChild.retTypes(), rightChild.retTypes()) + defaultValues, otherConditions, retTypes(leftChild), retTypes(rightChild)) outerExec, innerExec := leftChild, rightChild outerFilter, innerFilter := v.LeftConditions, v.RightConditions if v.InnerChildIdx == 0 { @@ -1703,7 +1703,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) if b.err != nil { return nil } - outerTypes := outerExec.retTypes() + outerTypes := retTypes(outerExec) innerPlan := v.Children()[1-v.OuterIndex] innerTypes := make([]*types.FieldType, innerPlan.Schema().Len()) for i, col := range innerPlan.Schema().Columns { @@ -1761,7 +1761,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) innerKeyCols[i] = v.InnerJoinKeys[i].Index } 
e.innerCtx.keyCols = innerKeyCols - e.joinResult = e.newFirstChunk() + e.joinResult = newFirstChunk(e) executorCounterIndexLookUpJoin.Inc() return e } @@ -2015,7 +2015,7 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context return nil, err } us := e.(*UnionScanExec) - us.snapshotChunkBuffer = us.newFirstChunk() + us.snapshotChunkBuffer = newFirstChunk(us) return us, nil } @@ -2050,7 +2050,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex return nil, err } e.resultHandler = &tableResultHandler{} - result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) + result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans)) if err != nil { return nil, err } diff --git a/executor/delete.go b/executor/delete.go index c3c52363dff2b..ae1620dfe14e0 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -100,8 +100,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { // If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode. batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize - fields := e.children[0].retTypes() - chk := e.children[0].newFirstChunk() + fields := retTypes(e.children[0]) + chk := newFirstChunk(e.children[0]) for { iter := chunk.NewIterator4Chunk(chk) @@ -183,8 +183,8 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { e.initialMultiTableTblMap() colPosInfos := e.getColPosInfos(e.children[0].Schema()) tblRowMap := make(tableRowMapType) - fields := e.children[0].retTypes() - chk := e.children[0].newFirstChunk() + fields := retTypes(e.children[0]) + chk := newFirstChunk(e.children[0]) for { iter := chunk.NewIterator4Chunk(chk) err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk)) diff --git a/executor/distsql.go b/executor/distsql.go index 5bd77376292c1..2b380313355d0 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -330,7 +330,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.feedback.Invalidate() return err } - e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) + e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans)) if err != nil { e.feedback.Invalidate() return err @@ -794,7 +794,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er handleCnt := len(task.handles) task.rows = make([]chunk.Row, 0, handleCnt) for { - chk := tableReader.newFirstChunk() + chk := newFirstChunk(tableReader) err = tableReader.Next(ctx, chunk.NewRecordBatch(chk)) if err != nil { logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err)) diff --git a/executor/executor.go b/executor/executor.go index 5a26b246f3adc..9991a2d7343a3 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -86,6 +86,11 @@ type baseExecutor struct { runtimeStats *execdetails.RuntimeStats } +// base returns the baseExecutor of an executor, don't override this method! +func (e *baseExecutor) base() *baseExecutor { + return e +} + // Open initializes children recursively and "childrenResults" according to children's schemas. 
func (e *baseExecutor) Open(ctx context.Context) error { for _, child := range e.children { @@ -117,13 +122,15 @@ func (e *baseExecutor) Schema() *expression.Schema { } // newFirstChunk creates a new chunk to buffer current executor's result. -func (e *baseExecutor) newFirstChunk() *chunk.Chunk { - return chunk.New(e.retTypes(), e.initCap, e.maxChunkSize) +func newFirstChunk(e Executor) *chunk.Chunk { + base := e.base() + return chunk.New(base.retFieldTypes, base.initCap, base.maxChunkSize) } // retTypes returns all output column types. -func (e *baseExecutor) retTypes() []*types.FieldType { - return e.retFieldTypes +func retTypes(e Executor) []*types.FieldType { + base := e.base() + return base.retFieldTypes } // Next fills mutiple rows into a chunk. @@ -166,13 +173,11 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id fmt.S // return a batch of rows, other than a single row in Volcano. // NOTE: Executors must call "chk.Reset()" before appending their results to it. type Executor interface { + base() *baseExecutor Open(context.Context) error Next(ctx context.Context, req *chunk.RecordBatch) error Close() error Schema() *expression.Schema - - retTypes() []*types.FieldType - newFirstChunk() *chunk.Chunk } // CancelDDLJobsExec represents a cancel DDL jobs executor. @@ -552,7 +557,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error if err != nil { return err } - chk := e.src.newFirstChunk() + chk := newFirstChunk(e.src) for { err := e.src.Next(ctx, chunk.NewRecordBatch(chk)) if err != nil { @@ -770,7 +775,7 @@ func (e *LimitExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } - e.childResult = e.children[0].newFirstChunk() + e.childResult = newFirstChunk(e.children[0]) e.cursor = 0 e.meetFirstBatch = e.begin == 0 return nil @@ -816,7 +821,7 @@ func init() { if err != nil { return rows, err } - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for { err = exec.Next(ctx, chunk.NewRecordBatch(chk)) if err != nil { @@ -827,7 +832,7 @@ func init() { } iter := chunk.NewIterator4Chunk(chk) for r := iter.Begin(); r != iter.End(); r = iter.Next() { - row := r.GetDatumRow(exec.retTypes()) + row := r.GetDatumRow(retTypes(exec)) rows = append(rows, row) } chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) @@ -892,7 +897,7 @@ func (e *SelectionExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } - e.childResult = e.children[0].newFirstChunk() + e.childResult = newFirstChunk(e.children[0]) e.batched = expression.Vectorizable(e.filters) if e.batched { e.selected = make([]bool, 0, chunk.InitialCapacity) @@ -1011,7 +1016,7 @@ func (e *TableScanExec) Next(ctx context.Context, req *chunk.RecordBatch) error return err } - mutableRow := chunk.MutRowFromTypes(e.retTypes()) + mutableRow := chunk.MutRowFromTypes(retTypes(e)) for req.NumRows() < req.Capacity() { row, err := e.getRow(handle) if err != nil { @@ -1027,12 +1032,12 @@ func (e *TableScanExec) Next(ctx context.Context, req *chunk.RecordBatch) error func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error { chk.GrowAndReset(e.maxChunkSize) if e.virtualTableChunkList == nil { - e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) + e.virtualTableChunkList = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize) columns := make([]*table.Column, e.schema.Len()) for i, colInfo := range e.columns { columns[i] = 
table.ToColumn(colInfo) } - mutableRow := chunk.MutRowFromTypes(e.retTypes()) + mutableRow := chunk.MutRowFromTypes(retTypes(e)) err := e.t.IterRecords(e.ctx, nil, columns, func(h int64, rec []types.Datum, cols []*table.Column) (bool, error) { mutableRow.SetDatums(rec...) e.virtualTableChunkList.AppendRow(mutableRow.ToRow()) @@ -1129,7 +1134,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, req *chunk.RecordBatch) error return errors.New("subquery returns more than 1 row") } - childChunk := e.children[0].newFirstChunk() + childChunk := newFirstChunk(e.children[0]) err = e.children[0].Next(ctx, chunk.NewRecordBatch(childChunk)) if err != nil { return err @@ -1194,7 +1199,7 @@ func (e *UnionExec) Open(ctx context.Context) error { return err } for _, child := range e.children { - e.childrenResults = append(e.childrenResults, child.newFirstChunk()) + e.childrenResults = append(e.childrenResults, newFirstChunk(child)) } e.stopFetchData.Store(false) e.initialized = false diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 23ba938192a08..47c66e4eb5798 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -98,7 +98,7 @@ func (s *testExecSuite) TestShowProcessList(c *C) { err := e.Open(ctx) c.Assert(err, IsNil) - chk := e.newFirstChunk() + chk := newFirstChunk(e) it := chunk.NewIterator4Chunk(chk) // Run test and check results. for _, p := range ps { diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 5cdd3cfe2898e..70cf56031e79e 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -93,9 +93,9 @@ func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.RecordBatc } func (r *requiredRowsDataSource) genOneRow() chunk.Row { - row := chunk.MutRowFromTypes(r.retTypes()) - for i := range r.retTypes() { - row.SetValue(i, r.generator(r.retTypes()[i])) + row := chunk.MutRowFromTypes(retTypes(r)) + for i, tp := range retTypes(r) { + row.SetValue(i, r.generator(tp)) } return row.ToRow() } @@ -177,7 +177,7 @@ func (s *testExecSuite) TestLimitRequiredRows(c *C) { ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS) exec := buildLimitExec(sctx, ds, testCase.limitOffset, testCase.limitCount) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], sctx.GetSessionVars().MaxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -260,7 +260,7 @@ func (s *testExecSuite) TestSortRequiredRows(c *C) { } exec := buildSortExec(sctx, byItems, ds) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -367,7 +367,7 @@ func (s *testExecSuite) TestTopNRequiredRows(c *C) { } exec := buildTopNExec(sctx, testCase.topNOffset, testCase.topNCount, byItems, ds) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -460,7 +460,7 @@ func (s *testExecSuite) TestSelectionRequiredRows(c *C) { } exec := buildSelectionExec(sctx, filters, ds) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := 
newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -518,7 +518,7 @@ func (s *testExecSuite) TestProjectionUnparallelRequiredRows(c *C) { } exec := buildProjectionExec(sctx, exprs, ds, 0) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -574,7 +574,7 @@ func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) { } exec := buildProjectionExec(sctx, exprs, ds, testCase.numWorkers) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -663,7 +663,7 @@ func (s *testExecSuite) TestStreamAggRequiredRows(c *C) { aggFuncs := []*aggregation.AggFuncDesc{aggFunc} exec := buildStreamAggExecutor(sctx, ds, schema, aggFuncs, groupBy) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -722,7 +722,7 @@ func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) { aggFuncs := []*aggregation.AggFuncDesc{aggFunc} exec := buildHashAggExecutor(sctx, ds, schema, aggFuncs, groupBy) c.Assert(exec.Open(ctx), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) @@ -758,7 +758,7 @@ func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) { exec := buildMergeJoinExec(ctx, joinType, innerSrc, outerSrc) c.Assert(exec.Open(context.Background()), IsNil) - chk := exec.newFirstChunk() + chk := newFirstChunk(exec) for i := range required { chk.SetRequiredRows(required[i], ctx.GetSessionVars().MaxChunkSize) c.Assert(exec.Next(context.Background(), chunk.NewRecordBatch(chk)), IsNil) diff --git a/executor/explain.go b/executor/explain.go index 61ced6d564b62..37ef8273a55c2 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -72,7 +72,7 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.RecordBatch) error { func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) { if e.analyzeExec != nil { - chk := e.analyzeExec.newFirstChunk() + chk := newFirstChunk(e.analyzeExec) for { err := e.analyzeExec.Next(ctx, chunk.NewRecordBatch(chk)) if err != nil { diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 3d9371d14d396..ac8d82c4b42d6 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -365,11 +365,11 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst // buildTask builds a lookUpJoinTask and read outer rows. // When err is not nil, task must not be nil to send the error to the main thread via task. 
func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { - ow.executor.newFirstChunk() + newFirstChunk(ow.executor) task := &lookUpJoinTask{ doneCh: make(chan error, 1), - outerResult: ow.executor.newFirstChunk(), + outerResult: newFirstChunk(ow.executor), encodedLookUpKeys: chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, ow.ctx.GetSessionVars().MaxChunkSize), lookupMap: mvmap.NewMVMap(), } @@ -582,7 +582,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa return err } defer terror.Call(innerExec.Close) - innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize) + innerResult := chunk.NewList(retTypes(innerExec), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize) innerResult.GetMemTracker().SetLabel(innerResultLabel) innerResult.GetMemTracker().AttachTo(task.memTracker) for { @@ -594,7 +594,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa break } innerResult.Add(iw.executorChk) - iw.executorChk = innerExec.newFirstChunk() + iw.executorChk = newFirstChunk(innerExec) } task.innerResult = innerResult return nil diff --git a/executor/insert_common.go b/executor/insert_common.go index 49ac8953b5319..2939d946ee0c5 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -295,8 +295,8 @@ func (e *InsertValues) setValueForRefColumn(row []types.Datum, hasValue []bool) func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(ctx context.Context, rows [][]types.Datum) error) error { // process `insert|replace into ... select ... from ...` selectExec := e.children[0] - fields := selectExec.retTypes() - chk := selectExec.newFirstChunk() + fields := retTypes(selectExec) + chk := newFirstChunk(selectExec) iter := chunk.NewIterator4Chunk(chk) rows := make([][]types.Datum, 0, chk.Capacity()) diff --git a/executor/join.go b/executor/join.go index 3352a3307161a..37b1b6991c39f 100644 --- a/executor/join.go +++ b/executor/join.go @@ -178,10 +178,10 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB var allTypes []*types.FieldType if isOuterKey { keyColIdx = e.outerKeyColIdx - allTypes = e.outerExec.retTypes() + allTypes = retTypes(e.outerExec) } else { keyColIdx = e.innerKeyColIdx - allTypes = e.innerExec.retTypes() + allTypes = retTypes(e.innerExec) } for _, i := range keyColIdx { @@ -268,7 +268,7 @@ var innerResultLabel fmt.Stringer = stringutil.StringerStr("innerResult") // fetchInnerRows fetches all rows from inner executor, and append them to // e.innerResult. 
func (e *HashJoinExec) fetchInnerRows(ctx context.Context) error { - e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) + e.innerResult = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel(innerResultLabel) var err error @@ -276,7 +276,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context) error { if e.finished.Load().(bool) { return nil } - chk := e.children[e.innerIdx].newFirstChunk() + chk := newFirstChunk(e.children[e.innerIdx]) err = e.innerExec.Next(ctx, chunk.NewRecordBatch(chk)) if err != nil || chk.NumRows() == 0 { return err @@ -299,7 +299,7 @@ func (e *HashJoinExec) initializeForProbe() { e.outerChkResourceCh = make(chan *outerChkResource, e.concurrency) for i := uint(0); i < e.concurrency; i++ { e.outerChkResourceCh <- &outerChkResource{ - chk: e.outerExec.newFirstChunk(), + chk: newFirstChunk(e.outerExec), dest: e.outerResultChs[i], } } @@ -309,7 +309,7 @@ func (e *HashJoinExec) initializeForProbe() { e.joinChkResourceCh = make([]chan *chunk.Chunk, e.concurrency) for i := uint(0); i < e.concurrency; i++ { e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) - e.joinChkResourceCh[i] <- e.newFirstChunk() + e.joinChkResourceCh[i] <- newFirstChunk(e) } // e.joinResultCh is for transmitting the join result chunks to the main @@ -625,9 +625,9 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error { } e.cursor = 0 e.innerRows = e.innerRows[:0] - e.outerChunk = e.outerExec.newFirstChunk() - e.innerChunk = e.innerExec.newFirstChunk() - e.innerList = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) + e.outerChunk = newFirstChunk(e.outerExec) + e.innerChunk = newFirstChunk(e.innerExec) + e.innerList = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize) e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) diff --git a/executor/merge_join.go b/executor/merge_join.go index 2eca140bed902..b972c607ef1de 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -185,7 +185,7 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Create a new Chunk and append it to "resourceQueue" if there is no more // available chunk in "resourceQueue". 
if len(t.resourceQueue) == 0 { - newChunk := t.reader.newFirstChunk() + newChunk := newFirstChunk(t.reader) t.memTracker.Consume(newChunk.MemoryUsage()) t.resourceQueue = append(t.resourceQueue, newChunk) } @@ -222,7 +222,7 @@ func (e *MergeJoinExec) Open(ctx context.Context) error { e.childrenResults = make([]*chunk.Chunk, 0, len(e.children)) for _, child := range e.children { - e.childrenResults = append(e.childrenResults, child.newFirstChunk()) + e.childrenResults = append(e.childrenResults, newFirstChunk(child)) } e.innerTable.memTracker = memory.NewTracker(innerTableLabel, -1) diff --git a/executor/pkg_test.go b/executor/pkg_test.go index 74a478aadce48..f7b73e09aad04 100644 --- a/executor/pkg_test.go +++ b/executor/pkg_test.go @@ -35,7 +35,7 @@ type MockExec struct { func (m *MockExec) Next(ctx context.Context, req *chunk.RecordBatch) error { req.Reset() - colTypes := m.retTypes() + colTypes := retTypes(m) for ; m.curRowIdx < len(m.Rows) && req.NumRows() < req.Capacity(); m.curRowIdx++ { curRow := m.Rows[m.curRowIdx] for i := 0; i < curRow.Len(); i++ { @@ -88,7 +88,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) { innerFilter := outerFilter.Clone() otherFilter := expression.NewFunctionInternal(sctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), col0, col1) joiner := newJoiner(sctx, plannercore.InnerJoin, false, - make([]types.Datum, innerExec.Schema().Len()), []expression.Expression{otherFilter}, outerExec.retTypes(), innerExec.retTypes()) + make([]types.Datum, innerExec.Schema().Len()), []expression.Expression{otherFilter}, retTypes(outerExec), retTypes(innerExec)) joinSchema := expression.NewSchema(col0, col1) join := &NestedLoopApplyExec{ baseExecutor: newBaseExecutor(sctx, joinSchema, nil), @@ -98,10 +98,10 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) { innerFilter: []expression.Expression{innerFilter}, joiner: joiner, } - join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.initCap, innerExec.maxChunkSize) - join.innerChunk = innerExec.newFirstChunk() - join.outerChunk = outerExec.newFirstChunk() - joinChk := join.newFirstChunk() + join.innerList = chunk.NewList(retTypes(innerExec), innerExec.initCap, innerExec.maxChunkSize) + join.innerChunk = newFirstChunk(innerExec) + join.outerChunk = newFirstChunk(outerExec) + joinChk := newFirstChunk(join) it := chunk.NewIterator4Chunk(joinChk) for rowIdx := 1; ; { err := join.Next(ctx, chunk.NewRecordBatch(joinChk)) diff --git a/executor/point_get.go b/executor/point_get.go index e9063386fbe0c..1d1d9881869c2 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -19,10 +19,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -38,22 +36,23 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { b.err = err return nil } - return &PointGetExecutor{ - ctx: b.ctx, - schema: p.Schema(), - tblInfo: p.TblInfo, - idxInfo: p.IndexInfo, - idxVals: p.IndexValues, - handle: p.Handle, - startTS: startTS, - } + e := &PointGetExecutor{ + baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ExplainID()), + tblInfo: p.TblInfo, + idxInfo: p.IndexInfo, + idxVals: p.IndexValues, + handle: p.Handle, + startTS: startTS, + } + e.base().initCap = 1 + e.base().maxChunkSize = 1 + return 
 	e
 }
 
 // PointGetExecutor executes point select query.
 type PointGetExecutor struct {
-	ctx      sessionctx.Context
-	schema   *expression.Schema
-	tps      []*types.FieldType
+	baseExecutor
+
 	tblInfo  *model.TableInfo
 	handle   int64
 	idxInfo  *model.IndexInfo
@@ -241,22 +240,3 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
 	}
 	return nil
 }
-
-// Schema implements the Executor interface.
-func (e *PointGetExecutor) Schema() *expression.Schema {
-	return e.schema
-}
-
-func (e *PointGetExecutor) retTypes() []*types.FieldType {
-	if e.tps == nil {
-		e.tps = make([]*types.FieldType, e.schema.Len())
-		for i := range e.schema.Columns {
-			e.tps[i] = e.schema.Columns[i].RetType
-		}
-	}
-	return e.tps
-}
-
-func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk {
-	return chunk.New(e.retTypes(), 1, 1)
-}
diff --git a/executor/projection.go b/executor/projection.go
index 01d3aff16b7b9..e22b22d3e4fc3 100644
--- a/executor/projection.go
+++ b/executor/projection.go
@@ -91,7 +91,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
 	}
 
 	if e.isUnparallelExec() {
-		e.childResult = e.children[0].newFirstChunk()
+		e.childResult = newFirstChunk(e.children[0])
 	}
 
 	return nil
@@ -236,11 +236,11 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
 		})
 
 		e.fetcher.inputCh <- &projectionInput{
-			chk:          e.children[0].newFirstChunk(),
+			chk:          newFirstChunk(e.children[0]),
 			targetWorker: e.workers[i],
 		}
 		e.fetcher.outputCh <- &projectionOutput{
-			chk:  e.newFirstChunk(),
+			chk:  newFirstChunk(e),
 			done: make(chan error, 1),
 		}
 	}
diff --git a/executor/radix_hash_join.go b/executor/radix_hash_join.go
index cc0633f391bb8..c32e229bfa2cc 100644
--- a/executor/radix_hash_join.go
+++ b/executor/radix_hash_join.go
@@ -186,7 +186,7 @@ func (e *RadixHashJoinExec) preAlloc4InnerParts() (err error) {
 func (e *RadixHashJoinExec) getPartition(idx uint32) partition {
 	if e.innerParts[idx] == nil {
 		e.numNonEmptyPart++
-		e.innerParts[idx] = chunk.New(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)
+		e.innerParts[idx] = chunk.New(retTypes(e.innerExec), e.initCap, e.maxChunkSize)
 	}
 	return e.innerParts[idx]
 }
diff --git a/executor/show.go b/executor/show.go
index f25dcd603fdbe..b61941c203856 100644
--- a/executor/show.go
+++ b/executor/show.go
@@ -79,7 +79,7 @@ type ShowExec struct {
 func (e *ShowExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	req.GrowAndReset(e.maxChunkSize)
 	if e.result == nil {
-		e.result = e.newFirstChunk()
+		e.result = newFirstChunk(e)
 		err := e.fetchAll()
 		if err != nil {
 			return errors.Trace(err)
diff --git a/executor/sort.go b/executor/sort.go
index 8e2e221a6828a..3e41a1b1b4aac 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -105,12 +105,12 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 }
 
 func (e *SortExec) fetchRowChunks(ctx context.Context) error {
-	fields := e.retTypes()
+	fields := retTypes(e)
 	e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
 	e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
 	e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
 	for {
-		chk := e.children[0].newFirstChunk()
+		chk := newFirstChunk(e.children[0])
 		err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
 		if err != nil {
 			return err
@@ -275,11 +275,11 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 
 func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
 	e.chkHeap = &topNChunkHeap{e}
-	e.rowChunks = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize)
+	e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
 	e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
 	e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
 	for uint64(e.rowChunks.Len()) < e.totalLimit {
-		srcChk := e.children[0].newFirstChunk()
+		srcChk := newFirstChunk(e.children[0])
 		// adjust required rows by total limit
 		srcChk.SetRequiredRows(int(e.totalLimit-uint64(e.rowChunks.Len())), e.maxChunkSize)
 		err := e.children[0].Next(ctx, chunk.NewRecordBatch(srcChk))
@@ -305,7 +305,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error {
 		// The number of rows we loaded may exceeds total limit, remove greatest rows by Pop.
 		heap.Pop(e.chkHeap)
 	}
-	childRowChk := e.children[0].newFirstChunk()
+	childRowChk := newFirstChunk(e.children[0])
 	for {
 		err := e.children[0].Next(ctx, chunk.NewRecordBatch(childRowChk))
 		if err != nil {
@@ -349,7 +349,7 @@ func (e *TopNExec) processChildChk(childRowChk *chunk.Chunk) error {
 // but we want descending top N, then we will keep all data in memory.
 // But if data is distributed randomly, this function will be called log(n) times.
 func (e *TopNExec) doCompaction() error {
-	newRowChunks := chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize)
+	newRowChunks := chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
 	newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len())
 	for _, rowPtr := range e.rowPtrs {
 		newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr))
diff --git a/executor/table_reader.go b/executor/table_reader.go
index 327b148b02de7..6ad3eec52e918 100644
--- a/executor/table_reader.go
+++ b/executor/table_reader.go
@@ -175,7 +175,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
 	if err != nil {
 		return nil, err
 	}
-	result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
+	result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
 	if err != nil {
 		return nil, err
 	}
diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go
index 21819329d6a82..0d7163d431c0c 100644
--- a/executor/table_readers_required_rows_test.go
+++ b/executor/table_readers_required_rows_test.go
@@ -178,7 +178,7 @@ func (s *testExecSuite) TestTableReaderRequiredRows(c *C) {
 		ctx := mockDistsqlSelectCtxSet(testCase.totalRows, testCase.expectedRowsDS)
 		exec := buildTableReader(sctx)
 		c.Assert(exec.Open(ctx), IsNil)
-		chk := exec.newFirstChunk()
+		chk := newFirstChunk(exec)
 		for i := range testCase.requiredRows {
 			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
 			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
@@ -230,7 +230,7 @@ func (s *testExecSuite) TestIndexReaderRequiredRows(c *C) {
 		ctx := mockDistsqlSelectCtxSet(testCase.totalRows, testCase.expectedRowsDS)
 		exec := buildIndexReader(sctx)
 		c.Assert(exec.Open(ctx), IsNil)
-		chk := exec.newFirstChunk()
+		chk := newFirstChunk(exec)
 		for i := range testCase.requiredRows {
 			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
 			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
diff --git a/executor/union_scan.go b/executor/union_scan.go
index 9f95c88d075e0..5a8de698f49ac 100644
--- a/executor/union_scan.go
+++ b/executor/union_scan.go
@@ -117,7 +117,7 @@ func (us *UnionScanExec) Open(ctx context.Context) error {
 	if err := us.baseExecutor.Open(ctx); err != nil {
 		return err
 	}
-	us.snapshotChunkBuffer = us.newFirstChunk()
+	us.snapshotChunkBuffer = newFirstChunk(us)
 	return nil
 }
 
@@ -133,7 +133,7 @@ func (us *UnionScanExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 		defer func() { us.runtimeStats.Record(time.Since(start), req.NumRows()) }()
 	}
 	req.GrowAndReset(us.maxChunkSize)
-	mutableRow := chunk.MutRowFromTypes(us.retTypes())
+	mutableRow := chunk.MutRowFromTypes(retTypes(us))
 	for i, batchSize := 0, req.Capacity(); i < batchSize; i++ {
 		row, err := us.getOneRow(ctx)
 		if err != nil {
@@ -214,7 +214,7 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
 				// commit, but for simplicity, we don't handle it here.
 				continue
 			}
-			us.snapshotRows = append(us.snapshotRows, row.GetDatumRow(us.children[0].retTypes()))
+			us.snapshotRows = append(us.snapshotRows, row.GetDatumRow(retTypes(us.children[0])))
 		}
 	}
 	return us.snapshotRows[0], nil
@@ -295,7 +295,7 @@ func (us *UnionScanExec) rowWithColsInTxn(t table.Table, h int64, cols []*table.
 
 func (us *UnionScanExec) buildAndSortAddedRows(t table.Table) error {
 	us.addedRows = make([][]types.Datum, 0, len(us.dirty.addedRows))
-	mutableRow := chunk.MutRowFromTypes(us.retTypes())
+	mutableRow := chunk.MutRowFromTypes(retTypes(us))
 	cols := t.WritableCols()
 	for h := range us.dirty.addedRows {
 		newData := make([]types.Datum, 0, us.schema.Len())
diff --git a/executor/update.go b/executor/update.go
index c2cdbaf9f9d55..f6840f68ff197 100644
--- a/executor/update.go
+++ b/executor/update.go
@@ -165,7 +165,7 @@ func (e *UpdateExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 }
 
 func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
-	fields := e.children[0].retTypes()
+	fields := retTypes(e.children[0])
 	schema := e.children[0].Schema()
 	colsInfo := make([]*table.Column, len(fields))
 	for id, cols := range schema.TblID2Handle {
@@ -178,7 +178,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
 		}
 	}
 	globalRowIdx := 0
-	chk := e.children[0].newFirstChunk()
+	chk := newFirstChunk(e.children[0])
 	e.evalBuffer = chunk.MutRowFromTypes(fields)
 	for {
 		err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
diff --git a/executor/window.go b/executor/window.go
index bf4e5a2dab0b1..0b51691f139e0 100644
--- a/executor/window.go
+++ b/executor/window.go
@@ -130,7 +130,7 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
 		return errors.Trace(err)
 	}
 
-	childResult := e.children[0].newFirstChunk()
+	childResult := newFirstChunk(e.children[0])
 	err = e.children[0].Next(ctx, &chunk.RecordBatch{Chunk: childResult})
 	if err != nil {
 		return errors.Trace(err)

From 67b47bc829717ecb05b61232ec128108207c70dc Mon Sep 17 00:00:00 2001
From: Ewan Chou
Date: Wed, 19 Jun 2019 16:37:40 +0800
Subject: [PATCH 5/7] ddl: skip race test for Tests that change global config (#10855)

---
 ddl/db_integration_test.go | 4 ++++
 ddl/db_test.go             | 7 +++++++
 2 files changed, 11 insertions(+)

diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go
index 74a6dac86f674..f7fbf9a24b0b2 100644
--- a/ddl/db_integration_test.go
+++ b/ddl/db_integration_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/store/mockstore/mocktikv"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/israce"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testutil"
@@ -1494,6 +1495,9 @@ func (s *testIntegrationSuite5) TestFulltextIndexIgnore(c *C) {
 }
 
 func (s *testIntegrationSuite1) TestTreatOldVersionUTF8AsUTF8MB4(c *C) {
+	if israce.RaceEnabled {
+		c.Skip("skip race test")
+	}
 	s.tk = testkit.NewTestKit(c, s.store)
 	s.tk.MustExec("use test")
 	s.tk.MustExec("drop table if exists t")
diff --git a/ddl/db_test.go b/ddl/db_test.go
index d176d60130f0b..e9d58f679a9c3 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -47,6 +47,7 @@ import (
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/admin"
+	"github.com/pingcap/tidb/util/israce"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testutil"
@@ -2764,6 +2765,9 @@ func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) {
 }
 
 func (s *testDBSuite2) TestLockTables(c *C) {
+	if israce.RaceEnabled {
+		c.Skip("skip race test")
+	}
 	s.tk = testkit.NewTestKit(c, s.store)
 	tk := s.tk
 	tk.MustExec("use test")
@@ -2960,6 +2964,9 @@ func (s *testDBSuite2) TestLockTables(c *C) {
 
 // TestConcurrentLockTables test concurrent lock/unlock tables.
 func (s *testDBSuite2) TestConcurrentLockTables(c *C) {
+	if israce.RaceEnabled {
+		c.Skip("skip race test")
+	}
 	s.tk = testkit.NewTestKit(c, s.store)
 	tk2 := testkit.NewTestKit(c, s.store)
 	tk := s.tk

From 90096b3da50b1480ab73440116e7c089dd3f99a8 Mon Sep 17 00:00:00 2001
From: Haibin Xie
Date: Wed, 19 Jun 2019 19:45:58 +0800
Subject: [PATCH 6/7] executor: fix wrong row count in fast analyze (#10859)

---
 executor/analyze.go      | 9 ++++++---
 executor/analyze_test.go | 1 +
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/executor/analyze.go b/executor/analyze.go
index 62bd924ce4da1..d6f650a723e5a 100644
--- a/executor/analyze.go
+++ b/executor/analyze.go
@@ -990,11 +990,14 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec
 		}
 		data = append(data, bytes)
 	}
-	stats := domain.GetDomain(e.ctx).StatsHandle()
+	handle := domain.GetDomain(e.ctx).StatsHandle()
+	tblStats := handle.GetTableStats(e.tblInfo)
 	rowCount := int64(e.rowCount)
-	if stats.Lease() > 0 {
-		rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
+	if handle.Lease() > 0 && !tblStats.Pseudo {
+		rowCount = mathutil.MinInt64(tblStats.Count, rowCount)
 	}
+	// Adjust the row count in case the count of `tblStats` is not accurate and too small.
+	rowCount = mathutil.MaxInt64(rowCount, int64(len(collector.Samples)))
 	// build CMSketch
 	var ndv, scaleRatio uint64
 	collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount))
diff --git a/executor/analyze_test.go b/executor/analyze_test.go
index 5c9babd34a980..5076e06c5738d 100644
--- a/executor/analyze_test.go
+++ b/executor/analyze_test.go
@@ -402,6 +402,7 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) {
 	tk.MustExec("use test")
 	tk.MustExec("drop table if exists t")
 	tk.MustExec("create table t(a int primary key)")
+	c.Assert(s.dom.StatsHandle().Update(s.dom.InfoSchema()), IsNil)
 	tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
 	tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
 	tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))

From 711582a674572ef5b1424eed943ae28f63612171 Mon Sep 17 00:00:00 2001
From: Yuanjia Zhang
Date: Wed, 19 Jun 2019 20:07:31 +0800
Subject: [PATCH 7/7] executor: log inconsistent handles when inconsistent-check fail in `IndexLookupReader` (#10838)

---
 executor/distsql.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/executor/distsql.go b/executor/distsql.go
index 2b380313355d0..b7972d292e6f0 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -839,6 +839,17 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
 	}
 
 	if len(w.idxLookup.tblPlans) == 1 {
+		obtainedHandlesMap := make(map[int64]struct{}, len(task.rows))
+		for _, row := range task.rows {
+			handle := row.GetInt64(w.handleIdx)
+			obtainedHandlesMap[handle] = struct{}{}
+		}
+
+		logutil.Logger(ctx).Error("inconsistent index handles", zap.String("index", w.idxLookup.index.Name.O),
+			zap.Int("index_cnt", handleCnt), zap.Int("table_cnt", len(task.rows)),
+			zap.Int64s("missing_handles", GetLackHandles(task.handles, obtainedHandlesMap)),
+			zap.Int64s("total_handles", task.handles))
+
 		// table scan in double read can never has conditions according to convertToIndexScan.
 		// if this table scan has no condition, the number of rows it returns must equal to the length of handles.
 		return errors.Errorf("inconsistent index %s handle count %d isn't equal to value count %d",