Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulation of applier logic: Move Txn related code out of applier.go. #13878

Merged
merged 18 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

e.cfg.logger.Info(
"now serving peer/client/metrics",
zap.String("local-member-id", e.Server.ID().String()),
zap.String("local-member-id", e.Server.MemberId().String()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
Expand Down
5 changes: 2 additions & 3 deletions server/etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/osutil"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"

"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -127,7 +126,7 @@ func startEtcdOrProxyV2(args []string) {
}

if err != nil {
if derr, ok := err.(*etcdserver.DiscoveryError); ok {
if derr, ok := err.(*errors.DiscoveryError); ok {
switch derr.Err {
case v2discovery.ErrDuplicateID:
lg.Warn(
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *serverversion.DowngradeInfo {
}

func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions {
return getMembersVersions(s.lg, s.cluster, s.id, s.peerRt, s.Cfg.ReqTimeout())
return getMembersVersions(s.lg, s.cluster, s.MemberId(), s.peerRt, s.Cfg.ReqTimeout())
}

func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
Expand Down
3 changes: 2 additions & 1 deletion server/etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/lease/leasehttp"

"go.uber.org/zap"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
http.Error(w, err.Error(), http.StatusNotFound)
case membership.ErrMemberNotLearner:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
case etcdserver.ErrLearnerNotReady:
case errors.ErrLearnerNotReady:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
default:
writeError(h.lg, w, r, err)
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/api/etcdhttp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package etcdhttp
import (
"net/http"

"go.etcd.io/etcd/server/v3/etcdserver"
httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -57,8 +57,8 @@ func writeError(lg *zap.Logger, w http.ResponseWriter, r *http.Request, err erro

default:
switch err {
case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers,
etcdserver.ErrUnhealthy:
case errors.ErrTimeoutDueToLeaderFail, errors.ErrTimeoutDueToConnectionLost, errors.ErrNotEnoughStartedMembers,
errors.ErrUnhealthy:
if lg != nil {
lg.Warn(
"v2 response error",
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/api/v3rpc/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ package v3rpc
import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/apply"
)

type header struct {
clusterID int64
memberID int64
sg etcdserver.RaftStatusGetter
sg apply.RaftStatusGetter
rev func() int64
}

func newHeader(s *etcdserver.EtcdServer) header {
return header{
clusterID: int64(s.Cluster().ID()),
memberID: int64(s.ID()),
memberID: int64(s.MemberId()),
sg: s,
rev: func() int64 { return s.KV().Rev() },
}
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return nil, rpctypes.ErrGRPCNotCapable
}

if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
if s.IsMemberExist(s.MemberId()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
return nil, rpctypes.ErrGRPCNotSupportedForLearner
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNotCapable
}

if s.IsMemberExist(s.ID()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
if s.IsMemberExist(s.MemberId()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
return rpctypes.ErrGRPCNotSupportedForLearner
}

Expand Down
8 changes: 5 additions & 3 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/apply"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
Expand Down Expand Up @@ -69,7 +71,7 @@ type ClusterStatusGetter interface {

type maintenanceServer struct {
lg *zap.Logger
rg etcdserver.RaftStatusGetter
rg apply.RaftStatusGetter
kg KVGetter
bg BackendGetter
a Alarmer
Expand Down Expand Up @@ -241,7 +243,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
resp.StorageVersion = storageVersion.String()
}
if resp.Leader == raft.None {
resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())
resp.Errors = append(resp.Errors, errors.ErrNoLeader.Error())
}
for _, a := range ms.a.Alarms() {
resp.Errors = append(resp.Errors, a.String())
Expand All @@ -250,7 +252,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
}

func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
if ms.rg.ID() != ms.rg.Leader() {
if ms.rg.MemberId() != ms.rg.Leader() {
return nil, rpctypes.ErrGRPCNotLeader
}

Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteR
}

func (cs *ClusterServer) header() *pb.ResponseHeader {
return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.server.Term()}
return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.MemberId()), RaftTerm: cs.server.Term()}
}

func membersToProtoMembers(membs []*membership.Member) []*pb.Member {
Expand Down
8 changes: 6 additions & 2 deletions server/etcdserver/api/v3rpc/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &quotaKVServer{
NewKVServer(s),
quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()},
quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},
}
}

Expand Down Expand Up @@ -86,6 +86,10 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ
func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &quotaLeaseServer{
NewLeaseServer(s),
quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()},
quotaAlarmer{newBackendQuota(s, "lease"), s, s.MemberId()},
}
}

func newBackendQuota(s *etcdserver.EtcdServer, name string) storage.Quota {
return storage.NewBackendQuota(s.Logger(), s.Cfg.QuotaBackendBytes, s.Backend(), name)
}
68 changes: 34 additions & 34 deletions server/etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc"
Expand All @@ -32,39 +32,39 @@ import (
)

var toGRPCErrorMap = map[error]error{
membership.ErrIDRemoved: rpctypes.ErrGRPCMemberNotFound,
membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound,
membership.ErrIDExists: rpctypes.ErrGRPCMemberExist,
membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist,
membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner,
membership.ErrTooManyLearners: rpctypes.ErrGRPCTooManyLearners,
etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
etcdserver.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady,

mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted,
mvcc.ErrFutureRev: rpctypes.ErrGRPCFutureRev,
etcdserver.ErrRequestTooLarge: rpctypes.ErrGRPCRequestTooLarge,
etcdserver.ErrNoSpace: rpctypes.ErrGRPCNoSpace,
etcdserver.ErrTooManyRequests: rpctypes.ErrTooManyRequests,

etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,

etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
membership.ErrIDRemoved: rpctypes.ErrGRPCMemberNotFound,
membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound,
membership.ErrIDExists: rpctypes.ErrGRPCMemberExist,
membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist,
membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner,
membership.ErrTooManyLearners: rpctypes.ErrGRPCTooManyLearners,
errors.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
errors.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady,

mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted,
mvcc.ErrFutureRev: rpctypes.ErrGRPCFutureRev,
errors.ErrRequestTooLarge: rpctypes.ErrGRPCRequestTooLarge,
errors.ErrNoSpace: rpctypes.ErrGRPCNoSpace,
errors.ErrTooManyRequests: rpctypes.ErrTooManyRequests,

errors.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
errors.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
errors.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
errors.ErrStopped: rpctypes.ErrGRPCStopped,
errors.ErrTimeout: rpctypes.ErrGRPCTimeout,
errors.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
errors.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
errors.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
errors.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
errors.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
errors.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
errors.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,

errors.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
errors.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,

lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
Expand Down
7 changes: 4 additions & 3 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/apply"
"go.etcd.io/etcd/server/v3/storage/mvcc"

"go.uber.org/zap"
Expand All @@ -41,7 +42,7 @@ type watchServer struct {

maxRequestBytes int

sg etcdserver.RaftStatusGetter
sg apply.RaftStatusGetter
watchable mvcc.WatchableKV
ag AuthGetter
}
Expand All @@ -52,7 +53,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
lg: s.Cfg.Logger,

clusterID: int64(s.Cluster().ID()),
memberID: int64(s.ID()),
memberID: int64(s.MemberId()),

maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes),

Expand Down Expand Up @@ -124,7 +125,7 @@ type serverWatchStream struct {

maxRequestBytes int

sg etcdserver.RaftStatusGetter
sg apply.RaftStatusGetter
watchable mvcc.WatchableKV
ag AuthGetter

Expand Down
Loading