Skip to content

Commit

Permalink
log-backup: calculate global-checkpoint from pd written by tikv (#36305)
Browse files Browse the repository at this point in the history
* calcualte global-checkpoint from pd written by tikv

Signed-off-by: joccau <zak.zhao@pingcap.com>

* log-status shows storage checkpoint

Signed-off-by: joccau <zak.zhao@pingcap.com>

* add test case

Signed-off-by: joccau <zak.zhao@pingcap.com>

* show red status if the checkpoint gap > 10min

Signed-off-by: joccau <zak.zhao@pingcap.com>

Co-authored-by: 3pointer <luancheng@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people authored Jul 20, 2022
1 parent 8baa287 commit 676ecab
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 30 deletions.
50 changes: 26 additions & 24 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type TaskStatus struct {
Info backuppb.StreamBackupTaskInfo
// paused checks whether the task is paused.
paused bool
// global checkpoint from storage
globalCheckpoint uint64
// Checkpoints collects the checkpoints.
Checkpoints []Checkpoint
// Total QPS of the task in recent seconds.
Expand Down Expand Up @@ -130,14 +132,13 @@ func (p *printByTable) AddTask(task TaskStatus) {
pTime := oracle.GetTimeFromTS(ts)
gap := now.Sub(pTime).Round(time.Second)
gapColor := color.New(color.FgGreen)
if gap > 5*time.Minute {
if gap > 10*time.Minute {
gapColor = color.New(color.FgRed)
}
info := fmt.Sprintf("%s; gap=%s", pTime, gapColor.Sprint(gap))
return info
}
cp := task.GetMinStoreCheckpoint()
table.Add("checkpoint[global]", formatTS(cp.TS))
table.Add("checkpoint[global]", formatTS(task.globalCheckpoint))
p.addCheckpoints(&task, table, formatTS)
for store, e := range task.LastErrors {
table.Add(fmt.Sprintf("error[store=%d]", store), e.ErrorCode)
Expand Down Expand Up @@ -191,16 +192,15 @@ func (p *printByJSON) PrintTasks() {
LastError backuppb.StreamBackupError `json:"last_error"`
}
type jsonTask struct {
Name string `json:"name"`
StartTS uint64 `json:"start_ts,omitempty"`
EndTS uint64 `json:"end_ts,omitempty"`
TableFilter []string `json:"table_filter"`
Progress []storeProgress `json:"progress"`
Storage string `json:"storage"`
CheckpointTS uint64 `json:"checkpoint"`
EstQPS float64 `json:"estimate_qps"`
LastErrors []storeLastError `json:"last_errors"`
AllCheckpoints []Checkpoint `json:"all_checkpoints"`
Name string `json:"name"`
StartTS uint64 `json:"start_ts,omitempty"`
EndTS uint64 `json:"end_ts,omitempty"`
TableFilter []string `json:"table_filter"`
Progress []storeProgress `json:"progress"`
Storage string `json:"storage"`
CheckpointTS uint64 `json:"checkpoint"`
EstQPS float64 `json:"estimate_qps"`
LastErrors []storeLastError `json:"last_errors"`
}
taskToJSON := func(t TaskStatus) jsonTask {
s := storage.FormatBackendURL(t.Info.GetStorage())
Expand All @@ -220,18 +220,16 @@ func (p *printByJSON) PrintTasks() {
LastError: lastError,
})
}
cp := t.GetMinStoreCheckpoint()
return jsonTask{
Name: t.Info.GetName(),
StartTS: t.Info.GetStartTs(),
EndTS: t.Info.GetEndTs(),
TableFilter: t.Info.GetTableFilter(),
Progress: sp,
Storage: s.String(),
CheckpointTS: cp.TS,
EstQPS: t.QPS,
LastErrors: se,
AllCheckpoints: t.Checkpoints,
Name: t.Info.GetName(),
StartTS: t.Info.GetStartTs(),
EndTS: t.Info.GetEndTs(),
TableFilter: t.Info.GetTableFilter(),
Progress: sp,
Storage: s.String(),
CheckpointTS: t.globalCheckpoint,
EstQPS: t.QPS,
LastErrors: se,
}
}
mustMarshal := func(i interface{}) string {
Expand Down Expand Up @@ -359,6 +357,10 @@ func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatu
return s, errors.Annotatef(err, "failed to get progress of task %s", s.Info.Name)
}

if s.globalCheckpoint, err = task.GetStorageCheckpoint(ctx); err != nil {
return s, errors.Annotatef(err, "failed to get storage checkpoint of task %s", s.Info.Name)
}

s.LastErrors, err = task.LastError(ctx)
if err != nil {
return s, err
Expand Down
31 changes: 31 additions & 0 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -299,6 +300,13 @@ type Task struct {
Info backuppb.StreamBackupTaskInfo
}

func NewTask(client *MetaDataClient, info backuppb.StreamBackupTaskInfo) *Task {
return &Task{
cli: client,
Info: info,
}
}

// Pause is a shorthand for `metaCli.PauseTask`.
func (t *Task) Pause(ctx context.Context) error {
return t.cli.PauseTask(ctx, t.Info.Name)
Expand Down Expand Up @@ -352,6 +360,29 @@ func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error) {
return cps, nil
}

func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error) {
prefix := StorageCheckpointOf(t.Info.Name)
scanner := scanEtcdPrefix(t.cli.Client, prefix)
kvs, err := scanner.AllPages(ctx, 1024)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}

var storageCheckpoint = t.Info.StartTs
for _, kv := range kvs {
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the value isn't 64bits (it is %d bytes, value = %s)",
len(kv.Value),
redact.Key(kv.Value))
}
ts := binary.BigEndian.Uint64(kv.Value)
storageCheckpoint = mathutil.Max(storageCheckpoint, ts)
}

return storageCheckpoint, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
key := CheckPointOf(t.Info.Name, store)
Expand Down
46 changes: 41 additions & 5 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package streamhelper_test

import (
"context"
"encoding/binary"
"fmt"
"net"
"net/url"
"path"
"testing"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -77,7 +79,7 @@ func simpleRanges(tableCount int) streamhelper.Ranges {

func simpleTask(name string, tableCount int) streamhelper.TaskInfo {
backend, _ := storage.ParseBackend("noop://", nil)
task, err := streamhelper.NewTask(name).
task, err := streamhelper.NewTaskInfo(name).
FromTS(1).
UntilTS(1000).
WithRanges(simpleRanges(tableCount)...).
Expand Down Expand Up @@ -136,31 +138,32 @@ func TestIntegration(t *testing.T) {
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli, etcd) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.TaskEventClient{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
noop, _ := storage.ParseBackend("noop://", nil)
// The name must not contains slash.
_, err := streamhelper.NewTask("/root").
_, err := streamhelper.NewTaskInfo("/root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
ToStorage(noop).
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Must specify the external storage.
_, err = streamhelper.NewTask("root").
_, err = streamhelper.NewTaskInfo("root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Must specift the table filter and range?
_, err = streamhelper.NewTask("root").
_, err = streamhelper.NewTaskInfo("root").
ToStorage(noop).
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Happy path.
_, err = streamhelper.NewTask("root").
_, err = streamhelper.NewTaskInfo("root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
ToStorage(noop).
Expand Down Expand Up @@ -229,6 +232,39 @@ func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd
require.Equal(t, store2Checkpoint, uint64(40))
}

func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
var (
taskName = "my_task"
ctx = context.Background()
value = make([]byte, 8)
)

cases := []struct {
storeID string
storageCheckPoint uint64
}{
{
"1",
10001,
}, {
"2",
10002,
},
}
for _, c := range cases {
key := path.Join(streamhelper.StorageCheckpointOf(taskName), c.storeID)
binary.BigEndian.PutUint64(value, c.storageCheckPoint)
_, err := metaCli.Put(ctx, key, string(value))
require.NoError(t, err)
}

taskInfo := simpleTask(taskName, 1)
task := streamhelper.NewTask(&metaCli, taskInfo.PBInfo)
ts, err := task.GetStorageCheckpoint(ctx)
require.NoError(t, err)
require.Equal(t, uint64(10002), ts)
}

func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) {
ctx, cancel := context.WithCancel(context.Background())
taskName := "simple"
Expand Down
8 changes: 7 additions & 1 deletion br/pkg/streamhelper/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
taskInfoPath = "/info"
// nolint:deadcode,varcheck
taskCheckpointPath = "/checkpoint"
storageCheckPoint = "/storage-checkpoint"
taskRangesPath = "/ranges"
taskPausePath = "/pause"
taskLastErrorPath = "/last-error"
Expand Down Expand Up @@ -86,6 +87,11 @@ func GlobalCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, taskCheckpointPath, task, checkpointTypeGlobal)
}

// StorageCheckpointOf get the prefix path of the `storage checkpoint status` of a task.
func StorageCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, storageCheckPoint, task)
}

// CheckpointOf returns the checkpoint prefix of some store.
// Normally it would be <prefix>/checkpoint/<task-name>/<store-id(binary-u64)>.
func CheckPointOf(task string, store uint64) string {
Expand Down Expand Up @@ -119,7 +125,7 @@ type TaskInfo struct {
}

// NewTask creates a new task with the name.
func NewTask(name string) *TaskInfo {
func NewTaskInfo(name string) *TaskInfo {
return &TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Name: name,
Expand Down

0 comments on commit 676ecab

Please sign in to comment.