Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: calculate global-checkpoint from pd written by tikv #36305

Merged
merged 16 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type TaskStatus struct {
Info backuppb.StreamBackupTaskInfo
// paused checks whether the task is paused.
paused bool
// global checkpoint from storage
globalCheckpoint uint64
// Checkpoints collects the checkpoints.
Checkpoints []Checkpoint
// Total QPS of the task in recent seconds.
Expand Down Expand Up @@ -130,14 +132,13 @@ func (p *printByTable) AddTask(task TaskStatus) {
pTime := oracle.GetTimeFromTS(ts)
gap := now.Sub(pTime).Round(time.Second)
gapColor := color.New(color.FgGreen)
if gap > 5*time.Minute {
if gap > 10*time.Minute {
gapColor = color.New(color.FgRed)
}
info := fmt.Sprintf("%s; gap=%s", pTime, gapColor.Sprint(gap))
return info
}
cp := task.GetMinStoreCheckpoint()
table.Add("checkpoint[global]", formatTS(cp.TS))
table.Add("checkpoint[global]", formatTS(task.globalCheckpoint))
p.addCheckpoints(&task, table, formatTS)
for store, e := range task.LastErrors {
table.Add(fmt.Sprintf("error[store=%d]", store), e.ErrorCode)
Expand Down Expand Up @@ -191,16 +192,15 @@ func (p *printByJSON) PrintTasks() {
LastError backuppb.StreamBackupError `json:"last_error"`
}
type jsonTask struct {
Name string `json:"name"`
StartTS uint64 `json:"start_ts,omitempty"`
EndTS uint64 `json:"end_ts,omitempty"`
TableFilter []string `json:"table_filter"`
Progress []storeProgress `json:"progress"`
Storage string `json:"storage"`
CheckpointTS uint64 `json:"checkpoint"`
EstQPS float64 `json:"estimate_qps"`
LastErrors []storeLastError `json:"last_errors"`
AllCheckpoints []Checkpoint `json:"all_checkpoints"`
Name string `json:"name"`
StartTS uint64 `json:"start_ts,omitempty"`
EndTS uint64 `json:"end_ts,omitempty"`
TableFilter []string `json:"table_filter"`
Progress []storeProgress `json:"progress"`
Storage string `json:"storage"`
CheckpointTS uint64 `json:"checkpoint"`
EstQPS float64 `json:"estimate_qps"`
LastErrors []storeLastError `json:"last_errors"`
}
taskToJSON := func(t TaskStatus) jsonTask {
s := storage.FormatBackendURL(t.Info.GetStorage())
Expand All @@ -220,18 +220,16 @@ func (p *printByJSON) PrintTasks() {
LastError: lastError,
})
}
cp := t.GetMinStoreCheckpoint()
return jsonTask{
Name: t.Info.GetName(),
StartTS: t.Info.GetStartTs(),
EndTS: t.Info.GetEndTs(),
TableFilter: t.Info.GetTableFilter(),
Progress: sp,
Storage: s.String(),
CheckpointTS: cp.TS,
EstQPS: t.QPS,
LastErrors: se,
AllCheckpoints: t.Checkpoints,
Name: t.Info.GetName(),
StartTS: t.Info.GetStartTs(),
EndTS: t.Info.GetEndTs(),
TableFilter: t.Info.GetTableFilter(),
Progress: sp,
Storage: s.String(),
CheckpointTS: t.globalCheckpoint,
EstQPS: t.QPS,
LastErrors: se,
}
}
mustMarshal := func(i interface{}) string {
Expand Down Expand Up @@ -359,6 +357,10 @@ func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatu
return s, errors.Annotatef(err, "failed to get progress of task %s", s.Info.Name)
}

if s.globalCheckpoint, err = task.GetStorageCheckpoint(ctx); err != nil {
return s, errors.Annotatef(err, "failed to get storage checkpoint of task %s", s.Info.Name)
}

s.LastErrors, err = task.LastError(ctx)
if err != nil {
return s, err
Expand Down
31 changes: 31 additions & 0 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -299,6 +300,13 @@ type Task struct {
Info backuppb.StreamBackupTaskInfo
}

func NewTask(client *MetaDataClient, info backuppb.StreamBackupTaskInfo) *Task {
return &Task{
cli: client,
Info: info,
}
}

// Pause is a shorthand for `metaCli.PauseTask`.
func (t *Task) Pause(ctx context.Context) error {
return t.cli.PauseTask(ctx, t.Info.Name)
Expand Down Expand Up @@ -352,6 +360,29 @@ func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error) {
return cps, nil
}

func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error) {
prefix := StorageCheckpointOf(t.Info.Name)
scanner := scanEtcdPrefix(t.cli.Client, prefix)
kvs, err := scanner.AllPages(ctx, 1024)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}

var storageCheckpoint = t.Info.StartTs
for _, kv := range kvs {
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the value isn't 64bits (it is %d bytes, value = %s)",
len(kv.Value),
redact.Key(kv.Value))
}
ts := binary.BigEndian.Uint64(kv.Value)
storageCheckpoint = mathutil.Max(storageCheckpoint, ts)
}

return storageCheckpoint, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
key := CheckPointOf(t.Info.Name, store)
Expand Down
46 changes: 41 additions & 5 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package streamhelper_test

import (
"context"
"encoding/binary"
"fmt"
"net"
"net/url"
"path"
"testing"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -77,7 +79,7 @@ func simpleRanges(tableCount int) streamhelper.Ranges {

func simpleTask(name string, tableCount int) streamhelper.TaskInfo {
backend, _ := storage.ParseBackend("noop://", nil)
task, err := streamhelper.NewTask(name).
task, err := streamhelper.NewTaskInfo(name).
FromTS(1).
UntilTS(1000).
WithRanges(simpleRanges(tableCount)...).
Expand Down Expand Up @@ -136,31 +138,32 @@ func TestIntegration(t *testing.T) {
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli, etcd) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.TaskEventClient{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
noop, _ := storage.ParseBackend("noop://", nil)
// The name must not contains slash.
_, err := streamhelper.NewTask("/root").
_, err := streamhelper.NewTaskInfo("/root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
ToStorage(noop).
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Must specify the external storage.
_, err = streamhelper.NewTask("root").
_, err = streamhelper.NewTaskInfo("root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Must specift the table filter and range?
_, err = streamhelper.NewTask("root").
_, err = streamhelper.NewTaskInfo("root").
ToStorage(noop).
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Happy path.
_, err = streamhelper.NewTask("root").
_, err = streamhelper.NewTaskInfo("root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
ToStorage(noop).
Expand Down Expand Up @@ -229,6 +232,39 @@ func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd
require.Equal(t, store2Checkpoint, uint64(40))
}

func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
var (
taskName = "my_task"
ctx = context.Background()
value = make([]byte, 8)
)

cases := []struct {
storeID string
storageCheckPoint uint64
}{
{
"1",
10001,
}, {
"2",
10002,
},
}
for _, c := range cases {
key := path.Join(streamhelper.StorageCheckpointOf(taskName), c.storeID)
binary.BigEndian.PutUint64(value, c.storageCheckPoint)
_, err := metaCli.Put(ctx, key, string(value))
require.NoError(t, err)
}

taskInfo := simpleTask(taskName, 1)
task := streamhelper.NewTask(&metaCli, taskInfo.PBInfo)
ts, err := task.GetStorageCheckpoint(ctx)
require.NoError(t, err)
require.Equal(t, uint64(10002), ts)
}

func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) {
ctx, cancel := context.WithCancel(context.Background())
taskName := "simple"
Expand Down
8 changes: 7 additions & 1 deletion br/pkg/streamhelper/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
taskInfoPath = "/info"
// nolint:deadcode,varcheck
taskCheckpointPath = "/checkpoint"
storageCheckPoint = "/storage-checkpoint"
taskRangesPath = "/ranges"
taskPausePath = "/pause"
taskLastErrorPath = "/last-error"
Expand Down Expand Up @@ -86,6 +87,11 @@ func GlobalCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, taskCheckpointPath, task, checkpointTypeGlobal)
}

// StorageCheckpointOf get the prefix path of the `storage checkpoint status` of a task.
func StorageCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, storageCheckPoint, task)
}

// CheckpointOf returns the checkpoint prefix of some store.
// Normally it would be <prefix>/checkpoint/<task-name>/<store-id(binary-u64)>.
func CheckPointOf(task string, store uint64) string {
Expand Down Expand Up @@ -119,7 +125,7 @@ type TaskInfo struct {
}

// NewTask creates a new task with the name.
func NewTask(name string) *TaskInfo {
func NewTaskInfo(name string) *TaskInfo {
return &TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Name: name,
Expand Down