Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: add IsFeatureSupported in Cluster Interface #2735

Merged
merged 12 commits into from
Aug 13, 2020
5 changes: 3 additions & 2 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"time"

. "github.com/pingcap/check"

"github.com/pingcap/pd/v4/pkg/typeutil"
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/cluster"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/versioninfo"
)

var _ = Suite(&testConfigSuite{})
Expand Down Expand Up @@ -107,7 +108,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
cfg.Log.Level = "warn"
cfg.ReplicationMode.DRAutoSync.LabelKey = "foobar"
cfg.ReplicationMode.ReplicationMode = "dr-auto-sync"
v, err := cluster.ParseVersion("v4.0.0-beta")
v, err := versioninfo.ParseVersion("v4.0.0-beta")
c.Assert(err, IsNil)
cfg.ClusterVersion = *v
c.Assert(newCfg1, DeepEquals, cfg)
Expand Down
5 changes: 3 additions & 2 deletions server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/cluster"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/versioninfo"
)

var _ = Suite(&testOperatorSuite{})
Expand Down Expand Up @@ -143,7 +144,7 @@ func mustPutStore(c *C, svr *server.Server, id uint64, state metapb.StoreState,
Address: fmt.Sprintf("tikv%d", id),
State: state,
Labels: labels,
Version: (*cluster.MinSupportedVersion(cluster.Version2_0)).String(),
Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(),
},
})
c.Assert(err, IsNil)
Expand Down
20 changes: 11 additions & 9 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/pd/v4/pkg/cache"
"github.com/pingcap/pd/v4/pkg/component"
"github.com/pingcap/pd/v4/pkg/etcdutil"
Expand All @@ -44,9 +48,7 @@ import (
"github.com/pingcap/pd/v4/server/schedule/placement"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
"github.com/pingcap/pd/v4/server/statistics"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"github.com/pingcap/pd/v4/server/versioninfo"
)

var backgroundJobInterval = 10 * time.Second
Expand Down Expand Up @@ -900,12 +902,12 @@ func (c *RaftCluster) PutStore(store *metapb.Store, force bool) error {
return errors.Errorf("invalid put store %v", store)
}

v, err := ParseVersion(store.GetVersion())
v, err := versioninfo.ParseVersion(store.GetVersion())
if err != nil {
return errors.Errorf("invalid put store %v, error: %s", store, err)
}
clusterVersion := *c.opt.GetClusterVersion()
if !IsCompatible(clusterVersion, *v) {
if !versioninfo.IsCompatible(clusterVersion, *v) {
return errors.Errorf("version should compatible with version %s, got %s", clusterVersion, v)
}

Expand Down Expand Up @@ -1298,7 +1300,7 @@ func (c *RaftCluster) OnStoreVersionChange() {
if s.IsTombstone() {
continue
}
v := MustParseVersion(s.GetVersion())
v := versioninfo.MustParseVersion(s.GetVersion())

if minVersion == nil || v.LessThan(*minVersion) {
minVersion = v
Expand Down Expand Up @@ -1330,12 +1332,12 @@ func (c *RaftCluster) changedRegionNotifier() <-chan *core.RegionInfo {
}

// IsFeatureSupported checks if the feature is supported by current cluster.
func (c *RaftCluster) IsFeatureSupported(f Feature) bool {
func (c *RaftCluster) IsFeatureSupported(f versioninfo.Feature) bool {
c.RLock()
defer c.RUnlock()
clusterVersion := *c.opt.GetClusterVersion()
minSupportVersion := *MinSupportedVersion(f)
return !clusterVersion.LessThan(minSupportVersion)
minSupportVersion := versioninfo.MinSupportedVersion(f)
return !clusterVersion.LessThan(*minSupportVersion)
}

// GetConfig gets config from cluster.
Expand Down
4 changes: 3 additions & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/pingcap/pd/v4/pkg/mock/mockid"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/id"
"github.com/pingcap/pd/v4/server/kv"
"github.com/pingcap/pd/v4/server/schedule/opt"
"github.com/pingcap/pd/v4/server/versioninfo"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -663,7 +665,7 @@ func newTestScheduleConfig() (*config.ScheduleConfig, *config.PersistOptions, er
return nil, nil, err
}
opt := config.NewPersistOptions(cfg)
opt.SetClusterVersion(MinSupportedVersion(Version2_0))
opt.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version2_0))
return &cfg.Schedule, opt, nil
}

Expand Down
10 changes: 6 additions & 4 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ package cluster
import (
"bytes"

"github.com/pkg/errors"
"go.uber.org/zap"

HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule"
"github.com/pingcap/pd/v4/server/schedulers"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/pingcap/pd/v4/server/versioninfo"
)

// HandleRegionHeartbeat processes RegionInfo reports from client.
Expand Down Expand Up @@ -66,7 +68,7 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
}
}

if c.IsFeatureSupported(RegionMerge) {
if c.IsFeatureSupported(versioninfo.RegionMerge) {
// Disable merge for the 2 regions in a period of time.
c.GetMergeChecker().RecordRegionSplit([]uint64{reqRegion.GetId(), newRegionID})
}
Expand Down Expand Up @@ -132,7 +134,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*
}

recordRegions = append(recordRegions, reqRegion.GetId())
if c.IsFeatureSupported(RegionMerge) {
if c.IsFeatureSupported(versioninfo.RegionMerge) {
// Disable merge the regions in a period of time.
c.GetMergeChecker().RecordRegionSplit(recordRegions)
}
Expand Down
8 changes: 5 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/server/cluster"
"github.com/pingcap/pd/v4/server/core"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/pd/v4/server/cluster"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/versioninfo"
)

const slowThreshold = 5 * time.Millisecond
Expand Down Expand Up @@ -551,7 +553,7 @@ func (s *Server) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitR
return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

if !rc.IsFeatureSupported(cluster.BatchSplit) {
if !rc.IsFeatureSupported(versioninfo.BatchSplit) {
return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil
}
if request.GetRegion() == nil {
Expand Down
19 changes: 10 additions & 9 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/pkg/errors"
"github.com/urfave/negroni"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"

HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/pd/v4/pkg/etcdutil"
"github.com/pingcap/pd/v4/pkg/grpcutil"
"github.com/pingcap/pd/v4/pkg/logutil"
Expand All @@ -51,14 +60,6 @@ import (
"github.com/pingcap/pd/v4/server/schedule/opt"
"github.com/pingcap/pd/v4/server/tso"
"github.com/pingcap/pd/v4/server/versioninfo"
"github.com/pingcap/sysutil"
"github.com/pkg/errors"
"github.com/urfave/negroni"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -913,7 +914,7 @@ func (s *Server) GetLabelProperty() config.LabelPropertyConfig {

// SetClusterVersion sets the version of cluster.
func (s *Server) SetClusterVersion(v string) error {
version, err := cluster.ParseVersion(v)
version, err := versioninfo.ParseVersion(v)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/pd/v4/pkg/etcdutil"
"github.com/pingcap/pd/v4/pkg/typeutil"
"github.com/pingcap/pd/v4/server/cluster"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/versioninfo"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -69,9 +69,9 @@ func PrintConfigCheckMsg(cfg *config.Config) {

// CheckPDVersion checks if PD needs to be upgraded.
func CheckPDVersion(opt *config.PersistOptions) {
pdVersion := *cluster.MinSupportedVersion(cluster.Base)
pdVersion := versioninfo.MinSupportedVersion(versioninfo.Base)
if versioninfo.PDReleaseVersion != "None" {
pdVersion = *cluster.MustParseVersion(versioninfo.PDReleaseVersion)
pdVersion = versioninfo.MustParseVersion(versioninfo.PDReleaseVersion)
}
clusterVersion := *opt.GetClusterVersion()
log.Info("load cluster version", zap.Stringer("cluster-version", clusterVersion))
Expand Down
52 changes: 15 additions & 37 deletions server/cluster/version.go → server/versioninfo/feature.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 PingCAP, Inc.
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,12 +11,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster
package versioninfo

import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand All @@ -37,13 +36,22 @@ const (
// BatchSplit can speed up the region split.
// and PD will response the BatchSplit request.
BatchSplit
Version3_0
Version4_0
Version5_0
// JointConsensus can support safe conf change across data center.
JointConsensus
)

var featuresDict = map[Feature]string{
Base: "1.0.0",
Version2_0: "2.0.0",
RegionMerge: "2.0.0",
BatchSplit: "2.1.0-rc.1",
Base: "1.0.0",
Version2_0: "2.0.0",
RegionMerge: "2.0.0",
BatchSplit: "2.1.0-rc.1",
Version3_0: "3.0.0",
Version4_0: "4.0.0",
Version5_0: "5.0.0",
JointConsensus: "5.0.0",
}

// MinSupportedVersion returns the minimum support version for the specified feature.
Expand All @@ -55,33 +63,3 @@ func MinSupportedVersion(v Feature) *semver.Version {
version := MustParseVersion(target)
return version
}

// ParseVersion wraps semver.NewVersion and handles compatibility issues.
func ParseVersion(v string) (*semver.Version, error) {
// for compatibility with old version which not support `version` mechanism.
if v == "" {
return semver.New(featuresDict[Base]), nil
}
if v[0] == 'v' {
v = v[1:]
}
ver, err := semver.NewVersion(v)
return ver, errors.WithStack(err)
}

// MustParseVersion wraps ParseVersion and will panic if error is not nil.
func MustParseVersion(v string) *semver.Version {
ver, err := ParseVersion(v)
if err != nil {
log.Fatal("version string is illegal", zap.Error(err))
}
return ver
}

// IsCompatible checks if the clusterVersion is compatible with the specified version.
func IsCompatible(clusterVersion, v semver.Version) bool {
if clusterVersion.LessThan(v) {
return true
}
return clusterVersion.Major == v.Major && clusterVersion.Minor == v.Minor
}
37 changes: 37 additions & 0 deletions server/versioninfo/versioninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@

package versioninfo

import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
// CommunityEdition is the default edition for building.
CommunityEdition = "Community"
Expand All @@ -26,3 +33,33 @@ var (
PDGitBranch = "None"
PDEdition = CommunityEdition
)

// ParseVersion wraps semver.NewVersion and handles compatibility issues.
func ParseVersion(v string) (*semver.Version, error) {
// for compatibility with old version which not support `version` mechanism.
if v == "" {
return semver.New(featuresDict[Base]), nil
}
if v[0] == 'v' {
v = v[1:]
}
ver, err := semver.NewVersion(v)
return ver, errors.WithStack(err)
}

// MustParseVersion wraps ParseVersion and will panic if error is not nil.
func MustParseVersion(v string) *semver.Version {
ver, err := ParseVersion(v)
if err != nil {
log.Fatal("version string is illegal", zap.Error(err))
}
return ver
}

// IsCompatible checks if the clusterVersion is compatible with the specified version.
func IsCompatible(clusterVersion, v semver.Version) bool {
if clusterVersion.LessThan(v) {
return true
}
return clusterVersion.Major == v.Major && clusterVersion.Minor == v.Minor
}
Loading