Skip to content

Commit

Permalink
Merge branch 'master' into ref_40330_2
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Jan 17, 2023
2 parents ffbe8fd + 4620df6 commit fbc64e6
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 5 deletions.
21 changes: 20 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,9 @@ type LimitExec struct {

// columnIdxsUsedByChild keep column indexes of child executor used for inline projection
columnIdxsUsedByChild []int

// Log the close time when opentracing is enabled.
span opentracing.Span
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -1470,13 +1473,29 @@ func (e *LimitExec) Open(ctx context.Context) error {
e.childResult = tryNewCacheChunk(e.children[0])
e.cursor = 0
e.meetFirstBatch = e.begin == 0
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
e.span = span
}
return nil
}

// Close implements the Executor Close interface.
func (e *LimitExec) Close() error {
start := time.Now()

e.childResult = nil
return e.baseExecutor.Close()
err := e.baseExecutor.Close()

elapsed := time.Since(start)
if elapsed > time.Millisecond {
logutil.BgLogger().Info("limit executor close takes a long time",
zap.Duration("elapsed", elapsed))
if e.span != nil {
span1 := e.span.Tracer().StartSpan("limitExec.Close", opentracing.ChildOf(e.span.Context()), opentracing.StartTime(start))
defer span1.Finish()
}
}
return err
}

func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk {
Expand Down
2 changes: 2 additions & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//util/memory",
"//util/ranger",
"//util/sqlexec",
"//util/syncutil",
"//util/timeutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down Expand Up @@ -72,6 +73,7 @@ go_test(
],
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//config",
Expand Down
12 changes: 10 additions & 2 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/syncutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
atomic2 "go.uber.org/atomic"
Expand All @@ -70,7 +71,7 @@ type Handle struct {
initStatsCtx sessionctx.Context

mu struct {
sync.RWMutex
syncutil.RWMutex
ctx sessionctx.Context
// rateMap contains the error rate delta from feedback.
rateMap errorRateDeltaMap
Expand Down Expand Up @@ -361,8 +362,15 @@ func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.Ta
return "", err
}

// IsTableLocked check whether table is locked in handle
// IsTableLocked check whether table is locked in handle with Handle.Mutex
func (h *Handle) IsTableLocked(tableID int64) bool {
h.mu.RLock()
defer h.mu.RUnlock()
return h.isTableLocked(tableID)
}

// IsTableLocked check whether table is locked in handle without Handle.Mutex
func (h *Handle) isTableLocked(tableID int64) bool {
return isTableLocked(h.tableLocked, tableID)
}

Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
startTS := txn.StartTS()
updateStatsMeta := func(id int64) error {
var err error
if h.IsTableLocked(id) {
// This lock is already locked on it so it use isTableLocked without lock.
if h.isTableLocked(id) {
if delta.Delta < 0 {
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?", startTS, -delta.Delta, delta.Count, id, -delta.Delta)
} else {
Expand Down
1 change: 1 addition & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//util/trxevents",
"@com_github_dgraph_io_ristretto//:ristretto",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
Expand Down
11 changes: 10 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
Expand Down Expand Up @@ -384,12 +385,20 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
builder.reverse()
}
tasks := builder.build()
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
elapsed := time.Since(start)
if elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildCopTasks takes too much time",
zap.Duration("elapsed", elapsed),
zap.Int("range len", rangesLen),
zap.Int("task len", len(tasks)))
}
if elapsed > time.Millisecond {
ctx := bo.GetCtx()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("copr.buildCopTasks", opentracing.ChildOf(span.Context()), opentracing.StartTime(start))
defer span1.Finish()
}
}
metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum()))
return tasks, nil
}
Expand Down

0 comments on commit fbc64e6

Please sign in to comment.