From 3c364fde556f14b85b698a6f24ee6dc088fd6920 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 6 May 2020 22:03:28 +0800 Subject: [PATCH] lock down safepoint when use backup (#257) (#260) Co-authored-by: kennytm Co-authored-by: kennytm --- pkg/backup/client.go | 37 +++++++++++++++++++++++++--------- pkg/backup/safe_point.go | 21 +++++++++++++++++-- pkg/backup/safe_point_test.go | 24 +++++++++++----------- pkg/task/backup.go | 12 ++++++++++- tests/br_z_gc_safepoint/gc.go | 18 ++++++++++++----- tests/br_z_gc_safepoint/run.sh | 22 +++++++++++--------- 6 files changed, 95 insertions(+), 39 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 1b016ba8b..16df96a9d 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -68,6 +68,8 @@ type Client struct { backupMeta kvproto.BackupMeta storage storage.ExternalStorage backend *kvproto.StorageBackend + + gcTTL int64 } // NewBackupClient returns a new backup client @@ -112,7 +114,7 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) } // check backup time do not exceed GCSafePoint - err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS) + err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) if err != nil { return 0, errors.Trace(err) } @@ -120,6 +122,16 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) return backupTS, nil } +// SetGCTTL set gcTTL for client +func (bc *Client) SetGCTTL(ttl int64) { + bc.gcTTL = ttl +} + +// GetGCTTL get gcTTL for this backup. 
+func (bc *Client) GetGCTTL() int64 { + return bc.gcTTL +} + // SetStorage set ExternalStorage for client func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBackend, sendCreds bool) error { var err error @@ -355,19 +367,26 @@ func (bc *Client) BackupRanges( t := time.NewTicker(time.Second * 5) defer t.Stop() + backupTS := req.EndVersion + // use lastBackupTS (req.StartVersion) as the safePoint if it is set + if req.StartVersion > 0 { + backupTS = req.StartVersion + } + + log.Info("current backup safePoint job", + zap.Uint64("backupTS", backupTS)) + finished := false for { - err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.EndVersion) + err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS) if err != nil { - log.Error("check GC safepoint failed", zap.Error(err)) + log.Error("update GC safePoint with TTL failed", zap.Error(err)) return err } - if req.StartVersion > 0 { - err = CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.StartVersion) - if err != nil { - log.Error("Check gc safepoint for last backup ts failed", zap.Error(err)) - return err - } + err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS) + if err != nil { + log.Error("check GC safePoint failed", zap.Error(err)) + return err } if finished { // Return error (if there is any) before finishing backup. diff --git a/pkg/backup/safe_point.go b/pkg/backup/safe_point.go index d4d431ded..b867852c9 100644 --- a/pkg/backup/safe_point.go +++ b/pkg/backup/safe_point.go @@ -11,6 +11,12 @@ import ( "go.uber.org/zap" ) +const ( + brServiceSafePointID = "br" + // DefaultBRGCSafePointTTL is the default TTL, in seconds, that PD keeps BR's service safePoint (5 minutes) + DefaultBRGCSafePointTTL = 5 * 60 +) + // getGCSafePoint returns the current gc safe point. // TODO: Some cluster may not enable distributed GC. 
func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { @@ -21,9 +27,9 @@ func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { return safePoint, nil } -// CheckGCSafepoint checks whether the ts is older than GC safepoint. +// CheckGCSafePoint checks whether the ts is older than GC safepoint. // Note: It ignores errors other than exceed GC safepoint. -func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error { +func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error { // TODO: use PDClient.GetGCSafePoint instead once PD client exports it. safePoint, err := getGCSafePoint(ctx, pdClient) if err != nil { @@ -35,3 +41,14 @@ func CheckGCSafepoint(ctx context.Context, pdClient pd.Client, ts uint64) error } return nil } + +// UpdateServiceSafePoint registers backupTS-1 with PD under BR's service ID, to lock down the safePoint for ttl seconds. NOTE(review): backupTS-1 wraps around to MaxUint64 when backupTS is 0; confirm callers never pass 0. +func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, backupTS uint64) error { + log.Debug("update PD safePoint limit with ttl", + zap.Uint64("safePoint", backupTS), + zap.Int64("ttl", ttl)) + + _, err := pdClient.UpdateServiceGCSafePoint(ctx, + brServiceSafePointID, ttl, backupTS-1) + return err +} diff --git a/pkg/backup/safe_point_test.go b/pkg/backup/safe_point_test.go index cdc071686..6ded3d0ab 100644 --- a/pkg/backup/safe_point_test.go +++ b/pkg/backup/safe_point_test.go @@ -13,53 +13,53 @@ import ( "github.com/pingcap/br/pkg/mock" ) -var _ = Suite(&testSaftPointSuite{}) +var _ = Suite(&testSafePointSuite{}) -type testSaftPointSuite struct { +type testSafePointSuite struct { mock *mock.Cluster } -func (s *testSaftPointSuite) SetUpSuite(c *C) { +func (s *testSafePointSuite) SetUpSuite(c *C) { var err error s.mock, err = mock.NewCluster() c.Assert(err, IsNil) } -func (s *testSaftPointSuite) TearDownSuite(c *C) { +func (s *testSafePointSuite) TearDownSuite(c *C) { testleak.AfterTest(c)() } -func (s *testSaftPointSuite) 
TestCheckGCSafepoint(c *C) { +func (s *testSafePointSuite) TestCheckGCSafepoint(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() ctx := context.Background() - pdClient := &mockSafepoint{Client: s.mock.PDClient, safepoint: 2333} + pdClient := &mockSafePoint{Client: s.mock.PDClient, safepoint: 2333} { - err := CheckGCSafepoint(ctx, pdClient, 2333+1) + err := CheckGCSafePoint(ctx, pdClient, 2333+1) c.Assert(err, IsNil) } { - err := CheckGCSafepoint(ctx, pdClient, 2333) + err := CheckGCSafePoint(ctx, pdClient, 2333) c.Assert(err, NotNil) } { - err := CheckGCSafepoint(ctx, pdClient, 2333-1) + err := CheckGCSafePoint(ctx, pdClient, 2333-1) c.Assert(err, NotNil) } { - err := CheckGCSafepoint(ctx, pdClient, 0) + err := CheckGCSafePoint(ctx, pdClient, 0) c.Assert(err, ErrorMatches, "GC safepoint 2333 exceed TS 0") } } -type mockSafepoint struct { +type mockSafePoint struct { sync.Mutex pd.Client safepoint uint64 } -func (m *mockSafepoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) { +func (m *mockSafePoint) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) { m.Lock() defer m.Unlock() diff --git a/pkg/task/backup.go b/pkg/task/backup.go index c24a6c607..e6c479732 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -33,6 +33,8 @@ const ( flagBackupTS = "backupts" flagLastBackupTS = "lastbackupts" + flagGCTTL = "gcttl" + defaultBackupConcurrency = 4 ) @@ -43,6 +45,7 @@ type BackupConfig struct { TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` BackupTS uint64 `json:"backup-ts" toml:"backup-ts"` LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` + GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"` } // DefineBackupFlags defines common flags for the backup command. @@ -56,6 +59,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { " use for incremental backup, support TSO only") flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ " e.g. 
'400036290571534337', '2018-05-11 01:42:23'") + flags.Int64(flagGCTTL, backup.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") } // ParseFromFlags parses the backup-related flags from the flag set. @@ -80,6 +84,11 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + gcTTL, err := flags.GetInt64(flagGCTTL) + if err != nil { + return errors.Trace(err) + } + cfg.GCTTL = gcTTL if err = cfg.Config.ParseFromFlags(flags); err != nil { return errors.Trace(err) @@ -117,6 +126,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err = client.SetStorage(ctx, u, cfg.SendCreds); err != nil { return err } + client.SetGCTTL(cfg.GCTTL) backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS) if err != nil { @@ -139,7 +149,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig log.Error("LastBackupTS is larger than current TS") return errors.New("LastBackupTS is larger than current TS") } - err = backup.CheckGCSafepoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS) + err = backup.CheckGCSafePoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS) if err != nil { log.Error("Check gc safepoint for last backup ts failed", zap.Error(err)) return err diff --git a/tests/br_z_gc_safepoint/gc.go b/tests/br_z_gc_safepoint/gc.go index d5a30361e..ce2b2a089 100644 --- a/tests/br_z_gc_safepoint/gc.go +++ b/tests/br_z_gc_safepoint/gc.go @@ -30,6 +30,7 @@ var ( pdAddr = flag.String("pd", "", "PD address") gcOffset = flag.Duration("gc-offset", time.Second*10, "Set GC safe point to current time - gc-offset, default: 10s") + updateService = flag.Bool("update-service", false, "use new service to update min SafePoint") ) func main() { @@ -55,10 +56,17 @@ func main() { now := oracle.ComposeTS(p, l) nowMinusOffset := oracle.GetTimeFromTS(now).Add(-*gcOffset) newSP := oracle.ComposeTS(oracle.GetPhysical(nowMinusOffset), 0) - _, err = 
pdclient.UpdateGCSafePoint(ctx, newSP) - if err != nil { - log.Fatal("create pd client failed", zap.Error(err)) + if *updateService { + _, err = pdclient.UpdateServiceGCSafePoint(ctx, "br", 300, newSP) + if err != nil { + log.Fatal("update service safe point failed", zap.Error(err)) + } + log.Info("update service GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now)) + } else { + _, err = pdclient.UpdateGCSafePoint(ctx, newSP) + if err != nil { + log.Fatal("update safe point failed", zap.Error(err)) + } + log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now)) } - - log.Info("update GC safe point", zap.Uint64("SP", newSP), zap.Uint64("now", now)) } diff --git a/tests/br_z_gc_safepoint/run.sh b/tests/br_z_gc_safepoint/run.sh index a76e97501..608798b0f 100755 --- a/tests/br_z_gc_safepoint/run.sh +++ b/tests/br_z_gc_safepoint/run.sh @@ -29,25 +29,27 @@ run_sql "CREATE DATABASE $DB;" go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB -row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') - # Update GC safepoint to now + 5s after 10s seconds. -sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" & +sleep 10 && bin/gc -pd $PD_ADDR -gc-offset "5s" -update-service true & -# Set ratelimit to 1 bytes/second, we assume it can not finish within 10s, -# so it will trigger exceed GC safe point error. +# The total data size is 1136000 bytes. +# Set ratelimit to 40960 bytes/second so that the backup finishes within 25s, +# so it won't trigger the exceed GC safe point error, even though bin/gc uses UpdateServiceGCSafePoint to update the GC safePoint. backup_gc_fail=0 -echo "backup start (expect fail)..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1 +echo "backup start (won't fail)..." 
+run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/1" --db $DB -t $TABLE --ratelimit 40960 --ratelimit-unit 1 || backup_gc_fail=1 -if [ "$backup_gc_fail" -ne "1" ];then +if [ "$backup_gc_fail" -ne "0" ];then echo "TEST: [$TEST_NAME] test check backup ts failed!" exit 1 fi +# set safePoint otherwise the default safePoint is zero +bin/gc -pd $PD_ADDR -gc-offset "1s" + backup_gc_fail=0 echo "incremental backup start (expect fail)..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --lastbackupts 1 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1 +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/2" --db $DB -t $TABLE --lastbackupts 1 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1 if [ "$backup_gc_fail" -ne "1" ];then echo "TEST: [$TEST_NAME] test check last backup ts failed!" @@ -56,7 +58,7 @@ fi backup_gc_fail=0 echo "incremental backup with max_uint64 start (expect fail)..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --lastbackupts $MAX_UINT64 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1 +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/3" --db $DB -t $TABLE --lastbackupts $MAX_UINT64 --ratelimit 1 --ratelimit-unit 1 || backup_gc_fail=1 if [ "$backup_gc_fail" -ne "1" ];then echo "TEST: [$TEST_NAME] test check max backup ts failed!"