Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lock down safepoint when use backup (#257) (#260)
Browse files Browse the repository at this point in the history
Co-authored-by: kennytm <kennytm@gmail.com>

Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
3pointer and kennytm authored May 6, 2020
1 parent a7b498e commit 3c364fd
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 39 deletions.
37 changes: 28 additions & 9 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type Client struct {
backupMeta kvproto.BackupMeta
storage storage.ExternalStorage
backend *kvproto.StorageBackend

gcTTL int64
}

// NewBackupClient returns a new backup client
Expand Down Expand Up @@ -112,14 +114,24 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64)
}

// check backup time do not exceed GCSafePoint
err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS)
err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS)
if err != nil {
return 0, errors.Trace(err)
}
log.Info("backup encode timestamp", zap.Uint64("BackupTS", backupTS))
return backupTS, nil
}

// SetGCTTL sets the TTL (in seconds) that PD should hold BR's service GC
// safe point for; the value is later passed to UpdateServiceSafePoint.
func (bc *Client) SetGCTTL(ttl int64) {
	bc.gcTTL = ttl
}

// GetGCTTL returns the GC safe-point TTL (in seconds) configured via SetGCTTL.
func (bc *Client) GetGCTTL() int64 {
	return bc.gcTTL
}

// SetStorage set ExternalStorage for client
func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBackend, sendCreds bool) error {
var err error
Expand Down Expand Up @@ -355,19 +367,26 @@ func (bc *Client) BackupRanges(
t := time.NewTicker(time.Second * 5)
defer t.Stop()

backupTS := req.EndVersion
// use lastBackupTS as safePoint if exists
if req.StartVersion > 0 {
backupTS = req.StartVersion
}

log.Info("current backup safePoint job",
zap.Uint64("backupTS", backupTS))

finished := false
for {
err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.EndVersion)
err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS)
if err != nil {
log.Error("check GC safepoint failed", zap.Error(err))
log.Error("update GC safePoint with TTL failed", zap.Error(err))
return err
}
if req.StartVersion > 0 {
err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.StartVersion)
if err != nil {
log.Error("Check gc safepoint for last backup ts failed", zap.Error(err))
return err
}
err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS)
if err != nil {
log.Error("check GC safePoint failed", zap.Error(err))
return err
}
if finished {
// Return error (if there is any) before finishing backup.
Expand Down
21 changes: 19 additions & 2 deletions pkg/backup/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"go.uber.org/zap"
)

const (
	// brServiceSafePointID is the service ID that BR registers with PD
	// when updating the service GC safe point.
	brServiceSafePointID = "br"
	// DefaultBRGCSafePointTTL is the default TTL (in seconds) for which PD
	// keeps BR's GC safe point alive: 5 minutes.
	DefaultBRGCSafePointTTL = 5 * 60
)

// getGCSafePoint returns the current gc safe point.
// TODO: Some cluster may not enable distributed GC.
func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
Expand All @@ -21,9 +27,9 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) {
return safePoint, nil
}

// CheckGCSafepoint checks whether the ts is older than GC safepoint.
// CheckGCSafePoint checks whether the ts is older than GC safepoint.
// Note: It ignores errors other than exceed GC safepoint.
func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error {
func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error {
// TODO: use PDClient.GetGCSafePoint instead once PD client exports it.
safePoint, err := getGCSafePoint(ctx, pdClient)
if err != nil {
Expand All @@ -35,3 +41,14 @@ func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error
}
return nil
}

// UpdateServiceSafePoint registers backupTS with PD under BR's service ID,
// locking down backupTS-1 as the service GC safe point for ttl seconds.
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error {
	log.Debug("update PD safePoint limit with ttl",
		zap.Uint64("safePoint", backupTS),
		zap.Int64("ttl", ttl))

	// Register one below backupTS, as in the original call site.
	if _, err := pdClient.UpdateServiceGCSafePoint(ctx, brServiceSafePointID, ttl, backupTS-1); err != nil {
		return err
	}
	return nil
}
24 changes: 12 additions & 12 deletions pkg/backup/safe_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,53 @@ import (
"github.com/pingcap/br/pkg/mock"
)

var _ = Suite(&testSaftPointSuite{})
var _ = Suite(&testSafePointSuite{})

type testSaftPointSuite struct {
type testSafePointSuite struct {
mock *mock.Cluster
}

func (s *testSaftPointSuite) SetUpSuite(c *C) {
func (s *testSafePointSuite) SetUpSuite(c *C) {
var err error
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

func (s *testSaftPointSuite) TearDownSuite(c *C) {
func (s *testSafePointSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testSaftPointSuite) TestCheckGCSafepoint(c *C) {
func (s *testSafePointSuite) TestCheckGCSafepoint(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()

ctx := context.Background()
pdClient := &mockSafepoint{Client: s.mock.PDClient, safepoint: 2333}
pdClient := &mockSafePoint{Client: s.mock.PDClient, safepoint: 2333}
{
err := CheckGCSafepoint(ctx, pdClient, 2333+1)
err := CheckGCSafePoint(ctx, pdClient, 2333+1)
c.Assert(err, IsNil)
}
{
err := CheckGCSafepoint(ctx, pdClient, 2333)
err := CheckGCSafePoint(ctx, pdClient, 2333)
c.Assert(err, NotNil)
}
{
err := CheckGCSafepoint(ctx, pdClient, 2333-1)
err := CheckGCSafePoint(ctx, pdClient, 2333-1)
c.Assert(err, NotNil)
}
{
err := CheckGCSafepoint(ctx, pdClient, 0)
err := CheckGCSafePoint(ctx, pdClient, 0)
c.Assert(err, ErrorMatches, "GC safepoint 2333 exceed TS 0")
}
}

type mockSafepoint struct {
type mockSafePoint struct {
sync.Mutex
pd.Client
safepoint uint64
}

func (m *mockSafepoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
func (m *mockSafePoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
m.Lock()
defer m.Unlock()

Expand Down
12 changes: 11 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"

flagGCTTL = "gcttl"

defaultBackupConcurrency = 4
)

Expand All @@ -43,6 +45,7 @@ type BackupConfig struct {
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
}

// DefineBackupFlags defines common flags for the backup command.
Expand All @@ -56,6 +59,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
" use for incremental backup, support TSO only")
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand All @@ -80,6 +84,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
gcTTL, err := flags.GetInt64(flagGCTTL)
if err != nil {
return errors.Trace(err)
}
cfg.GCTTL = gcTTL

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -117,6 +126,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err = client.SetStorage(ctx, u, cfg.SendCreds); err != nil {
return err
}
client.SetGCTTL(cfg.GCTTL)

backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS)
if err != nil {
Expand All @@ -139,7 +149,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
log.Error("LastBackupTS is larger than current TS")
return errors.New("LastBackupTS is larger than current TS")
}
err = backup.CheckGCSafepoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS)
err = backup.CheckGCSafePoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS)
if err != nil {
log.Error("Check gc safepoint for last backup ts failed", zap.Error(err))
return err
Expand Down
18 changes: 13 additions & 5 deletions tests/br_z_gc_safepoint/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
pdAddr = flag.String("pd", "", "PD address")
gcOffset = flag.Duration("gc-offset", time.Second*10,
"Set GC safe point to current time - gc-offset, default: 10s")
updateService = flag.Bool("update-service", false, "use new service to update min SafePoint")
)

func main() {
Expand All @@ -55,10 +56,17 @@ func main() {
now := oracle.ComposeTS(p, l)
nowMinusOffset := oracle.GetTimeFromTS(now).Add(-*gcOffset)
newSP := oracle.ComposeTS(oracle.GetPhysical(nowMinusOffset), 0)
_, err = pdclient.UpdateGCSafePoint(ctx, newSP)
if err != nil {
log.Fatal("create pd client failed", zap.Error(err))
if *updateService {
_, err = pdclient.UpdateServiceGCSafePoint(ctx, "br", 300, newSP)
if err != nil {
log.Fatal("update service safe point failed", zap.Error(err))
}
log.Info("update service GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
} else {
_, err = pdclient.UpdateGCSafePoint(ctx, newSP)
if err != nil {
log.Fatal("update safe point failed", zap.Error(err))
}
log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
}

log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now))
}
22 changes: 12 additions & 10 deletions tests/br_z_gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,27 @@ run_sql "CREATE DATABASE $DB;"

go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB

row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')

# Update GC safepoint to now + 5s after 10s seconds.
sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" &
sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" -update-service true &

# Set ratelimit to 1 bytes/second, we assume it can not finish within 10s,
# so it will trigger exceed GC safe point error.
# total bytes is 1136000
# Set ratelimit to 40960 bytes/second; the backup should finish within 25s,
# so it will not trigger an "exceed GC safe point" error even though it uses UpdateServiceGCSafePoint to update the GC safe point.
backup_gc_fail=0
echo "backup start (expect fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
echo "backup start (won't fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/1" --db $DB -t $TABLE --ratelimit 40960 --ratelimit-unit 1 || backup_gc_fail=1

if [ "$backup_gc_fail" -ne "1" ];then
if [ "$backup_gc_fail" -ne "0" ];then
echo "TEST: [$TEST_NAME] test check backup ts failed!"
exit 1
fi

# set safePoint otherwise the default safePoint is zero
bin/gc -pd $PD_ADDR -gc-offset "1s"

backup_gc_fail=0
echo "incremental backup start (expect fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --lastbackupts 1 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/2" --db $DB -t $TABLE --lastbackupts 1 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1

if [ "$backup_gc_fail" -ne "1" ];then
echo "TEST: [$TEST_NAME] test check last backup ts failed!"
Expand All @@ -56,7 +58,7 @@ fi

backup_gc_fail=0
echo "incremental backup with max_uint64 start (expect fail)..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --lastbackupts $MAX_UINT64 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/3" --db $DB -t $TABLE --lastbackupts $MAX_UINT64 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1

if [ "$backup_gc_fail" -ne "1" ];then
echo "TEST: [$TEST_NAME] test check max backup ts failed!"
Expand Down

0 comments on commit 3c364fd

Please sign in to comment.