-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: Auto fix gc_worker's service safepoint for upgraded clusters #3371
Changes from 3 commits
fed4504
e70877e
8322306
96bf381
5db9638
3b8fa3f
1903929
0b05a28
5fb63fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,11 +27,13 @@ import ( | |
"github.com/gogo/protobuf/proto" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/kvproto/pkg/metapb" | ||
"github.com/pingcap/log" | ||
"github.com/tikv/pd/pkg/encryption" | ||
"github.com/tikv/pd/pkg/errs" | ||
"github.com/tikv/pd/server/encryptionkm" | ||
"github.com/tikv/pd/server/kv" | ||
"go.etcd.io/etcd/clientv3" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
|
@@ -501,10 +503,46 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error { | |
return s.Remove(key) | ||
} | ||
|
||
func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) { | ||
// fixGCWorkerServiceSafepoint tries to fix gc_worker's special service safepoint if it's missing or incorrect. An issue | ||
// in the older version may have invalid TTL for gc_worker's safepoint, and it also might be missing. gc_worker's | ||
// safepoint may also be missing when the cluster is just bootstrapped. Detect these cases and fix gc_worker's safepoint | ||
// if necessary. | ||
func (s *Storage) fixGCWorkerServiceSafePpoint(allServiceSafePoints []*ServiceSafePoint) (modified bool, err error) { | ||
if len(allServiceSafePoints) == 0 { | ||
// It's a new cluster, or everything is lost so we have no way to recover it. Initialize gc_worker's service | ||
// safepoint to zero. | ||
_, err = s.initServiceGCSafePointForGCWorker(0) | ||
return true, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the initialization is wrong, we still return true? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes but I thought err will be not nil here so it doesn't matter 🤔 maybe it doesn't look good. I'll change it. |
||
} | ||
|
||
var min uint64 = math.MaxUint64 | ||
for _, ssp := range allServiceSafePoints { | ||
if ssp.ServiceID == gcWorkerServiceSafePointID { | ||
if ssp.ExpiredAt != math.MaxInt64 { | ||
// gc_worker's TTL is incorrectly set. Set it to MaxInt64. | ||
log.Info("gc_worker's service safepoint has invalid TTL, fixing it", | ||
zap.Uint64("safepoint", ssp.SafePoint), zap.Int64("expiredAt", ssp.ExpiredAt)) | ||
ssp.ExpiredAt = math.MaxInt64 | ||
err = s.SaveServiceGCSafePoint(ssp) | ||
return true, err | ||
} | ||
// gc_worker's service safepoint exists normally. | ||
return false, nil | ||
} | ||
if ssp.SafePoint < min { | ||
min = ssp.SafePoint | ||
} | ||
} | ||
|
||
// gc_worker is missing. | ||
_, err = s.initServiceGCSafePointForGCWorker(min) | ||
return true, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
} | ||
|
||
func (s *Storage) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) { | ||
ssp := &ServiceSafePoint{ | ||
ServiceID: gcWorkerServiceSafePointID, | ||
SafePoint: 0, | ||
SafePoint: initialValue, | ||
ExpiredAt: math.MaxInt64, | ||
} | ||
if err := s.SaveServiceGCSafePoint(ssp); err != nil { | ||
|
@@ -515,25 +553,52 @@ func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) | |
|
||
// LoadMinServiceGCSafePoint returns the minimum safepoint across all services | ||
func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) { | ||
prefix := path.Join(gcPath, "safe_point", "service") + "/" | ||
prefixEnd := clientv3.GetPrefixRangeEnd(prefix) | ||
keys, values, err := s.LoadRange(prefix, prefixEnd, 0) | ||
loadAll := func() ([]string, []*ServiceSafePoint, error) { | ||
prefix := path.Join(gcPath, "safe_point", "service") + "/" | ||
prefixEnd := clientv3.GetPrefixRangeEnd(prefix) | ||
keys, values, err := s.LoadRange(prefix, prefixEnd, 0) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
allServiceSafePoints := make([]*ServiceSafePoint, 0, len(values)) | ||
for _, value := range values { | ||
ssp := &ServiceSafePoint{} | ||
if err := json.Unmarshal([]byte(value), ssp); err != nil { | ||
return nil, nil, err | ||
} | ||
allServiceSafePoints = append(allServiceSafePoints, ssp) | ||
} | ||
return keys, allServiceSafePoints, nil | ||
} | ||
|
||
keys, allServiceSafePoints, err := loadAll() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(keys) == 0 { | ||
// There's no service safepoint. Store an initial value for GC worker. | ||
return s.initServiceGCSafePointForGCWorker() | ||
|
||
modified, err := s.fixGCWorkerServiceSafePpoint(allServiceSafePoints) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
min := &ServiceSafePoint{SafePoint: math.MaxUint64} | ||
for i, key := range keys { | ||
ssp := &ServiceSafePoint{} | ||
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { | ||
if modified { | ||
// Reload the safepoints | ||
keys, allServiceSafePoints, err = loadAll() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is just modifying the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's more, it sames that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The user needs the minimum value among all non-expired safepoints, but I want to fix it with the minimum value including expired-but-not-deleted ones. So the final min value may decrease. It's possible to avoid reloading and do all these things in one loop, but trying to make the code more readable, I choose the less-effective way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
min := &ServiceSafePoint{SafePoint: math.MaxUint64} | ||
for i, key := range keys { | ||
ssp := allServiceSafePoints[i] | ||
if ssp.ExpiredAt < now.Unix() { | ||
s.Remove(key) | ||
err = s.Remove(key) | ||
if err != nil { | ||
log.Warn("failed to remove expired service safepoint", | ||
zap.String("id", key), zap.Uint64("safepoint", ssp.SafePoint), zap.Error(err)) | ||
} | ||
continue | ||
} | ||
if ssp.SafePoint < min.SafePoint { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"math" | ||
"path" | ||
"path/filepath" | ||
"sort" | ||
"strconv" | ||
|
@@ -911,6 +912,61 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { | |
_, err = s.client.UpdateServiceGCSafePoint(context.Background(), | ||
"", 1000, 15) | ||
c.Assert(err, NotNil) | ||
|
||
// Put some other safepoints to test fixing gc_worker's safepoint when there exists other safepoints. | ||
_, err = s.client.UpdateServiceGCSafePoint(context.Background(), | ||
"a", 1000, 11) | ||
c.Assert(err, IsNil) | ||
_, err = s.client.UpdateServiceGCSafePoint(context.Background(), | ||
"b", 1000, 12) | ||
c.Assert(err, IsNil) | ||
_, err = s.client.UpdateServiceGCSafePoint(context.Background(), | ||
"c", 1000, 13) | ||
c.Assert(err, IsNil) | ||
|
||
// Force set invalid ttl to gc_worker | ||
//err = s.srv.GetStorage().SaveServiceGCSafePoint(&core.ServiceSafePoint{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why comment it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah sorry. I forgot to remove it. |
||
// ServiceID: "gc_worker", | ||
// ExpiredAt: -12345, | ||
// SafePoint: 10, | ||
//}) | ||
gcWorkerKey := path.Join("gc", "safe_point", "service", "gc_worker") | ||
{ | ||
gcWorkerSsp := &core.ServiceSafePoint{ | ||
ServiceID: "gc_worker", | ||
ExpiredAt: -12345, | ||
SafePoint: 10, | ||
} | ||
value, err := json.Marshal(gcWorkerSsp) | ||
c.Assert(err, IsNil) | ||
err = s.srv.GetStorage().Save(gcWorkerKey, string(value)) | ||
c.Assert(err, IsNil) | ||
} | ||
|
||
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) | ||
c.Assert(err, IsNil) | ||
c.Assert(minSsp.ServiceID, Equals, "gc_worker") | ||
c.Assert(minSsp.SafePoint, Equals, uint64(10)) | ||
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64)) | ||
|
||
// Force delete gc_worker, then the min service safepoint is 11 of "a". | ||
err = s.srv.GetStorage().Remove(gcWorkerKey) | ||
c.Assert(err, IsNil) | ||
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) | ||
c.Assert(err, IsNil) | ||
c.Assert(minSsp.SafePoint, Equals, uint64(11)) | ||
// After calling LoadMinServiceGCS when "gc_worker"'s service safepoint is missing, "gc_worker"'s service safepoint | ||
// will be newly created. | ||
// Increase "a" so that "gc_worker" is the only minimum that will be returned by LoadMinServiceGCSafePoint. | ||
_, err = s.client.UpdateServiceGCSafePoint(context.Background(), | ||
"a", 1000, 14) | ||
c.Assert(err, IsNil) | ||
|
||
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now()) | ||
c.Assert(err, IsNil) | ||
c.Assert(minSsp.ServiceID, Equals, "gc_worker") | ||
c.Assert(minSsp.SafePoint, Equals, uint64(11)) | ||
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64)) | ||
} | ||
|
||
func (s *testClientSuite) TestScatterRegion(c *C) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.