Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
  • Loading branch information
CabinfeverB committed Apr 19, 2023
2 parents ce45939 + 8eae738 commit 8172174
Show file tree
Hide file tree
Showing 113 changed files with 2,687 additions and 829 deletions.
43 changes: 33 additions & 10 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ def go_deps():
sum = "h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ=",
version = "v0.0.0-20180226025133-644b8db467af",
)
go_repository(
name = "com_github_alecthomas_kingpin_v2",
build_file_proto_mode = "disable",
importpath = "github.com/alecthomas/kingpin/v2",
sum = "h1:ANLJcKmQm4nIaog7xdr/id6FM6zm5hHnfZrvtKPxqGg=",
version = "v2.3.1",
)

go_repository(
name = "com_github_alecthomas_template",
build_file_proto_mode = "disable_global",
Expand All @@ -93,8 +101,8 @@ def go_deps():
name = "com_github_alecthomas_units",
build_file_proto_mode = "disable_global",
importpath = "github.com/alecthomas/units",
sum = "h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=",
version = "v0.0.0-20190924025748-f65c72e2690d",
sum = "h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=",
version = "v0.0.0-20211218093645-b94a6e3cc137",
)
go_repository(
name = "com_github_aleksi_gocov_xml",
Expand Down Expand Up @@ -2545,8 +2553,8 @@ def go_deps():
name = "com_github_kr_pretty",
build_file_proto_mode = "disable_global",
importpath = "github.com/kr/pretty",
sum = "h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=",
version = "v0.3.0",
sum = "h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=",
version = "v0.3.1",
)
go_repository(
name = "com_github_kr_pty",
Expand Down Expand Up @@ -3412,6 +3420,14 @@ def go_deps():
sum = "h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=",
version = "v0.0.0-20180916011732-0a3d74bf9ce4",
)
go_repository(
name = "com_github_pkg_diff",
build_file_proto_mode = "disable",
importpath = "github.com/pkg/diff",
sum = "h1:aoZm08cpOy4WuID//EZDgcC4zIxODThtZNPirFr42+A=",
version = "v0.0.0-20210226163009-20ebb0f2a09e",
)

go_repository(
name = "com_github_pkg_errors",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -3467,8 +3483,8 @@ def go_deps():
name = "com_github_prometheus_client_golang",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/client_golang",
sum = "h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw=",
version = "v1.14.0",
sum = "h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM=",
version = "v1.15.0",
)
go_repository(
name = "com_github_prometheus_client_model",
Expand All @@ -3481,8 +3497,8 @@ def go_deps():
name = "com_github_prometheus_common",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/common",
sum = "h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI=",
version = "v0.39.0",
sum = "h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=",
version = "v0.42.0",
)
go_repository(
name = "com_github_prometheus_procfs",
Expand Down Expand Up @@ -3605,8 +3621,8 @@ def go_deps():
name = "com_github_rogpeppe_go_internal",
build_file_proto_mode = "disable_global",
importpath = "github.com/rogpeppe/go-internal",
sum = "h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=",
version = "v1.6.1",
sum = "h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=",
version = "v1.9.0",
)
go_repository(
name = "com_github_rs_cors",
Expand Down Expand Up @@ -4394,6 +4410,13 @@ def go_deps():
sum = "h1:L8IbaI/W6h5Cwgh0n4zGeZpVK78r/jBf9ASurHo9+/o=",
version = "v0.0.0-20200623134604-12b17a7ff502",
)
go_repository(
name = "com_github_xhit_go_str2duration",
build_file_proto_mode = "disable",
importpath = "github.com/xhit/go-str2duration",
sum = "h1:BcV5u025cITWxEQKGWr1URRzrcXtu7uk8+luz3Yuhwc=",
version = "v1.2.0",
)

go_repository(
name = "com_github_xiang90_probing",
Expand Down
36 changes: 36 additions & 0 deletions br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ func runBackupRawCommand(command *cobra.Command, cmdName string) error {
return nil
}

func runBackupTxnCommand(command *cobra.Command, cmdName string) error {
cfg := task.TxnKvConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseBackupConfigFromFlags(command.Flags()); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}

ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
var store *appdash.MemoryStore
ctx, store = trace.TracerStartSpan(ctx)
defer trace.TracerFinishSpan(ctx, store)
}
if err := task.RunBackupTxn(ctx, gluetikv.Glue{}, cmdName, &cfg); err != nil {
log.Error("failed to backup txn kv", zap.Error(err))
return errors.Trace(err)
}
return nil
}

// NewBackupCommand return a full backup subcommand.
func NewBackupCommand() *cobra.Command {
command := &cobra.Command{
Expand All @@ -103,6 +123,7 @@ func NewBackupCommand() *cobra.Command {
newDBBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
newTxnBackupCommand(),
)

task.DefineBackupFlags(command.PersistentFlags())
Expand Down Expand Up @@ -170,3 +191,18 @@ func newRawBackupCommand() *cobra.Command {
task.DefineRawBackupFlags(command)
return command
}

// newTxnBackupCommand return a txn kv range backup subcommand.
func newTxnBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "txn",
Short: "(experimental) backup a txn kv range from TiKV cluster",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupTxnCommand(command, task.TxnBackupCmd)
},
}

task.DefineTxnBackupFlags(command)
return command
}
35 changes: 35 additions & 0 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ func runRestoreRawCommand(command *cobra.Command, cmdName string) error {
return nil
}

func runRestoreTxnCommand(command *cobra.Command, cmdName string) error {
cfg := task.Config{LogProgress: HasLogFile()}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}

ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
var store *appdash.MemoryStore
ctx, store = trace.TracerStartSpan(ctx)
defer trace.TracerFinishSpan(ctx, store)
}
if err := task.RunRestoreTxn(GetDefaultContext(), gluetikv.Glue{}, cmdName, &cfg); err != nil {
log.Error("failed to restore txn kv", zap.Error(err))
return errors.Trace(err)
}
return nil
}

// NewRestoreCommand returns a restore subcommand.
func NewRestoreCommand() *cobra.Command {
command := &cobra.Command{
Expand All @@ -132,6 +152,7 @@ func NewRestoreCommand() *cobra.Command {
newDBRestoreCommand(),
newTableRestoreCommand(),
newRawRestoreCommand(),
newTxnRestoreCommand(),
newStreamRestoreCommand(),
)
task.DefineRestoreFlags(command.PersistentFlags())
Expand Down Expand Up @@ -193,6 +214,20 @@ func newRawRestoreCommand() *cobra.Command {
return command
}

func newTxnRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "txn",
Short: "(experimental) restore txn kv to TiKV cluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreTxnCommand(cmd, task.TxnRestoreCmd)
},
}

task.DefineRawRestoreFlags(command)
return command
}

func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "point",
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func (bc *Client) SetCipher(cipher *backuppb.CipherInfo) {
bc.cipher = cipher
}

// GetTS gets a new timestamp from PD.
func (bc *Client) GetCurerntTS(ctx context.Context) (uint64, error) {
// GetCurrentTS gets a new timestamp from PD.
func (bc *Client) GetCurrentTS(ctx context.Context) (uint64, error) {
p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type BaseKVEncoder struct {

logger *zap.Logger
recordCache []types.Datum
// the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
LastInsertID uint64
}

// NewBaseKVEncoder creates a new BaseKVEncoder.
Expand Down Expand Up @@ -241,6 +244,9 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(e.SessionCtx,
types.NewIntDatum(rowID), col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case e.IsAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := e.AutoIDFn(rowID)
Expand All @@ -250,6 +256,9 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/tikv",
"//br/pkg/lightning/verification",
"//br/pkg/logutil",
"//br/pkg/membuf",
"//br/pkg/pdutil",
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/backend/local/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -60,6 +61,13 @@ type RemoteChecksum struct {
TotalBytes uint64
}

// IsEqual checks whether the checksum is equal to the other.
func (rc *RemoteChecksum) IsEqual(other *verification.KVChecksum) bool {
return rc.Checksum == other.Sum() &&
rc.TotalKVs == other.SumKVS() &&
rc.TotalBytes == other.SumSize()
}

// ChecksumManager is a manager that manages checksums.
type ChecksumManager interface {
Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,11 @@ func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize) {
return
}

// GetPDClient returns the PD client.
func (local *Backend) GetPDClient() pd.Client {
return local.pdCtl.GetPDClient()
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/precheck",
"//br/pkg/lightning/tikv",
"//br/pkg/lightning/verification",
"//br/pkg/lightning/web",
Expand Down Expand Up @@ -122,6 +123,7 @@ go_test(
"//br/pkg/lightning/log",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/precheck",
"//br/pkg/lightning/verification",
"//br/pkg/lightning/web",
"//br/pkg/lightning/worker",
Expand Down
Loading

0 comments on commit 8172174

Please sign in to comment.