Skip to content

Commit

Permalink
server: Auto fix gc_worker's service safepoint for upgraded clusters (#…
Browse files Browse the repository at this point in the history
…3371) (#3391)

* cherry pick #3371 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Jan 28, 2021
1 parent 72dc648 commit d620cfd
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 5 deletions.
36 changes: 32 additions & 4 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/kv"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -445,10 +446,10 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error {
return s.Remove(key)
}

func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) {
func (s *Storage) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: gcWorkerServiceSafePointID,
SafePoint: 0,
SafePoint: initialValue,
ExpiredAt: math.MaxInt64,
}
if err := s.SaveServiceGCSafePoint(ssp); err != nil {
Expand All @@ -467,16 +468,31 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e
return nil, err
}
if len(keys) == 0 {
// There's no service safepoint. Store an initial value for GC worker.
return s.initServiceGCSafePointForGCWorker()
// There's no service safepoint. It may be a new cluster, or upgraded from an older version where all service
// safepoints are missing. For the second case, we have no way to recover it. Store an initial value 0 for
// gc_worker.
return s.initServiceGCSafePointForGCWorker(0)
}

hasGCWorker := false
min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ServiceID == gcWorkerServiceSafePointID {
hasGCWorker = true
// If gc_worker's expire time is incorrectly set, fix it.
if ssp.ExpiredAt != math.MaxInt64 {
ssp.ExpiredAt = math.MaxInt64
err = s.SaveServiceGCSafePoint(ssp)
if err != nil {
return nil, errors.Trace(err)
}
}
}

if ssp.ExpiredAt < now.Unix() {
s.Remove(key)
continue
Expand All @@ -486,6 +502,18 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e
}
}

if min.SafePoint == math.MaxUint64 {
// There are no valid safepoints and we have no way to recover them. Just set gc_worker's safepoint to 0.
log.Info("there are no valid service safepoints. init gc_worker's service safepoint to 0")
return s.initServiceGCSafePointForGCWorker(0)
}

if !hasGCWorker {
// If some service safepoints exist but gc_worker's is missing, init it with the min value among all
// safepoints (including expired ones)
return s.initServiceGCSafePointForGCWorker(min.SafePoint)
}

return min, nil
}

Expand Down
13 changes: 13 additions & 0 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,21 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) {
c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil)
}

// gc_worker's safepoint will be automatically inserted when loading service safepoints. Here the returned
// safepoint can be either of "gc_worker" or "2".
ssp, err := storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.SafePoint, Equals, uint64(2))

// Advance gc_worker's safepoint
c.Assert(storage.SaveServiceGCSafePoint(&ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: math.MaxInt64,
SafePoint: 10,
}), IsNil)

ssp, err = storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "2")
c.Assert(ssp.ExpiredAt, Equals, expireAt)
c.Assert(ssp.SafePoint, Equals, uint64(2))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
ExpiredAt: now.Unix() + request.TTL,
SafePoint: request.SafePoint,
}
if request.TTL == math.MaxInt64 {
if math.MaxInt64-now.Unix() <= request.TTL {
ssp.ExpiredAt = math.MaxInt64
}
if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil {
Expand Down
51 changes: 51 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"path/filepath"
"sort"
"strconv"
Expand Down Expand Up @@ -731,6 +732,56 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"", 1000, 15)
c.Assert(err, NotNil)

// Put some other safepoints to test fixing gc_worker's safepoint when other safepoints exist.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 11)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"b", 1000, 12)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", 1000, 13)
c.Assert(err, IsNil)

// Force set invalid ttl to gc_worker
gcWorkerKey := path.Join("gc", "safe_point", "service", "gc_worker")
{
gcWorkerSsp := &core.ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: -12345,
SafePoint: 10,
}
value, err := json.Marshal(gcWorkerSsp)
c.Assert(err, IsNil)
err = s.srv.GetStorage().Save(gcWorkerKey, string(value))
c.Assert(err, IsNil)
}

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(10))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))

// Force delete gc_worker, then the min service safepoint is 11 of "a".
err = s.srv.GetStorage().Remove(gcWorkerKey)
c.Assert(err, IsNil)
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.SafePoint, Equals, uint64(11))
// After calling LoadMinServiceGCSafePoint when "gc_worker"'s service safepoint is missing, "gc_worker"'s service safepoint
// will be newly created.
// Increase "a" so that "gc_worker" is the only minimum that will be returned by LoadMinServiceGCSafePoint.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 14)
c.Assert(err, IsNil)

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(11))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))
}

func (s *testClientSuite) TestScatterRegion(c *C) {
Expand Down

0 comments on commit d620cfd

Please sign in to comment.