From 518b88d18c020aff1bc8a484529e1cf8e9920820 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Fri, 25 Oct 2024 03:32:37 +0000 Subject: [PATCH 1/3] Bump etcd to v3.5.16-k3s1 Signed-off-by: Brad Davidson --- go.mod | 16 ++++++++-------- go.sum | 32 ++++++++++++++++---------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index f32eedf0581c..11d7278e1528 100644 --- a/go.mod +++ b/go.mod @@ -21,14 +21,14 @@ replace ( github.com/prometheus/common => github.com/prometheus/common v0.55.0 github.com/spegel-org/spegel => github.com/k3s-io/spegel v0.0.23-0.20240516234953-f3d2c4072314 github.com/ugorji/go => github.com/ugorji/go v1.2.11 - go.etcd.io/etcd/api/v3 => github.com/k3s-io/etcd/api/v3 v3.5.13-k3s1 - go.etcd.io/etcd/client/pkg/v3 => github.com/k3s-io/etcd/client/pkg/v3 v3.5.13-k3s1 - go.etcd.io/etcd/client/v2 => github.com/k3s-io/etcd/client/v2 v2.305.13-k3s1 - go.etcd.io/etcd/client/v3 => github.com/k3s-io/etcd/client/v3 v3.5.13-k3s1 - go.etcd.io/etcd/etcdutl/v3 => github.com/k3s-io/etcd/etcdutl/v3 v3.5.13-k3s1 - go.etcd.io/etcd/pkg/v3 => github.com/k3s-io/etcd/pkg/v3 v3.5.13-k3s1 - go.etcd.io/etcd/raft/v3 => github.com/k3s-io/etcd/raft/v3 v3.5.13-k3s1 - go.etcd.io/etcd/server/v3 => github.com/k3s-io/etcd/server/v3 v3.5.13-k3s1 + go.etcd.io/etcd/api/v3 => github.com/k3s-io/etcd/api/v3 v3.5.16-k3s1 + go.etcd.io/etcd/client/pkg/v3 => github.com/k3s-io/etcd/client/pkg/v3 v3.5.16-k3s1 + go.etcd.io/etcd/client/v2 => github.com/k3s-io/etcd/client/v2 v2.305.16-k3s1 + go.etcd.io/etcd/client/v3 => github.com/k3s-io/etcd/client/v3 v3.5.16-k3s1 + go.etcd.io/etcd/etcdutl/v3 => github.com/k3s-io/etcd/etcdutl/v3 v3.5.16-k3s1 + go.etcd.io/etcd/pkg/v3 => github.com/k3s-io/etcd/pkg/v3 v3.5.16-k3s1 + go.etcd.io/etcd/raft/v3 => github.com/k3s-io/etcd/raft/v3 v3.5.16-k3s1 + go.etcd.io/etcd/server/v3 => github.com/k3s-io/etcd/server/v3 v3.5.16-k3s1 go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful => go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.44.0 golang.org/x/crypto => golang.org/x/crypto v0.24.0 golang.org/x/net => golang.org/x/net v0.26.0 diff --git a/go.sum b/go.sum index bd68b1b1548a..5b9988eeb74a 100644 --- a/go.sum +++ b/go.sum @@ -948,22 +948,22 @@ github.com/k3s-io/cri-dockerd v0.3.15-k3s1.31-3 h1:TH3zSbIM9zSZMeWKcWjQqeja3FsmJ github.com/k3s-io/cri-dockerd v0.3.15-k3s1.31-3/go.mod h1:ny6wyM7fqfew5FABQ+MtKaU07rhsHhlXr/jtFsA2m8Y= github.com/k3s-io/cri-tools v1.31.0-k3s2 h1:nekOdJe5Hecm+C5eswg688uXTI0enUZOJYadmyU9pYw= github.com/k3s-io/cri-tools v1.31.0-k3s2/go.mod h1:PvPf/fN5FiNdK1v43jCydRNRw6631qGTSEOhv/OsjYU= -github.com/k3s-io/etcd/api/v3 v3.5.13-k3s1 h1:aq6fxlEKdwCooLE3HOR6227U51DEvOw3DEbriJxD2QM= -github.com/k3s-io/etcd/api/v3 v3.5.13-k3s1/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c= -github.com/k3s-io/etcd/client/pkg/v3 v3.5.13-k3s1 h1:t2I25UtBvohVAhlyXpYjd/Lznm+ybxNhvs3cnEGsF4Y= -github.com/k3s-io/etcd/client/pkg/v3 v3.5.13-k3s1/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8= -github.com/k3s-io/etcd/client/v2 v2.305.13-k3s1 h1:lvIdlAI6xRIHSUJC43sJx9lmxehq2quGb+8z5TJldGg= -github.com/k3s-io/etcd/client/v2 v2.305.13-k3s1/go.mod h1:iQnL7fepbiomdXMb3om1rHq96htNNGv2sJkEcZGDRRg= -github.com/k3s-io/etcd/client/v3 v3.5.13-k3s1 h1:/D6KAEGVzwivnjxZ5CzVIykVloLoKB/TBeKw2tKKVQ0= -github.com/k3s-io/etcd/client/v3 v3.5.13-k3s1/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI= -github.com/k3s-io/etcd/etcdutl/v3 v3.5.13-k3s1 h1:fIt+PVHCeINM5fl9OfMI+o9BJKf951pRiVcCytFW97c= -github.com/k3s-io/etcd/etcdutl/v3 v3.5.13-k3s1/go.mod h1:2vhvTIQobP+Cb04qzlcbKGvX6J5oq/N1kquk1yCDIQY= -github.com/k3s-io/etcd/pkg/v3 v3.5.13-k3s1 h1:uLU/SnBuhtSkdBk830x0pseHSsQQvh99C3deG6nc9d0= -github.com/k3s-io/etcd/pkg/v3 v3.5.13-k3s1/go.mod h1:N+4PLrp7agI/Viy+dUYpX7iRtSPvKq+w8Y14d1vX+m0= -github.com/k3s-io/etcd/raft/v3 v3.5.13-k3s1 h1:yexUwAPPdmYfIMWOj6sSyJ2nEe8QOrFzNuvYGRAsm5E= -github.com/k3s-io/etcd/raft/v3 v3.5.13-k3s1/go.mod h1:uUFibGLn2Ksm2URMxN1fICGhk8Wu96EfDQyuLhAcAmw= -github.com/k3s-io/etcd/server/v3 v3.5.13-k3s1 h1:Pqcxkg7V60c26ZpHoekP9QoUdLuduxFn827A/5CIwm4= -github.com/k3s-io/etcd/server/v3 v3.5.13-k3s1/go.mod h1:K/8nbsGupHqmr5MkgaZpLlH1QdX1pcNQLAkODy44XcQ= +github.com/k3s-io/etcd/api/v3 v3.5.16-k3s1 h1:RNExemPFr4S+VqJ2jXVf0Y9iXaps0pTeklSN735z0Mw= +github.com/k3s-io/etcd/api/v3 v3.5.16-k3s1/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28= +github.com/k3s-io/etcd/client/pkg/v3 v3.5.16-k3s1 h1:wEmVFnZ+h3v5ECRmX6jf4SeIykDQei+DRnBczM23YQA= +github.com/k3s-io/etcd/client/pkg/v3 v3.5.16-k3s1/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= +github.com/k3s-io/etcd/client/v2 v2.305.16-k3s1 h1:f7qWAqVhxrMdBt0coehUYfP0Cix7clL2ko/XCqvWols= +github.com/k3s-io/etcd/client/v2 v2.305.16-k3s1/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE= +github.com/k3s-io/etcd/client/v3 v3.5.16-k3s1 h1:ON2Cd0Fx+qQ53GS6qK6Mr9fh7MFCShZfw+rsrLZ6j5M= +github.com/k3s-io/etcd/client/v3 v3.5.16-k3s1/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50= +github.com/k3s-io/etcd/etcdutl/v3 v3.5.16-k3s1 h1:qUuDUhfhOZ3D6/XsW04jV514+DhV7R669+/+3n9i7VY= +github.com/k3s-io/etcd/etcdutl/v3 v3.5.16-k3s1/go.mod h1:X22QojXcHZNS3TPAitpcYW7rwTvnmchFwAKkSSz0Ncw= +github.com/k3s-io/etcd/pkg/v3 v3.5.16-k3s1 h1:4nDx3la68jehJfqWPs1Yx1clPW7938pKQXrVxp2OgyA= +github.com/k3s-io/etcd/pkg/v3 v3.5.16-k3s1/go.mod h1:+lutCZHG5MBBFI/U4eYT5yL7sJfnexsoM20Y0t2uNuY= +github.com/k3s-io/etcd/raft/v3 v3.5.16-k3s1 h1:nD/YzAeIbEcSkXAQFRwAs/2zc2vXAkKmQnDKf6UDCxY= +github.com/k3s-io/etcd/raft/v3 v3.5.16-k3s1/go.mod h1:P4UP14AxofMJ/54boWilabqqWoW9eLodl6I5GdGzazI= +github.com/k3s-io/etcd/server/v3 v3.5.16-k3s1 h1:9c0DChFw6WRz6r+eCuVLBltZcRwT6h1l79biTPuAGR0= +github.com/k3s-io/etcd/server/v3 v3.5.16-k3s1/go.mod h1:ynhyZZpdDp1Gq49jkUg5mfkDWZwXnn3eIqCqtJnrD/s= github.com/k3s-io/helm-controller v0.16.5 h1:SsUHfksQXNwePkswv4a970EGD2h0Exsf6t3IdXhpXRo= github.com/k3s-io/helm-controller v0.16.5/go.mod h1:AcSxEhOIUgeVvBTnJOAwcezBZXtYew/RhKwO5xp3RlM= github.com/k3s-io/kine v0.13.2 h1:l++g2KY/3UaPJiGpgYuGoqaaYKeMpVj9fP/yfnSxHxo= From 32489704d714909e0f9df357f1670a429f924d6b Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 24 Oct 2024 00:16:34 +0000 Subject: [PATCH 2/3] Fix issues with defragment and alarm clear on etcd startup * Use clientv3.NewCtxClient instead of New to avoid automatic retry of all RPCs * Only timeout status requests; allow defrag and alarm clear requests to run to completion. * Only clear alarms on the local cluster member, not ALL cluster members Signed-off-by: Brad Davidson --- go.mod | 4 +- pkg/etcd/etcd.go | 179 +++++++++++++++++++++++++++---------------- pkg/etcd/resolver.go | 80 +++++++++++++++++++ pkg/etcd/snapshot.go | 4 +- 4 files changed, 195 insertions(+), 72 deletions(-) create mode 100644 pkg/etcd/resolver.go diff --git a/go.mod b/go.mod index 11d7278e1528..2938d6367334 100644 --- a/go.mod +++ b/go.mod @@ -134,9 +134,11 @@ require ( github.com/vishvananda/netlink v1.2.1-beta.2 github.com/yl2chen/cidranger v1.0.2 go.etcd.io/etcd/api/v3 v3.5.16 + go.etcd.io/etcd/client/pkg/v3 v3.5.16 go.etcd.io/etcd/client/v3 v3.5.16 go.etcd.io/etcd/etcdutl/v3 v3.5.13 go.etcd.io/etcd/server/v3 v3.5.16 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.27.0 golang.org/x/net v0.28.0 golang.org/x/sync v0.8.0 @@ -422,7 +424,6 @@ require ( github.com/xlab/treeprint v1.2.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/bbolt v1.3.11 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect go.etcd.io/etcd/client/v2 v2.305.16 // indirect go.etcd.io/etcd/pkg/v3 v3.5.16 // indirect go.etcd.io/etcd/raft/v3 v3.5.16 // indirect @@ -444,7 +445,6 @@ require ( go.uber.org/fx v1.20.1 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 34eafdd7c070..dfcce5433658 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -41,8 +41,15 @@ import ( "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/logutil" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/credentials" snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + "go.etcd.io/etcd/server/v3/etcdserver" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -55,7 +62,7 @@ import ( ) const ( - testTimeout = time.Second * 30 + statusTimeout = time.Second * 30 manageTickerTime = time.Second * 15 learnerMaxStallTime = time.Minute * 5 memberRemovalTimeout = time.Minute * 1 @@ -206,35 +213,40 @@ func (e *ETCD) Test(ctx context.Context) error { return errors.New("etcd datastore is not started") } - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - endpoints := getEndpoints(e.config) - status, err := e.client.Status(ctx, endpoints[0]) + status, err := e.status(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to get etcd status") + } else if status.IsLearner { + return errors.New("this server has not yet been promoted from learner to voting member") + } else if status.Leader == 0 { + return etcdserver.ErrNoLeader } - if status.IsLearner { - return errors.New("this server has not yet been promoted from learner to voting member") + logrus.Infof("Connected to etcd v%s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize) + if len(status.Errors) > 0 { + logrus.Warnf("Errors present on etcd cluster: %s", strings.Join(status.Errors, ",")) } + // defrag this node to reclaim freed space from compacted revisions if err := e.defragment(ctx); err != nil { return errors.Wrap(err, "failed to defragment etcd database") } - if err := e.clearAlarms(ctx); err != nil { - return errors.Wrap(err, "failed to report and disarm etcd alarms") + // clear alarms on this node + if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil { + return errors.Wrap(err, "failed to disarm etcd alarms") } - // refresh status to see if any errors remain after clearing alarms - status, err = e.client.Status(ctx, endpoints[0]) + // refresh status - note that errors may remain on other nodes, but this + // should not prevent us from continuing with startup. + status, err = e.status(ctx) if err != nil { - return err + return errors.Wrap(err, "failed to get etcd status") } + logrus.Infof("Datastore using %d of %d bytes after defragment", status.DbSizeInUse, status.DbSize) if len(status.Errors) > 0 { - return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", ")) + logrus.Warnf("Errors present on etcd cluster after defragment: %s", strings.Join(status.Errors, ",")) } members, err := e.client.MemberList(ctx) @@ -242,6 +254,7 @@ func (e *ETCD) Test(ctx context.Context) error { return err } + // Ensure that there is a cluster member with our peerURL and name var memberNameUrls []string for _, member := range members.Members { for _, peerURL := range member.PeerURLs { @@ -253,6 +266,8 @@ func (e *ETCD) Test(ctx context.Context) error { memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0]) } } + + // no matching PeerURL on any Member, return an error that indicates what was expected vs what we found. return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()} } @@ -523,7 +538,7 @@ func (e *ETCD) startClient(ctx context.Context) error { e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey - client, err := getClient(ctx, e.config, endpoints...) + client, conn, err := getClient(ctx, e.config, endpoints...) if err != nil { return err } @@ -531,9 +546,8 @@ func (e *ETCD) startClient(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() return nil @@ -554,11 +568,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er return err } - client, err := getClient(clientCtx, e.config, clientURLs...) + client, conn, err := getClient(clientCtx, e.config, clientURLs...) if err != nil { return err } - defer client.Close() + defer conn.Close() for _, member := range memberList.Members { for _, peer := range member.PeerURLs { @@ -725,13 +739,53 @@ func (e *ETCD) infoHandler() http.Handler { // If the runtime config does not list any endpoints, the default endpoint is used. // The returned client should be closed when no longer needed, in order to avoid leaking GRPC // client goroutines. -func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) { +func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) { + logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel) + if err != nil { + return nil, nil, err + } + cfg, err := getClientConfig(ctx, control, endpoints...) if err != nil { - return nil, err + return nil, nil, err + } + + // Set up dialer and resolver options. + // This is normally handled by clientv3.New() but that wraps all the GRPC + // service with retry handlers and uses deprecated grpc.DialContext() which + // tries to establish a connection even when one isn't wanted. + if cfg.DialKeepAliveTime > 0 { + params := keepalive.ClientParameters{ + Time: cfg.DialKeepAliveTime, + Timeout: cfg.DialKeepAliveTimeout, + PermitWithoutStream: cfg.PermitWithoutStream, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params)) + } + + if cfg.TLS != nil { + creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials() + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds)) + } else { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) } - return clientv3.New(*cfg) + cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0]))) + + target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0])) + conn, err := grpc.NewClient(target, cfg.DialOptions...) + if err != nil { + return nil, nil, err + } + + // Create a new client and wire up the GRPC service interfaces. + // Ref: https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87 + client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client"))) + client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client) + client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client) + client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client) + + return client, conn, nil } // getClientConfig generates an etcd client config connected to the specified endpoints. @@ -851,11 +905,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error { } defer sqliteClient.Close() - etcdClient, err := getClient(ctx, e.config) + etcdClient, conn, err := getClient(ctx, e.config) if err != nil { return err } - defer etcdClient.Close() + defer conn.Close() values, err := sqliteClient.List(ctx, "/registry/", 0) if err != nil { @@ -984,7 +1038,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { return errors.New("etcd datastore already started") } - client, err := getClient(ctx, e.config) + client, conn, err := getClient(ctx, e.config) if err != nil { return err } @@ -992,9 +1046,8 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil { @@ -1251,8 +1304,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre } func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) { - ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() resp, err := e.client.Status(ctx, url) if err != nil { return resp, errors.Wrap(err, "failed to check etcd member status") @@ -1363,12 +1414,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) return err } -// clearAlarms checks for any alarms on the local etcd member. If found, they are -// reported and the alarm state is cleared. -func (e *ETCD) clearAlarms(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - +// clearAlarms checks for any NOSPACE alarms on the local etcd member. +// If found, they are reported and the alarm state is cleared. +// Other alarm types are not handled. +func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error { if e.client == nil { return errors.New("etcd client was nil") } @@ -1379,22 +1428,37 @@ func (e *ETCD) clearAlarms(ctx context.Context) error { } for _, alarm := range alarmList.Alarms { - logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm) - } - - if len(alarmList.Alarms) > 0 { - if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil { - return fmt.Errorf("etcd alarm disarm failed: %v", err) + if alarm.MemberID != memberID { + // ignore alarms on other cluster members, they should manage their own problems + continue + } + if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE { + if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil { + return fmt.Errorf("%s disarm failed: %v", alarm.Alarm, err) + } + logrus.Infof("%s disarmed successfully", alarm.Alarm) + } else { + return fmt.Errorf("%s alarm must be disarmed manually", alarm.Alarm) } - logrus.Infof("Alarms disarmed on etcd server") } return nil } -func (e *ETCD) defragment(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) +// status returns status using the first etcd endpoint. +func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) { + if e.client == nil { + return nil, errors.New("etcd client was nil") + } + + ctx, cancel := context.WithTimeout(ctx, statusTimeout) defer cancel() + endpoints := getEndpoints(e.config) + return e.client.Status(ctx, endpoints[0]) +} + +// defragment defragments the etcd datastore using the first etcd endpoint +func (e *ETCD) defragment(ctx context.Context) error { if e.client == nil { return errors.New("etcd client was nil") } @@ -1550,11 +1614,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) // GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd // and unmarshal it to a list of apiserver endpoints. func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) { - cl, err := getClient(ctx, cfg) + cl, conn, err := getClient(ctx, cfg) if err != nil { return nil, err } - defer cl.Close() + defer conn.Close() etcdResp, err := cl.KV.Get(ctx, AddressKey) if err != nil { @@ -1576,9 +1640,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin // GetMembersClientURLs will list through the member lists in etcd and return // back a combined list of client urls for each member in the cluster func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - members, err := e.client.MemberList(ctx) if err != nil { return nil, err @@ -1593,24 +1654,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { return clientURLs, nil } -// GetMembersNames will list through the member lists in etcd and return -// back a combined list of member names -func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - members, err := e.client.MemberList(ctx) - if err != nil { - return nil, err - } - - var memberNames []string - for _, member := range members.Members { - memberNames = append(memberNames, member.Name) - } - return memberNames, nil -} - // RemoveSelf will remove the member if it exists in the cluster. This should // only be called on a node that may have previously run etcd, but will not // currently run etcd, to ensure that it is not a member of the cluster. diff --git a/pkg/etcd/resolver.go b/pkg/etcd/resolver.go new file mode 100644 index 000000000000..b95242cbfa91 --- /dev/null +++ b/pkg/etcd/resolver.go @@ -0,0 +1,80 @@ +package etcd + +import ( + "net/url" + "path" + "strings" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" +) + +const scheme = "etcd-endpoint" + +type EtcdSimpleResolver struct { + *manual.Resolver + endpoint string +} + +// Cribbed from https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/internal/resolver/resolver.go +// but only supports a single fixed endpoint. We use this instead of the internal etcd client resolver +// because the agent loadbalancer handles failover and we don't want etcd or grpc's special behavior. +func NewSimpleResolver(endpoint string) *EtcdSimpleResolver { + r := manual.NewBuilderWithScheme(scheme) + return &EtcdSimpleResolver{Resolver: r, endpoint: endpoint} +} + +func (r *EtcdSimpleResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + res, err := r.Resolver.Build(target, cc, opts) + if err != nil { + return nil, err + } + + if r.CC != nil { + addr, serverName := interpret(r.endpoint) + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr, ServerName: serverName}}, + }) + } + + return res, nil +} + +func interpret(ep string) (string, string) { + if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") { + if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") { + _, absolutePath, _ := strings.Cut(ep, "://") + return "unix://" + absolutePath, path.Base(absolutePath) + } + if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") { + _, localPath, _ := strings.Cut(ep, "://") + return "unix:" + localPath, path.Base(localPath) + } + _, localPath, _ := strings.Cut(ep, ":") + return "unix:" + localPath, path.Base(localPath) + } + if strings.Contains(ep, "://") { + url, err := url.Parse(ep) + if err != nil { + return ep, ep + } + if url.Scheme == "http" || url.Scheme == "https" { + return url.Host, url.Host + } + return ep, url.Host + } + return ep, ep +} + +func authority(ep string) string { + if _, authority, ok := strings.Cut(ep, "://"); ok { + return authority + } + if suff, ok := strings.CutPrefix(ep, "unix:"); ok { + return suff + } + if suff, ok := strings.CutPrefix(ep, "unixs:"); ok { + return suff + } + return ep +} diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 8f844981f5d9..90919c2403af 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -27,7 +27,7 @@ import ( "github.com/pkg/errors" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" - snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + snapshotv3 "go.etcd.io/etcd/client/v3/snapshot" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -243,7 +243,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) { var sf *snapshot.File - if err := snapshotv3.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil { + if err := snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath); err != nil { sf = &snapshot.File{ Name: snapshotName, Location: "", From 09cea436c9775fe413844aa1ec8dda4b2f69ba0e Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 24 Oct 2024 22:21:44 +0000 Subject: [PATCH 3/3] Add tests for ETCD.Test() Signed-off-by: Brad Davidson --- pkg/etcd/etcd_test.go | 523 ++++++++++++++++++++++++++++++++++++++++-- tests/unit.go | 4 + 2 files changed, 507 insertions(+), 20 deletions(-) diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 5a519bdcffe4..f875a24ad1b7 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "sync" "testing" "time" @@ -15,11 +16,23 @@ import ( testutil "github.com/k3s-io/k3s/tests" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" + "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" utilnet "k8s.io/apimachinery/pkg/util/net" ) +func init() { + logrus.SetLevel(logrus.DebugLevel) +} + func mustGetAddress() string { ipAddr, err := utilnet.ChooseHostInterface() if err != nil { @@ -76,7 +89,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { wantErr bool }{ { - name: "Directory exists", + name: "directory exists", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -95,7 +108,7 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { want: true, }, { - name: "Directory does not exist", + name: "directory does not exist", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -117,9 +130,6 @@ func Test_UnitETCD_IsInitialized(t *testing.T) { }, } - // enable logging - logrus.SetLevel(logrus.DebugLevel) - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e := NewETCD() @@ -159,7 +169,7 @@ func Test_UnitETCD_Register(t *testing.T) { wantErr bool }{ { - name: "Call Register with standard config", + name: "standard config", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -174,7 +184,7 @@ func Test_UnitETCD_Register(t *testing.T) { }, }, { - name: "Call Register with a tombstone file created", + name: "with a tombstone file created", args: args{ ctx: context.TODO(), config: generateTestConfig(), @@ -249,7 +259,7 @@ func Test_UnitETCD_Start(t *testing.T) { wantErr bool }{ { - name: "Start etcd without clientAccessInfo and without snapshots", + name: "nil clientAccessInfo and nil cron", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -266,17 +276,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd without clientAccessInfo on", + name: "nil clientAccessInfo", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -293,17 +304,18 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, { - name: "Start etcd with an existing cluster", + name: "existing cluster", fields: fields{ config: generateTestConfig(), address: mustGetAddress(), @@ -322,13 +334,14 @@ func Test_UnitETCD_Start(t *testing.T) { }, teardown: func(e *ETCD, ctxInfo *contextInfo) error { // RemoveSelf will fail with a specific error, but it still does cleanup for testing purposes - if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { - return err - } + err := e.RemoveSelf(ctxInfo.ctx) ctxInfo.cancel() time.Sleep(5 * time.Second) testutil.CleanupDataDir(e.config) os.Remove(walDir(e.config)) + if err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() { + return err + } return nil }, }, @@ -353,8 +366,478 @@ func Test_UnitETCD_Start(t *testing.T) { } if err := tt.teardown(e, &tt.fields.context); err != nil { t.Errorf("Teardown for ETCD.Start() failed = %v", err) + } + }) + } +} + +func Test_UnitETCD_Test(t *testing.T) { + type contextInfo struct { + ctx context.Context + cancel context.CancelFunc + } + type fields struct { + context contextInfo + client *clientv3.Client + config *config.Control + name string + address string + } + type args struct { + clientAccessInfo *clientaccess.Info + } + tests := []struct { + name string + fields fields + setup func(e *ETCD, ctxInfo *contextInfo) error + teardown func(e *ETCD, ctxInfo *contextInfo) error + wantErr bool + }{ + { + name: "no server running", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "unreachable server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + e.config.Runtime.EtcdConfig.Endpoints = []string{"https://192.0.2.0:2379"} // RFC5737 + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "learner server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, true, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "corrupt server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, true, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "leaderless server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, true, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: true, + }, + { + name: "normal server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "alarm on other server", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + extraAlarm := &etcdserverpb.AlarmMember{MemberID: 2, Alarm: etcdserverpb.AlarmType_NOSPACE} + if err := startMock(ctxInfo.ctx, e, false, false, false, time.Second, extraAlarm); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + { + name: "slow defrag", + fields: fields{ + config: generateTestConfig(), + address: mustGetAddress(), + name: "default", + }, + setup: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background()) + testutil.GenerateRuntime(e.config) + if err := startMock(ctxInfo.ctx, e, false, false, false, 40*time.Second); err != nil { + return err + } + return e.startClient(ctxInfo.ctx) + }, + teardown: func(e *ETCD, ctxInfo *contextInfo) error { + ctxInfo.cancel() + time.Sleep(1 * time.Second) + testutil.CleanupDataDir(e.config) + return nil + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &ETCD{ + client: tt.fields.client, + config: tt.fields.config, + name: tt.fields.name, + address: tt.fields.address, + } + + if err := tt.setup(e, &tt.fields.context); err != nil { + t.Errorf("Setup for ETCD.Test() failed = %v", err) return } + start := time.Now() + err := e.Test(tt.fields.context.ctx) + duration := time.Now().Sub(start) + t.Logf("ETCD.Test() completed in %v with err=%v", duration, err) + if (err != nil) != tt.wantErr { + t.Errorf("ETCD.Test() error = %v, wantErr %v", err, tt.wantErr) + } + if err := tt.teardown(e, &tt.fields.context); err != nil { + t.Errorf("Teardown for ETCD.Test() failed = %v", err) + } }) } } + +// startMock starts up a mock etcd grpc service with canned responses +// that can be used to test specific scenarios. +func startMock(ctx context.Context, e *ETCD, isLearner, isCorrupt, noLeader bool, defragDelay time.Duration, extraAlarms ...*etcdserverpb.AlarmMember) error { + address := authority(getEndpoints(e.config)[0]) + // listen on endpoint and close listener on context cancel + listener, err := net.Listen("tcp", address) + if err != nil { + return err + } + + // set up tls if enabled + gopts := []grpc.ServerOption{} + if e.config.Datastore.ServerTLSConfig.CertFile != "" && e.config.Datastore.ServerTLSConfig.KeyFile != "" { + creds, err := credentials.NewServerTLSFromFile(e.config.Datastore.ServerTLSConfig.CertFile, e.config.Datastore.ServerTLSConfig.KeyFile) + if err != nil { + return err + } + gopts = append(gopts, grpc.Creds(creds)) + } + server := grpc.NewServer(gopts...) + + mock := &mockEtcd{ + e: e, + mu: &sync.RWMutex{}, + isLearner: isLearner, + isCorrupt: isCorrupt, + noLeader: noLeader, + defragDelay: defragDelay, + extraAlarms: extraAlarms, + } + + // register grpc services + etcdserverpb.RegisterKVServer(server, mock) + etcdserverpb.RegisterClusterServer(server, mock) + etcdserverpb.RegisterMaintenanceServer(server, mock) + + hsrv := health.NewServer() + hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(server, hsrv) + + reflection.Register(server) + + // shutdown on context cancel + go func() { + <-ctx.Done() + server.GracefulStop() + listener.Close() + }() + + // start serving + go func() { + logrus.Infof("Mock etcd server starting on %s", listener.Addr()) + logrus.Infof("Mock etcd server exited: %v", server.Serve(listener)) + }() + + return nil +} + +type mockEtcd struct { + e *ETCD + mu *sync.RWMutex + calls map[string]int + isLearner bool + isCorrupt bool + noLeader bool + defragDelay time.Duration + extraAlarms []*etcdserverpb.AlarmMember +} + +// increment call counter for this function +func (m *mockEtcd) inc(call string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.calls == nil { + m.calls = map[string]int{} + } + m.calls[call] = m.calls[call] + 1 +} + +// get call counter for this function +func (m *mockEtcd) get(call string) int { + m.mu.RLock() + defer m.mu.RUnlock() + return m.calls[call] +} + +// get alarm list +func (m *mockEtcd) alarms() []*etcdserverpb.AlarmMember { + alarms := m.extraAlarms + if m.get("alarm") < 2 { + // on the first check, return NOSPACE so that we can clear it after defragging + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_NOSPACE, + MemberID: 1, + }) + } + if m.isCorrupt { + // return CORRUPT if so requested + alarms = append(alarms, &etcdserverpb.AlarmMember{ + Alarm: etcdserverpb.AlarmType_CORRUPT, + MemberID: 1, + }) + } + return alarms +} + +// KV mocks +func (m *mockEtcd) Range(context.Context, *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) { + m.inc("range") + return nil, unsupported("range") +} +func (m *mockEtcd) Put(context.Context, *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { + m.inc("put") + return nil, unsupported("put") +} +func (m *mockEtcd) DeleteRange(context.Context, *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { + m.inc("deleterange") + return nil, unsupported("deleterange") +} +func (m *mockEtcd) Txn(context.Context, *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { + m.inc("txn") + return nil, unsupported("txn") +} +func (m *mockEtcd) Compact(context.Context, *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) { + m.inc("compact") + return nil, unsupported("compact") +} + +// Maintenance mocks +func (m *mockEtcd) Alarm(ctx context.Context, r *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { + m.inc("alarm") + res := &etcdserverpb.AlarmResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + } + if r.Action == etcdserverpb.AlarmRequest_GET { + res.Alarms = m.alarms() + } + return res, nil +} +func (m *mockEtcd) Status(context.Context, *etcdserverpb.StatusRequest) (*etcdserverpb.StatusResponse, error) { + m.inc("status") + res := &etcdserverpb.StatusResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Leader: 1, + Version: "v3.5.0-mock0", + DbSize: 1024, + DbSizeInUse: 512, + IsLearner: m.isLearner, + } + if m.noLeader { + res.Leader = 0 + res.Errors = append(res.Errors, etcdserver.ErrNoLeader.Error()) + } + for _, a := range m.alarms() { + res.Errors = append(res.Errors, a.String()) + } + return res, nil +} +func (m *mockEtcd) Defragment(ctx context.Context, r *etcdserverpb.DefragmentRequest) (*etcdserverpb.DefragmentResponse, error) { + m.inc("defragment") + // delay defrag response by configured time, or until the request is cancelled + select { + case <-ctx.Done(): + case <-time.After(m.defragDelay): + } + return &etcdserverpb.DefragmentResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + }, nil +} +func (m *mockEtcd) Hash(context.Context, *etcdserverpb.HashRequest) (*etcdserverpb.HashResponse, error) { + m.inc("hash") + return nil, unsupported("hash") +} +func (m *mockEtcd) HashKV(context.Context, *etcdserverpb.HashKVRequest) (*etcdserverpb.HashKVResponse, error) { + m.inc("hashkv") + return nil, unsupported("hashkv") +} +func (m *mockEtcd) Snapshot(*etcdserverpb.SnapshotRequest, etcdserverpb.Maintenance_SnapshotServer) error { + m.inc("snapshot") + return unsupported("snapshot") +} +func (m *mockEtcd) MoveLeader(context.Context, *etcdserverpb.MoveLeaderRequest) (*etcdserverpb.MoveLeaderResponse, error) { + m.inc("moveleader") + return nil, unsupported("moveleader") +} +func (m *mockEtcd) Downgrade(context.Context, *etcdserverpb.DowngradeRequest) (*etcdserverpb.DowngradeResponse, error) { + m.inc("downgrade") + return nil, unsupported("downgrade") +} + +// Cluster mocks +func (m *mockEtcd) MemberAdd(context.Context, *etcdserverpb.MemberAddRequest) (*etcdserverpb.MemberAddResponse, error) { + m.inc("memberadd") + return nil, unsupported("memberadd") +} +func (m *mockEtcd) MemberRemove(context.Context, *etcdserverpb.MemberRemoveRequest) (*etcdserverpb.MemberRemoveResponse, error) { + m.inc("memberremove") + return nil, etcdserver.ErrNotEnoughStartedMembers +} +func (m *mockEtcd) MemberUpdate(context.Context, *etcdserverpb.MemberUpdateRequest) (*etcdserverpb.MemberUpdateResponse, error) { + m.inc("memberupdate") + return nil, unsupported("memberupdate") +} +func (m *mockEtcd) MemberList(context.Context, *etcdserverpb.MemberListRequest) (*etcdserverpb.MemberListResponse, error) { + m.inc("memberlist") + scheme := "http" + if m.e.config.Datastore.ServerTLSConfig.CertFile != "" { + scheme = "https" + } + + return &etcdserverpb.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{ + MemberId: 1, + }, + Members: []*etcdserverpb.Member{ + { + ID: 1, + Name: m.e.name, + IsLearner: m.isLearner, + ClientURLs: []string{scheme + "://127.0.0.1:2379"}, + PeerURLs: []string{scheme + "://" + m.e.address + ":2380"}, + }, + }, + }, nil +} + +func (m *mockEtcd) MemberPromote(context.Context, *etcdserverpb.MemberPromoteRequest) (*etcdserverpb.MemberPromoteResponse, error) { + m.inc("memberpromote") + return nil, unsupported("memberpromote") +} + +func unsupported(field string) error { + return status.New(codes.Unimplemented, field+" is not implemented").Err() +} diff --git a/tests/unit.go b/tests/unit.go index ee76af03851a..5bc40cea3b55 100644 --- a/tests/unit.go +++ b/tests/unit.go @@ -54,6 +54,10 @@ func GenerateRuntime(cnf *config.Control) error { deps.CreateRuntimeCertFiles(cnf) + cnf.Datastore.ServerTLSConfig.CAFile = cnf.Runtime.ETCDServerCA + cnf.Datastore.ServerTLSConfig.CertFile = cnf.Runtime.ServerETCDCert + cnf.Datastore.ServerTLSConfig.KeyFile = cnf.Runtime.ServerETCDKey + return deps.GenServerDeps(cnf) }