*: Log SQL statement when coprocessor encounters lock (#27735) (#27924)
ti-srebot authored Oct 19, 2021
1 parent 436c666 commit 333edd8
Showing 8 changed files with 120 additions and 35 deletions.
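
In short: this change adds a trxevents.EventCallback parameter to kv.Client.Send and threads it through buildCopTasks into each copTask, so that when a coprocessor response reports a lock, the callback registered in distsql.Select can log the originating SQL statement together with the lock info at DEBUG level. Callers that don't care about the event pass nil and keep the old behavior.
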
3 changes: 2 additions & 1 deletion br/pkg/lightning/restore/checksum_test.go
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/memory"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
@@ -394,7 +395,7 @@ type mockChecksumKVClient struct {
}

// a mock client for checksum request
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
if c.curErrCount < c.maxErrCount {
c.curErrCount++
return &mockErrorResponse{err: "tikv timeout"}
19 changes: 16 additions & 3 deletions distsql/distsql.go
@@ -24,8 +24,11 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
@@ -74,7 +77,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
kvReq.Streaming = false
}
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
// Note: Do not assume this callback will be invoked within the same goroutine.
if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
logutil.Logger(ctx).Debug("coprocessor encounters lock",
zap.Uint64("startTS", kvReq.StartTs),
zap.Stringer("lock", copMeetLock.LockInfo),
zap.String("stmt", originalSQL))
}
}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
@@ -136,7 +149,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq
// Analyze does an analyze request.
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{},
isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false)
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
if resp == nil {
return nil, errors.New("client returns nil response")
}
@@ -159,7 +172,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars inte
func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{}) (SelectResult, error) {
// FIXME: As BR has a dependency on `Checksum` and TiDB also depends on BR, we currently can't change the
// Checksum function signature. The two-way dependency should be removed in the future.
resp := client.Send(ctx, kvReq, vars, nil, false)
resp := client.Send(ctx, kvReq, vars, nil, false, nil)
if resp == nil {
return nil, errors.New("client returns nil response")
}
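A detail worth noting in Select: StmtCtx.OriginalSQL is copied into the local originalSQL before the closure is built, and the inline comment warns that the callback may not run on the same goroutine, so the closure only captures plain values (the start TS comes from kvReq.StartTs). The Analyze and Checksum paths below pass nil for the callback and keep their previous behavior.
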
2 changes: 1 addition & 1 deletion executor/simple.go
@@ -1477,7 +1477,7 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
return err
}

resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false)
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false, nil)
if resp == nil {
err := errors.New("client returns nil response")
return err
3 changes: 2 additions & 1 deletion kv/kv.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
@@ -202,7 +203,7 @@ type Transaction interface {
// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Send(ctx context.Context, req *Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
Send(ctx context.Context, req *Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) Response

// IsRequestTypeSupported checks if reqType and subType is supported.
IsRequestTypeSupported(reqType, subType int64) bool
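Because Send on the kv.Client interface gains the eventCb trxevents.EventCallback parameter, every implementation has to change in lockstep: CopClient in store/copr/coprocessor.go, the mock Client in util/mock/client.go, and the mockChecksumKVClient in br/pkg/lightning/restore/checksum_test.go all accept the new argument, while call sites that don't need the event (Analyze, Checksum, killRemoteConn) simply pass nil.
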
27 changes: 19 additions & 8 deletions store/copr/coprocessor.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
@@ -67,7 +68,7 @@ type CopClient struct {
}

// Send builds the request and gets the coprocessor iterator response.
func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
vars, ok := variables.(*tikv.Variables)
if !ok {
return copErrorResponse{errors.Errorf("unsupported variables:%+v", variables)}
@@ -79,7 +80,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb)
if err != nil {
return copErrorResponse{err}
}
@@ -137,6 +138,8 @@ type copTask struct {
storeAddr string
cmdType tikvrpc.CmdType
storeType kv.StoreType

eventCb trxevents.EventCallback
}

func (r *copTask) String() string {
@@ -147,7 +150,7 @@ func (r *copTask) String() string {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
@@ -180,6 +183,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
eventCb: eventCb,
})
i = nextI
}
@@ -878,11 +882,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req)
return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
// Note that we didn't redact the SQL statement, because the log is at DEBUG level.
if task.eventCb != nil {
task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{
LockInfo: lockErr,
}))
} else {
logutil.Logger(bo.GetCtx()).Debug("coprocessor encounters lock",
zap.Stringer("lock", lockErr))
}
msBeforeExpired, err1 := worker.kvclient.ResolveLocks(bo.TiKVBackoffer(), worker.req.StartTs, []*txnlock.Lock{txnlock.NewLock(lockErr)})
err1 = derr.ToTiDBErr(err1)
if err1 != nil {
@@ -897,7 +908,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
logutil.BgLogger().Warn("other error",
logutil.Logger(bo.GetCtx()).Warn("other error",
zap.Uint64("txnStartTS", worker.req.StartTs),
zap.Uint64("regionID", task.region.GetID()),
zap.String("storeAddr", task.storeAddr),
@@ -1026,7 +1037,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
}
return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req)
return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.eventCb)
}

// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
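In handleCopResponse the new behavior is gated on task.eventCb: when a callback is present, the lock error is wrapped with trxevents.WrapCopMeetLock and handed to the caller (which, for Select, logs the statement); when it is nil, the worker falls back to the old DEBUG log of just the lock. The surrounding log calls also switch from logutil.BgLogger() to logutil.Logger(bo.GetCtx()) so they pick up fields attached to the request context. Lock resolution via ResolveLocks is unchanged in both branches.
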
40 changes: 20 additions & 20 deletions store/copr/coprocessor_test.go
@@ -49,103 +49,103 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
req := &kv.Request{}
flashReq := &kv.Request{}
flashReq.StoreType = kv.TiFlash
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
@@ -218,7 +218,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
@@ -232,7 +232,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
cache.InvalidateCachedRegion(tasks[1].region)

req.Desc = true
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 3)
s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
3 changes: 2 additions & 1 deletion util/mock/client.go
@@ -18,6 +18,7 @@ import (

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
)

// Client implement kv.Client interface, mocked from "CopClient" defined in
@@ -28,6 +29,6 @@ type Client struct {
}

// Send implement kv.Client interface.
func (c *Client) Send(ctx context.Context, req *kv.Request, kv interface{}, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response {
func (c *Client) Send(ctx context.Context, req *kv.Request, kv interface{}, sessionMemTracker *memory.Tracker, enabledRateLimit bool, eventCb trxevents.EventCallback) kv.Response {
return c.MockResponse
}
58 changes: 58 additions & 0 deletions util/trxevents/trx_events.go
@@ -0,0 +1,58 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trxevents

import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// EventType represents the type of a transaction event.
type EventType = int

const (
// EventTypeCopMeetLock stands for the CopMeetLock event type.
EventTypeCopMeetLock = iota
)

// CopMeetLock represents an event where a coprocessor read encounters a lock.
type CopMeetLock struct {
LockInfo *kvrpcpb.LockInfo
}

// TransactionEvent represents a transaction event that may belong to any of the possible types.
type TransactionEvent struct {
eventType EventType
inner interface{}
}

// GetCopMeetLock tries to extract the inner CopMeetLock event from a TransactionEvent. Returns nil if it's not a
// CopMeetLock event.
func (e TransactionEvent) GetCopMeetLock() *CopMeetLock {
if e.eventType == EventTypeCopMeetLock {
return e.inner.(*CopMeetLock)
}
return nil
}

// WrapCopMeetLock wraps a CopMeetLock event into a TransactionEvent object.
func WrapCopMeetLock(copMeetLock *CopMeetLock) TransactionEvent {
return TransactionEvent{
eventType: EventTypeCopMeetLock,
inner: copMeetLock,
}
}

// EventCallback is the callback type that handles `TransactionEvent`s.
type EventCallback = func(event TransactionEvent)
