Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Backup: register backupTS as safepoint to PD #257

Merged
merged 2 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type Client struct {
backupMeta kvproto.BackupMeta
storage storage.ExternalStorage
backend *kvproto.StorageBackend

gcTTL int64
}

// NewBackupClient returns a new backup client
Expand Down Expand Up @@ -112,14 +114,24 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64)
}

// check backup time do not exceed GCSafePoint
err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS)
err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS)
if err != nil {
return 0, errors.Trace(err)
}
log.Info("backup encode timestamp", zap.Uint64("BackupTS", backupTS))
return backupTS, nil
}

// SetGCTTL set gcTTL for client
func (bc *Client) SetGCTTL(ttl int64) {
bc.gcTTL = ttl
}

// GetGCTTL get gcTTL for this backup.
func (bc *Client) GetGCTTL() int64 {
return bc.gcTTL
}

// SetStorage set ExternalStorage for client
func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBackend, sendCreds bool) error {
var err error
Expand Down Expand Up @@ -370,19 +382,26 @@ func (bc *Client) BackupRanges(
t := time.NewTicker(time.Second * 5)
defer t.Stop()

backupTS := req.EndVersion
// use lastBackupTS as safePoint if exists
if req.StartVersion > 0 {
backupTS = req.StartVersion
}

log.Info("current backup safePoint job",
zap.Uint64("backupTS", backupTS))

finished := false
for {
err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.EndVersion)
err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS)
if err != nil {
log.Error("check GC safepoint failed", zap.Error(err))
log.Error("update GC safePoint with TTL failed", zap.Error(err))
return err
}
if req.StartVersion > 0 {
err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.StartVersion)
if err != nil {
log.Error("Check gc safepoint for last backup ts failed", zap.Error(err))
return err
}
err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS)
if err != nil {
log.Error("check GC safePoint failed", zap.Error(err))
return err
}
if finished {
// Return error (if there is any) before finishing backup.
Expand Down
21 changes: 19 additions & 2 deletions pkg/backup/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"go.uber.org/zap"
)

const (
brServiceSafePointID = "br"
// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min
DefaultBRGCSafePointTTL = 5 * 60
)

// getGCSafePoint returns the current gc safe point.
// TODO: Some cluster may not enable distributed GC.
func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
Expand All @@ -21,9 +27,9 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
return safePoint, nil
}

// CheckGCSafepoint checks whether the ts is older than GC safepoint.
// CheckGCSafePoint checks whether the ts is older than GC safepoint.
// Note: It ignores errors other than exceed GC safepoint.
func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error {
func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error {
// TODO: use PDClient.GetGCSafePoint instead once PD client exports it.
safePoint, err := getGCSafePoint(ctx, pdClient)
if err != nil {
Expand All @@ -35,3 +41,14 @@ func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error
}
return nil
}

// UpdateServiceSafePoint register backupTS to PD, to lock down backupTS as safePoint with ttl seconds
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error {
log.Debug("update PD safePoint limit with ttl",
zap.Uint64("safePoint", backupTS),
zap.Int64("ttl", ttl))

_, err := pdClient.UpdateServiceGCSafePoint(ctx,
brServiceSafePointID, ttl, backupTS-1)
return err
}
24 changes: 12 additions & 12 deletions pkg/backup/safe_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,53 @@ import (
"github.com/pingcap/br/pkg/mock"
)

var _ = Suite(&testSaftPointSuite{})
var _ = Suite(&testSafePointSuite{})

type testSaftPointSuite struct {
type testSafePointSuite struct {
mock *mock.Cluster
}

func (s *testSaftPointSuite) SetUpSuite(c *C) {
func (s *testSafePointSuite) SetUpSuite(c *C) {
var err error
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

func (s *testSaftPointSuite) TearDownSuite(c *C) {
func (s *testSafePointSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testSaftPointSuite) TestCheckGCSafepoint(c *C) {
func (s *testSafePointSuite) TestCheckGCSafepoint(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()

ctx := context.Background()
pdClient := &mockSafepoint{Client: s.mock.PDClient, safepoint: 2333}
pdClient := &mockSafePoint{Client: s.mock.PDClient, safepoint: 2333}
{
err := CheckGCSafepoint(ctx, pdClient, 2333+1)
err := CheckGCSafePoint(ctx, pdClient, 2333+1)
c.Assert(err, IsNil)
}
{
err := CheckGCSafepoint(ctx, pdClient, 2333)
err := CheckGCSafePoint(ctx, pdClient, 2333)
c.Assert(err, NotNil)
}
{
err := CheckGCSafepoint(ctx, pdClient, 2333-1)
err := CheckGCSafePoint(ctx, pdClient, 2333-1)
c.Assert(err, NotNil)
}
{
err := CheckGCSafepoint(ctx, pdClient, 0)
err := CheckGCSafePoint(ctx, pdClient, 0)
c.Assert(err, ErrorMatches, "GC safepoint 2333 exceed TS 0")
}
}

type mockSafepoint struct {
type mockSafePoint struct {
sync.Mutex
pd.Client
safepoint uint64
}

func (m *mockSafepoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
func (m *mockSafePoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
m.Lock()
defer m.Unlock()

Expand Down
12 changes: 11 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"

flagGCTTL = "gcttl"

defaultBackupConcurrency = 4
)

Expand All @@ -43,6 +45,7 @@ type BackupConfig struct {
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -56,6 +59,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
" use for incremental backup, support TSO only")
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand All @@ -80,6 +84,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
gcTTL, err := flags.GetInt64(flagGCTTL)
if err != nil {
return errors.Trace(err)
}
cfg.GCTTL = gcTTL

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -117,6 +126,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err = client.SetStorage(ctx, u, cfg.SendCreds); err != nil {
return err
}
client.SetGCTTL(cfg.GCTTL)

backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS)
if err != nil {
Expand All @@ -140,7 +150,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
log.Error("LastBackupTS is larger than current TS")
return errors.New("LastBackupTS is larger than current TS")
}
err = backup.CheckGCSafepoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS)
err = backup.CheckGCSafePoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS)
if err != nil {
log.Error("Check gc safepoint for last backup ts failed", zap.Error(err))
return err
Expand Down
18 changes: 13 additions & 5 deletions tests/br_z_gc_safepoint/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
pdAddr = flag.String("pd", "", "PD address")
gcOffset = flag.Duration("gc-offset", time.Second*10,
"Set GC safe point to current time - gc-offset, default: 10s")
updateService = flag.Bool("update-service", false, "use new service to update min SafePoint")
)

func main() {
Expand All @@ -55,10 +56,17 @@ func main() {
now := oracle.ComposeTS(p, l)
nowMinusOffset := oracle.GetTimeFromTS(now).Add(-*gcOffset)
newSP := oracle.ComposeTS(oracle.GetPhysical(nowMinusOffset), 0)
_, err = pdclient.UpdateGCSafePoint(ctx, newSP)
if err != nil {
log.Fatal("create pd client failed", zap.Error(err))
if *updateService {
_, err = pdclient.UpdateServiceGCSafePoint(ctx, "br", 300, newSP)
if err != nil {
log.Fatal("update service safe point failed", zap.Error(err))
}
log.Info("update service GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
} else {
_, err = pdclient.UpdateGCSafePoint(ctx, newSP)
if err != nil {
log.Fatal("update safe point failed", zap.Error(err))
}
log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
}

log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
}
22 changes: 12 additions & 10 deletions tests/br_z_gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,27 @@ run_sql "CREATE DATABASE $DB;"

go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB

row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')

# Update GC safepoint to now + 5s after 10s seconds.
sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" &
sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" -update-service true &

# Set ratelimit to 1 bytes/second, we assume it can not finish within 10s,
# so it will trigger exceed GC safe point error.
# total bytes is 1136000
# Set ratelimit to 40960 bytes/second, it will finish within 25s,
# so it won't trigger exceed GC safe point error. even It use updateServiceGCSafePoint to update GC safePoint.
backup_gc_fail=0
echo "backup start (expect fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
echo "backup start (won't fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/1" --db $DB -t $TABLE --ratelimit 40960 --ratelimit-unit 1 || backup_gc_fail=1

if [ "$backup_gc_fail" -ne "1" ];then
if [ "$backup_gc_fail" -ne "0" ];then
echo "TEST: [$TEST_NAME] test check backup ts failed!"
exit 1
fi

# set safePoint otherwise the default safePoint is zero
bin/gc -pd $PD_ADDR -gc-offset "1s"

backup_gc_fail=0
echo "incremental backup start (expect fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --lastbackupts 1 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/2" --db $DB -t $TABLE --lastbackupts 1 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1

if [ "$backup_gc_fail" -ne "1" ];then
echo "TEST: [$TEST_NAME] test check last backup ts failed!"
Expand All @@ -56,7 +58,7 @@ fi

backup_gc_fail=0
echo "incremental backup with max_uint64 start (expect fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --lastbackupts $MAX_UINT64 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/3" --db $DB -t $TABLE --lastbackupts $MAX_UINT64 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1

if [ "$backup_gc_fail" -ne "1" ];then
echo "TEST: [$TEST_NAME] test check max backup ts failed!"
Expand Down