Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jan 18, 2024
2 parents 4ce3276 + c181d4f commit 1e6ac4a
Show file tree
Hide file tree
Showing 239 changed files with 13,525 additions and 9,953 deletions.
504 changes: 252 additions & 252 deletions DEPS.bzl

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension > pkg/disttask/framework/scheduler/mock/scheduler_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute SubtaskExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute StepExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go
Expand Down
20 changes: 10 additions & 10 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@ versions.check(minimum_bazel_version = "6.0.0")

http_archive(
name = "io_bazel_rules_go",
sha256 = "7c76d6236b28ff695aa28cf35f95de317a9472fd1fb14ac797c9bf684f09b37c",
sha256 = "de7974538c31f76658e0d333086c69efdf6679dbc6a466ac29e65434bf47076d",
urls = [
"http://bazel-cache.pingcap.net:8080/bazelbuild/rules_go/releases/download/v0.44.2/rules_go-v0.44.2.zip",
"http://ats.apps.svc/bazelbuild/rules_go/releases/download/v0.44.2/rules_go-v0.44.2.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.44.2/rules_go-v0.44.2.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.44.2/rules_go-v0.44.2.zip",
"http://bazel-cache.pingcap.net:8080/bazelbuild/rules_go/releases/download/v0.45.0/rules_go-v0.45.0.zip",
"http://ats.apps.svc/bazelbuild/rules_go/releases/download/v0.45.0/rules_go-v0.45.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.45.0/rules_go-v0.45.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.45.0/rules_go-v0.45.0.zip",
],
)

http_archive(
name = "bazel_gazelle",
sha256 = "b7387f72efb59f876e4daae42f1d3912d0d45563eac7cb23d1de0b094ab588cf",
sha256 = "32938bda16e6700063035479063d9d24c60eda8d79fd4739563f50d331cb3209",
urls = [
"http://bazel-cache.pingcap.net:8080/bazelbuild/bazel-gazelle/releases/download/v0.34.0/bazel-gazelle-v0.34.0.tar.gz",
"https://mirror.bazel.build/github.com/bazelbuild/bazel-gazelle/releases/download/v0.34.0/bazel-gazelle-v0.34.0.tar.gz",
"https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.34.0/bazel-gazelle-v0.34.0.tar.gz",
"http://ats.apps.svc/bazelbuild/bazel-gazelle/releases/download/v0.34.0/bazel-gazelle-v0.34.0.tar.gz",
"http://bazel-cache.pingcap.net:8080/bazelbuild/bazel-gazelle/releases/download/v0.35.0/bazel-gazelle-v0.35.0.tar.gz",
"https://mirror.bazel.build/github.com/bazelbuild/bazel-gazelle/releases/download/v0.35.0/bazel-gazelle-v0.35.0.tar.gz",
"https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.35.0/bazel-gazelle-v0.35.0.tar.gz",
"http://ats.apps.svc/bazelbuild/bazel-gazelle/releases/download/v0.35.0/bazel-gazelle-v0.35.0.tar.gz",
],
)

Expand Down
8 changes: 4 additions & 4 deletions br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {
return nil
}

if cfg.IgnoreStats {
// Do not run stat worker in BR.
session.DisableStats4Test()
}
// No need to cache the coproceesor result
config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB = 0

if err := task.RunBackup(ctx, tidbGlue, cmdName, &cfg); err != nil {
log.Error("failed to backup", zap.Error(err))
Expand Down Expand Up @@ -110,6 +108,8 @@ func NewBackupCommand() *cobra.Command {
build.LogInfo(build.BR)
utils.LogEnvVariables()
task.LogArguments(c)
// Do not run stat worker in BR.
session.DisableStats4Test()

// Do not run ddl worker in BR.
config.GetGlobalConfig().Instance.TiDBEnableDDL.Store(false)
Expand Down
5 changes: 4 additions & 1 deletion br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
return nil
}

// No need to cache the coproceesor result
config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB = 0

if err := task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg); err != nil {
log.Error("failed to restore", zap.Error(err))
printWorkaroundOnFullRestoreError(command, err)
Expand All @@ -85,7 +88,7 @@ func printWorkaroundOnFullRestoreError(command *cobra.Command, err error) {
fmt.Println("# you can drop existing databases and tables and start restore again")
case errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys):
fmt.Println("# the target cluster is not compatible with the backup data,")
fmt.Println("# you can remove 'with-sys-table' flag to skip restoring system tables")
fmt.Println("# you can use '--with-sys-table=false' to skip restoring system tables")
}
fmt.Println("#######################################################################")
}
Expand Down
2 changes: 2 additions & 0 deletions br/cmd/tidb-lightning-ctl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
"//br/pkg/lightning/importer",
"//br/pkg/lightning/tikv",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_tikv_pd_client//http",
],
)

Expand Down
35 changes: 24 additions & 11 deletions br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
Expand All @@ -30,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/importer"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
pdhttp "github.com/tikv/pd/client/http"
)

func main() {
Expand Down Expand Up @@ -92,14 +95,24 @@ func run() error {
return err
}

var opts []pdhttp.ClientOption
if tls != nil {
opts = append(opts, pdhttp.WithTLSConfig(tls.TLSConfig()))
}
cli := pdhttp.NewClient(
"lightning-ctl",
strings.Split(cfg.TiDB.PdAddr, ","),
opts...)
defer cli.Close()

if *compact {
return errors.Trace(compactCluster(ctx, cfg, tls))
return errors.Trace(compactCluster(ctx, cli, tls))
}
if *flagFetchMode {
return errors.Trace(fetchMode(ctx, cfg, tls))
return errors.Trace(fetchMode(ctx, cli, tls))
}
if len(*mode) != 0 {
return errors.Trace(lightning.SwitchMode(ctx, cfg, tls, *mode))
return errors.Trace(lightning.SwitchMode(ctx, cli, tls, *mode))
}

if len(*cpRemove) != 0 {
Expand All @@ -122,23 +135,23 @@ func run() error {
return nil
}

func compactCluster(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
func compactCluster(ctx context.Context, cli pdhttp.Client, tls *common.TLS) error {
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
cli,
metapb.StoreState_Offline,
func(c context.Context, store *pdhttp.MetaStore) error {
return tikv.Compact(c, tls, store.Address, importer.FullLevelCompact, "")
},
)
}

func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
func fetchMode(ctx context.Context, cli pdhttp.Client, tls *common.TLS) error {
return tikv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
cli,
metapb.StoreState_Offline,
func(c context.Context, store *pdhttp.MetaStore) error {
mode, err := tikv.FetchMode(c, tls, store.Address)
if err != nil {
fmt.Fprintf(os.Stderr, "%-30s | Error: %v\n", store.Address, err)
Expand Down
53 changes: 53 additions & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "prepare_snap",
srcs = [
"env.go",
"errors.go",
"prepare.go",
"stream.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/backup/prepare_snap",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//br/pkg/utils",
"//pkg/util/engine",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

go_test(
name = "prepare_snap_test",
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 7,
deps = [
":prepare_snap",
"//br/pkg/utils",
"//pkg/store/mockstore/unistore",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_zap//zapcore",
],
)
172 changes: 172 additions & 0 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package preparesnap

import (
"context"
"slices"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
// default max gRPC message size is 10MiB.
// split requests to chunks of 1MiB will reduce the possibility of being rejected
// due to max gRPC message size.
maxRequestSize = units.MiB
)

type Env interface {
ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error)
GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error)

LoadRegionsInKeyRange(ctx context.Context, startKey, endKey []byte) (regions []Region, err error)
}

type PrepareClient interface {
Send(*brpb.PrepareSnapshotBackupRequest) error
Recv() (*brpb.PrepareSnapshotBackupResponse, error)
}

type SplitRequestClient struct {
PrepareClient
MaxRequestSize int
}

func (s SplitRequestClient) Send(req *brpb.PrepareSnapshotBackupRequest) error {
// Try best to keeping the request untouched.
if req.Ty == brpb.PrepareSnapshotBackupRequestType_WaitApply && req.Size() > s.MaxRequestSize {
rs := req.Regions
findSplitIndex := func() int {
if len(rs) == 0 {
return -1
}

// Select at least one request.
// So we won't get sutck if there were a really huge (!) request.
collected := 0
lastI := 1
for i := 2; i < len(rs) && collected+rs[i].Size() < s.MaxRequestSize; i++ {
lastI = i
collected += rs[i].Size()
}
return lastI
}
for splitIdx := findSplitIndex(); splitIdx > 0; splitIdx = findSplitIndex() {
split := &brpb.PrepareSnapshotBackupRequest{
Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply,
Regions: rs[:splitIdx],
}
rs = rs[splitIdx:]
if err := s.PrepareClient.Send(split); err != nil {
return err
}
}
return nil
}
return s.PrepareClient.Send(req)
}

type Region interface {
GetMeta() *metapb.Region
GetLeaderStoreID() uint64
}

type CliEnv struct {
Cache *tikv.RegionCache
Mgr *utils.StoreManager
}

func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, err
}
withoutTiFlash := slices.DeleteFunc(stores, engine.IsTiFlash)
return withoutTiFlash, err
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
bcli := brpb.NewBackupClient(cc)
c, err := bcli.PrepareSnapshotBackup(ctx)
if err != nil {
return errors.Annotatef(err, "failed to create prepare backup stream")
}
cli = c
return nil
})
if err != nil {
return nil, err
}
return cli, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
bo := tikv.NewBackoffer(ctx, regionCacheMaxBackoffMs)
if len(endKey) == 0 {
// This is encoded [0xff; 8].
// Workaround for https://github.com/tikv/client-go/issues/1051.
endKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
}
rs, err := c.Cache.LoadRegionsInKeyRange(bo, startKey, endKey)
if err != nil {
return nil, err
}
rrs := make([]Region, 0, len(rs))
for _, r := range rs {
rrs = append(rrs, r)
}
return rrs, nil
}

type RetryAndSplitRequestEnv struct {
Env
GetBackoffer func() utils.Backoffer
}

func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
// Retry for about 2 minutes.
rs := utils.InitialRetryState(12, 10*time.Second, 10*time.Second)
bo := utils.Backoffer(&rs)
if r.GetBackoffer != nil {
bo = r.GetBackoffer()
}
cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) {
cli, err := r.Env.ConnectToStore(ctx, storeID)
if err != nil {
log.Warn("Failed to connect to store, will retry.", zap.Uint64("store", storeID), logutil.ShortError(err))
return nil, err
}
return cli, nil
})
if err != nil {
return nil, err
}
return SplitRequestClient{PrepareClient: cli, MaxRequestSize: maxRequestSize}, nil
}
Loading

0 comments on commit 1e6ac4a

Please sign in to comment.