Skip to content

Commit

Permalink
Merge branch 'master' into fix-41086
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Feb 7, 2023
2 parents d9898f9 + 98aff8c commit 0389e52
Show file tree
Hide file tree
Showing 71 changed files with 850 additions and 430 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ def go_deps():
name = "com_github_cloudfoundry_gosigar",
build_file_proto_mode = "disable",
importpath = "github.com/cloudfoundry/gosigar",
sum = "h1:T3MoGdugg1vdHn8Az7wDn7cZ4+QCjZph+eXf2CjSjo4=",
version = "v1.3.4",
sum = "h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRGXkcVc=",
version = "v1.3.6",
)

go_repository(
Expand Down Expand Up @@ -1120,8 +1120,8 @@ def go_deps():
name = "com_github_fsnotify_fsnotify",
build_file_proto_mode = "disable_global",
importpath = "github.com/fsnotify/fsnotify",
sum = "h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=",
version = "v1.5.4",
sum = "h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=",
version = "v1.6.0",
)
go_repository(
name = "com_github_fsouza_fake_gcs_server",
Expand Down Expand Up @@ -3118,8 +3118,8 @@ def go_deps():
name = "com_github_nxadm_tail",
build_file_proto_mode = "disable_global",
importpath = "github.com/nxadm/tail",
sum = "h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=",
version = "v1.4.4",
sum = "h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=",
version = "v1.4.8",
)
go_repository(
name = "com_github_oklog_run",
Expand Down
35 changes: 21 additions & 14 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,29 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {

// prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
return stats, err
}
retryErr := utils.WithRetry(
ctx,
func() error {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
return stats, err
}

runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Error("region flashback prepare get error")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Warn("region flashback prepare get error")
return errors.Trace(err)
}
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
return nil
},
utils.NewFlashBackBackoffer(),
)

return nil
recovery.progress.Inc()
return retryErr
}

// flashback the region data to version resolveTS
Expand Down
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
resetTSRetryTimeExt = 600
resetTSWaitIntervalExt = 500 * time.Millisecond
resetTSMaxWaitIntervalExt = 300 * time.Second

// region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV.
flashbackRetryTime = 3
flashbackWaitInterval = 3000 * time.Millisecond
flashbackMaxWaitInterval = 15 * time.Second
)

// RetryState is the mutable state needed for retrying.
Expand Down Expand Up @@ -204,3 +209,34 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}

type flashbackBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
}

// NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewFlashBackBackoffer() Backoffer {
return &flashbackBackoffer{
attempt: flashbackRetryTime,
delayTime: flashbackWaitInterval,
maxDelayTime: flashbackMaxWaitInterval,
}
}

// retry 3 times when prepare flashback failure.
func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
log.Warn("region may not ready to serve, retry it...", zap.Error(err))

if bo.delayTime > bo.maxDelayTime {
return bo.maxDelayTime
}
return bo.delayTime
}

func (bo *flashbackBackoffer) Attempt() int {
return bo.attempt
}
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ go_library(
"//util/filter",
"//util/gcutil",
"//util/generic",
"//util/gpool",
"//util/gpool/spmc",
"//util/hack",
"//util/intest",
Expand Down
3 changes: 2 additions & 1 deletion ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gpool"
"github.com/pingcap/tidb/util/gpool/spmc"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -258,7 +259,7 @@ func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, con
// TODO: add test: if all tidbs can't get the unmark backfill job(a tidb mark a backfill job, other tidbs returned, then the tidb can't handle this job.)
if dbterror.ErrDDLJobNotFound.Equal(err) {
logutil.BgLogger().Info("no backfill job, handle backfill task finished")
return nil, err
return nil, gpool.ErrProducerClosed
}
if kv.ErrWriteConflict.Equal(err) {
logutil.BgLogger().Info("GetAndMarkBackfillJobsForOneEle failed", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ go_library(
"//util/logutil",
"//util/memory",
"//util/ranger",
"//util/tracing",
"//util/trxevents",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
9 changes: 3 additions & 6 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strconv"
"unsafe"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
Expand Down Expand Up @@ -64,11 +64,8 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "distsql.Select")
defer r.End()

// For testing purpose.
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild
if limit != nil && limit.Limit < estimatedRegionRowCount {
builder.Request.Concurrency = 1
}
builder.Request.LimitSize = limit.GetLimit()
}
return builder
}
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ func TestScanLimitConcurrency(t *testing.T) {
Build()
require.NoError(t, err)
require.Equal(t, tt.concurrency, actual.Concurrency)
require.Equal(t, actual.LimitSize, tt.limit)
})
}
}
Expand Down
14 changes: 8 additions & 6 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) {
tne.err = err
return in, true
}
tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L, IsView: isView}
if tp.DBName == "" {
tp.DBName = tne.curDB.L
}
if _, ok := tne.names[tp]; !ok {
tne.names[tp] = struct{}{}
if t.TableInfo != nil {
tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L, IsView: isView}
if tp.DBName == "" {
tp.DBName = tne.curDB.L
}
if _, ok := tne.names[tp]; !ok {
tne.names[tp] = struct{}{}
}
}
} else if s, ok := in.(*ast.SelectStmt); ok {
if s.With != nil && len(s.With.CTEs) > 0 {
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ go_library(
"//util/tls",
"//util/topsql",
"//util/topsql/state",
"//util/tracing",
"@com_github_burntsushi_toml//:toml",
"@com_github_gogo_protobuf//proto",
"@com_github_ngaut_pools//:pools",
Expand Down
19 changes: 8 additions & 11 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand Down Expand Up @@ -65,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
"github.com/prometheus/client_golang/prometheus"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -283,12 +283,12 @@ func (a *ExecStmt) GetStmtNode() ast.StmtNode {

// PointGet short path for point exec directly from plan, keep only necessary steps
func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("ExecStmt.PointGet", opentracing.ChildOf(span.Context()))
span1.LogKV("sql", a.OriginText())
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
r, ctx := tracing.StartRegionEx(ctx, "ExecStmt.PointGet")
defer r.End()
if r.Span != nil {
r.Span.LogKV("sql", a.OriginText())
}

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
Expand Down Expand Up @@ -921,11 +921,8 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
sctx := a.Ctx
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.handleNoDelayExecutor")
defer r.End()

var err error
defer func() {
Expand Down
10 changes: 4 additions & 6 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"strings"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/tracing"
"go.uber.org/zap"
)

Expand All @@ -56,11 +56,9 @@ type Compiler struct {

// Compile compiles an ast.StmtNode to a physical plan.
func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecStmt, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.Compile")
defer r.End()

defer func() {
r := recover()
if r == nil {
Expand Down
21 changes: 7 additions & 14 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -68,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
tikvutil "github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -314,14 +314,10 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
if atomic.LoadUint32(&sessVars.Killed) == 1 {
return ErrQueryInterrupted
}
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("%T.Next", e), opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
if trace.IsEnabled() {
defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End()
}

r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e))
defer r.End()

if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
registerSQLAndPlanInExecForTopSQL(sessVars)
}
Expand Down Expand Up @@ -1527,11 +1523,8 @@ func init() {
s.RewritePhaseInfo.DurationPreprocessSubQuery += time.Since(begin)
}(time.Now())

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.EvalSubQuery", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
r, ctx := tracing.StartRegionEx(ctx, "executor.EvalSubQuery")
defer r.End()

e := newExecutorBuilder(sctx, is, nil)
exec := e.build(p)
Expand Down
Loading

0 comments on commit 0389e52

Please sign in to comment.