diff --git a/server/embed/config.go b/server/embed/config.go index eccea6598fa..ec4da2fbd91 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -205,12 +205,12 @@ type Config struct { // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` - LPUrls, LCUrls []url.URL - APUrls, ACUrls []url.URL - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool + ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL + AdvertisePeerUrls, AdvertiseClientUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool // SelfSignedCertValidity specifies the validity period of the client and peer certificates // that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS, // the unit is year, and the default is 1 @@ -423,10 +423,11 @@ type configYAML struct { // configJSON has file options that are translated into Config options type configJSON struct { - LPUrlsJSON string `json:"listen-peer-urls"` - LCUrlsJSON string `json:"listen-client-urls"` - APUrlsJSON string `json:"initial-advertise-peer-urls"` - ACUrlsJSON string `json:"advertise-client-urls"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` CORSJSON string `json:"cors"` HostWhitelistJSON string `json:"host-whitelist"` @@ -475,10 +476,10 @@ func NewConfig() *Config { ElectionMs: 1000, InitialElectionTickAdvance: true, - LPUrls: []url.URL{*lpurl}, - LCUrls: []url.URL{*lcurl}, - APUrls: []url.URL{*apurl}, - ACUrls: []url.URL{*acurl}, + ListenPeerUrls: []url.URL{*lpurl}, + ListenClientUrls: []url.URL{*lcurl}, + AdvertisePeerUrls: []url.URL{*apurl}, + AdvertiseClientUrls: []url.URL{*acurl}, ClusterState: ClusterStateFlagNew, InitialClusterToken: "etcd-cluster", @@ -539,40 +540,49 @@ func (cfg *configYAML) configFromFile(path string) error { return err } - if cfg.LPUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ",")) + if cfg.configJSON.ListenPeerUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenPeerUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up listen-peer-urls: %v\n", err) os.Exit(1) } - cfg.LPUrls = []url.URL(u) + cfg.Config.ListenPeerUrls = u } - if cfg.LCUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ",")) + if cfg.configJSON.ListenClientUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-urls: %v\n", err) os.Exit(1) } - cfg.LCUrls = []url.URL(u) + cfg.Config.ListenClientUrls = u } - if cfg.APUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ",")) + if cfg.configJSON.ListenClientHttpUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ",")) + if err != nil { + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err) + os.Exit(1) + } + cfg.Config.ListenClientHttpUrls = u + } + + if cfg.configJSON.AdvertisePeerUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up initial-advertise-peer-urls: %v\n", err) os.Exit(1) } - cfg.APUrls = []url.URL(u) + cfg.Config.AdvertisePeerUrls = u } - if cfg.ACUrlsJSON != "" { - u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ",")) + if cfg.configJSON.AdvertiseClientUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertiseClientUrls, ",")) if err != nil { fmt.Fprintf(os.Stderr, "unexpected error setting up advertise-peer-urls: %v\n", err) os.Exit(1) } - cfg.ACUrls = []url.URL(u) + cfg.Config.AdvertiseClientUrls = u } if cfg.ListenMetricsUrlsJSON != "" { @@ -650,21 +660,27 @@ func (cfg *Config) Validate() error { if err := cfg.setupLogging(); err != nil { return err } - if err := checkBindURLs(cfg.LPUrls); err != nil { + if err := checkBindURLs(cfg.ListenPeerUrls); err != nil { + return err + } + if err := checkBindURLs(cfg.ListenClientUrls); err != nil { return err } - if err := checkBindURLs(cfg.LCUrls); err != nil { + if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil { return err } + if len(cfg.ListenClientHttpUrls) == 0 { + cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.") + } if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil { return err } - if err := checkHostURLs(cfg.APUrls); err != nil { - addrs := cfg.getAPURLs() + if err := checkHostURLs(cfg.AdvertisePeerUrls); err != nil { + addrs := cfg.getAdvertisePeerUrls() return fmt.Errorf(`--initial-advertise-peer-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err) } - if err := checkHostURLs(cfg.ACUrls); err != nil { - addrs := cfg.getACURLs() + if err := checkHostURLs(cfg.AdvertiseClientUrls); err != nil { + addrs := cfg.getAdvertiseClientUrls() return fmt.Errorf(`--advertise-client-urls %q must be "host:port" (%v)`, strings.Join(addrs, ","), err) } // Check if conflicting flags are passed. @@ -697,7 +713,7 @@ func (cfg *Config) Validate() error { } // check this last since proxying in etcdmain may make this OK - if cfg.LCUrls != nil && cfg.ACUrls == nil { + if cfg.ListenClientUrls != nil && cfg.AdvertiseClientUrls == nil { return ErrUnsetAdvertiseClientURLsFlag } @@ -750,7 +766,7 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok urlsmap = types.URLsMap{} // If using discovery, generate a temporary cluster based on // self's advertised peer URLs - urlsmap[cfg.Name] = cfg.APUrls + urlsmap[cfg.Name] = cfg.AdvertisePeerUrls token = cfg.Durl case cfg.DNSCluster != "": @@ -804,7 +820,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { // Use both etcd-server-ssl and etcd-server for discovery. // Combine the results if both are available. - clusterStrs, cerr = getCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) + clusterStrs, cerr = getCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls) if cerr != nil { clusterStrs = make([]string, 0) } @@ -814,12 +830,12 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { zap.String("service-name", "etcd-server-ssl"+serviceNameSuffix), zap.String("server-name", cfg.Name), zap.String("discovery-srv", cfg.DNSCluster), - zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()), zap.Strings("found-cluster", clusterStrs), zap.Error(cerr), ) - defaultHTTPClusterStrs, httpCerr := getCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) + defaultHTTPClusterStrs, httpCerr := getCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.AdvertisePeerUrls) if httpCerr == nil { clusterStrs = append(clusterStrs, defaultHTTPClusterStrs...) } @@ -829,7 +845,7 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { zap.String("service-name", "etcd-server"+serviceNameSuffix), zap.String("server-name", cfg.Name), zap.String("discovery-srv", cfg.DNSCluster), - zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("advertise-peer-urls", cfg.getAdvertisePeerUrls()), zap.Strings("found-cluster", clusterStrs), zap.Error(httpCerr), ) @@ -838,15 +854,15 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { } func (cfg Config) InitialClusterFromName(name string) (ret string) { - if len(cfg.APUrls) == 0 { + if len(cfg.AdvertisePeerUrls) == 0 { return "" } n := name if name == "" { n = DefaultName } - for i := range cfg.APUrls { - ret = ret + "," + n + "=" + cfg.APUrls[i].String() + for i := range cfg.AdvertisePeerUrls { + ret = ret + "," + n + "=" + cfg.AdvertisePeerUrls[i].String() } return ret[1:] } @@ -862,11 +878,11 @@ func (cfg Config) V2DeprecationEffective() config.V2DeprecationEnum { } func (cfg Config) defaultPeerHost() bool { - return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs + return len(cfg.AdvertisePeerUrls) == 1 && cfg.AdvertisePeerUrls[0].String() == DefaultInitialAdvertisePeerURLs } func (cfg Config) defaultClientHost() bool { - return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs + return len(cfg.AdvertiseClientUrls) == 1 && cfg.AdvertiseClientUrls[0].String() == DefaultAdvertiseClientURLs } func (cfg *Config) ClientSelfCert() (err error) { @@ -877,9 +893,12 @@ func (cfg *Config) ClientSelfCert() (err error) { cfg.logger.Warn("ignoring client auto TLS since certs given") return nil } - chosts := make([]string, len(cfg.LCUrls)) - for i, u := range cfg.LCUrls { - chosts[i] = u.Host + chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls)) + for _, u := range cfg.ListenClientUrls { + chosts = append(chosts, u.Host) + } + for _, u := range cfg.ListenClientHttpUrls { + chosts = append(chosts, u.Host) } cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity) if err != nil { @@ -896,8 +915,8 @@ func (cfg *Config) PeerSelfCert() (err error) { cfg.logger.Warn("ignoring peer auto TLS since certs given") return nil } - phosts := make([]string, len(cfg.LPUrls)) - for i, u := range cfg.LPUrls { + phosts := make([]string, len(cfg.ListenPeerUrls)) + for i, u := range cfg.ListenPeerUrls { phosts[i] = u.Host } cfg.PeerTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "peer"), phosts, cfg.SelfSignedCertValidity) @@ -925,9 +944,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s } used := false - pip, pport := cfg.LPUrls[0].Hostname(), cfg.LPUrls[0].Port() + pip, pport := cfg.ListenPeerUrls[0].Hostname(), cfg.ListenPeerUrls[0].Port() if cfg.defaultPeerHost() && pip == "0.0.0.0" { - cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)} + cfg.AdvertisePeerUrls[0] = url.URL{Scheme: cfg.AdvertisePeerUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)} used = true } // update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc') @@ -935,9 +954,9 @@ func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (s cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) } - cip, cport := cfg.LCUrls[0].Hostname(), cfg.LCUrls[0].Port() + cip, cport := cfg.ListenClientUrls[0].Hostname(), cfg.ListenClientUrls[0].Port() if cfg.defaultClientHost() && cip == "0.0.0.0" { - cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)} + cfg.AdvertiseClientUrls[0] = url.URL{Scheme: cfg.AdvertiseClientUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)} used = true } dhost := defaultHostname @@ -982,34 +1001,42 @@ func checkHostURLs(urls []url.URL) error { return nil } -func (cfg *Config) getAPURLs() (ss []string) { - ss = make([]string, len(cfg.APUrls)) - for i := range cfg.APUrls { - ss[i] = cfg.APUrls[i].String() +func (cfg *Config) getAdvertisePeerUrls() (ss []string) { + ss = make([]string, len(cfg.AdvertisePeerUrls)) + for i := range cfg.AdvertisePeerUrls { + ss[i] = cfg.AdvertisePeerUrls[i].String() + } + return ss +} + +func (cfg *Config) getListenPeerUrls() (ss []string) { + ss = make([]string, len(cfg.ListenPeerUrls)) + for i := range cfg.ListenPeerUrls { + ss[i] = cfg.ListenPeerUrls[i].String() } return ss } -func (cfg *Config) getLPURLs() (ss []string) { - ss = make([]string, len(cfg.LPUrls)) - for i := range cfg.LPUrls { - ss[i] = cfg.LPUrls[i].String() +func (cfg *Config) getAdvertiseClientUrls() (ss []string) { + ss = make([]string, len(cfg.AdvertiseClientUrls)) + for i := range cfg.AdvertiseClientUrls { + ss[i] = cfg.AdvertiseClientUrls[i].String() } return ss } -func (cfg *Config) getACURLs() (ss []string) { - ss = make([]string, len(cfg.ACUrls)) - for i := range cfg.ACUrls { - ss[i] = cfg.ACUrls[i].String() +func (cfg *Config) getListenClientUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientUrls)) + for i := range cfg.ListenClientUrls { + ss[i] = cfg.ListenClientUrls[i].String() } return ss } -func (cfg *Config) getLCURLs() (ss []string) { - ss = make([]string, len(cfg.LCUrls)) - for i := range cfg.LCUrls { - ss[i] = cfg.LCUrls[i].String() +func (cfg *Config) getListenClientHttpUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientHttpUrls)) + for i := range cfg.ListenClientHttpUrls { + ss[i] = cfg.ListenClientHttpUrls[i].String() } return ss } diff --git a/server/embed/config_test.go b/server/embed/config_test.go index 584ef3553f2..a0550a6a288 100644 --- a/server/embed/config_test.go +++ b/server/embed/config_test.go @@ -86,12 +86,12 @@ func TestConfigFileOtherFields(t *testing.T) { func TestUpdateDefaultClusterFromName(t *testing.T) { cfg := NewConfig() defaultInitialCluster := cfg.InitialCluster - oldscheme := cfg.APUrls[0].Scheme - origpeer := cfg.APUrls[0].String() - origadvc := cfg.ACUrls[0].String() + oldscheme := cfg.AdvertisePeerUrls[0].Scheme + origpeer := cfg.AdvertisePeerUrls[0].String() + origadvc := cfg.AdvertiseClientUrls[0].String() cfg.Name = "abc" - lpport := cfg.LPUrls[0].Port() + lpport := cfg.ListenPeerUrls[0].Port() // in case of 'etcd --name=abc' exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport) @@ -100,12 +100,12 @@ func TestUpdateDefaultClusterFromName(t *testing.T) { t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster) } // advertise peer URL should not be affected - if origpeer != cfg.APUrls[0].String() { - t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String()) + if origpeer != cfg.AdvertisePeerUrls[0].String() { + t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.AdvertisePeerUrls[0].String()) } // advertise client URL should not be affected - if origadvc != cfg.ACUrls[0].String() { - t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String()) + if origadvc != cfg.AdvertiseClientUrls[0].String() { + t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String()) } } @@ -118,17 +118,17 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) { cfg := NewConfig() defaultInitialCluster := cfg.InitialCluster - oldscheme := cfg.APUrls[0].Scheme - origadvc := cfg.ACUrls[0].String() + oldscheme := cfg.AdvertisePeerUrls[0].Scheme + origadvc := cfg.AdvertiseClientUrls[0].String() cfg.Name = "abc" - lpport := cfg.LPUrls[0].Port() - cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)} + lpport := cfg.ListenPeerUrls[0].Port() + cfg.ListenPeerUrls[0] = url.URL{Scheme: cfg.ListenPeerUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)} dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster) if dhost != defaultHostname { t.Fatalf("expected default host %q, got %q", defaultHostname, dhost) } - aphost, apport := cfg.APUrls[0].Hostname(), cfg.APUrls[0].Port() + aphost, apport := cfg.AdvertisePeerUrls[0].Hostname(), cfg.AdvertisePeerUrls[0].Port() if apport != lpport { t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport) } @@ -141,8 +141,8 @@ func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) { } // advertise client URL should not be affected - if origadvc != cfg.ACUrls[0].String() { - t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String()) + if origadvc != cfg.AdvertiseClientUrls[0].String() { + t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.AdvertiseClientUrls[0].String()) } } @@ -276,7 +276,7 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) { cfg.InitialCluster = "" cfg.InitialClusterToken = "" cfg.DNSCluster = "example.com" - cfg.APUrls = types.MustNewURLs(tt.apurls) + cfg.AdvertisePeerUrls = types.MustNewURLs(tt.apurls) if err := cfg.Validate(); err != nil { t.Errorf("#%d: failed to validate test Config: %v", i, err) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 23f89092286..43bf4234106 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" defaultLog "log" + "math" "net" "net/http" "net/url" @@ -32,6 +33,7 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" runtimeutil "go.etcd.io/etcd/pkg/v3/runtime" "go.etcd.io/etcd/server/v3/config" @@ -48,6 +50,7 @@ import ( "github.com/soheilhy/cmux" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" ) @@ -123,7 +126,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { } e.cfg.logger.Info( "configuring peer listeners", - zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), + zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()), ) if e.Peers, err = configurePeerListeners(cfg); err != nil { return e, err @@ -131,7 +134,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.cfg.logger.Info( "configuring client listeners", - zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()), ) if e.sctxs, err = configureClientListeners(cfg); err != nil { return e, err @@ -167,8 +170,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { srvcfg := config.ServerConfig{ Name: cfg.Name, - ClientURLs: cfg.ACUrls, - PeerURLs: cfg.APUrls, + ClientURLs: cfg.AdvertiseClientUrls, + PeerURLs: cfg.AdvertisePeerUrls, DataDir: cfg.Dir, DedicatedWALDir: cfg.WalDir, SnapshotCount: cfg.SnapshotCount, @@ -275,10 +278,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.cfg.logger.Info( "now serving peer/client/metrics", zap.String("local-member-id", e.Server.ID().String()), - zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()), - zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), - zap.Strings("advertise-client-urls", e.cfg.getACURLs()), - zap.Strings("listen-client-urls", e.cfg.getLCURLs()), + zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), + zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()), zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()), ) serving = true @@ -326,10 +329,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Uint("max-wals", sc.MaxWALFiles), zap.Uint("max-snapshots", sc.MaxSnapFiles), zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries), - zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()), - zap.Strings("listen-peer-urls", ec.getLPURLs()), - zap.Strings("advertise-client-urls", ec.getACURLs()), - zap.Strings("listen-client-urls", ec.getLCURLs()), + zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()), + zap.Strings("listen-peer-urls", ec.getListenPeerUrls()), + zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()), + zap.Strings("listen-client-urls", ec.getListenClientUrls()), zap.Strings("listen-metrics-urls", ec.getMetricsURLs()), zap.Strings("cors", cors), zap.Strings("host-whitelist", hss), @@ -366,8 +369,8 @@ func (e *Etcd) Close() { fields := []zap.Field{ zap.String("name", e.cfg.Name), zap.String("data-dir", e.cfg.Dir), - zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), - zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), } lg := e.GetLogger() lg.Info("closing etcd server", fields...) @@ -437,11 +440,16 @@ func (e *Etcd) Close() { func stopServers(ctx context.Context, ss *servers) { // first, close the http.Server - ss.http.Shutdown(ctx) - // do not grpc.Server.GracefulStop with TLS enabled etcd server + if ss.http != nil { + ss.http.Shutdown(ctx) + } + if ss.grpc == nil { + return + } + // do not grpc.Server.GracefulStop when grpc runs under http server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 - if ss.secure { + if ss.secure && ss.http != nil { ss.grpc.Stop() return } @@ -491,7 +499,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { ) } - peers = make([]*peerListener, len(cfg.LPUrls)) + peers = make([]*peerListener, len(cfg.ListenPeerUrls)) defer func() { if err == nil { return @@ -500,7 +508,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { if peers[i] != nil && peers[i].close != nil { cfg.logger.Warn( "closing peer listener", - zap.String("address", cfg.LPUrls[i].String()), + zap.String("address", cfg.ListenPeerUrls[i].String()), zap.Error(err), ) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -510,7 +518,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) { } }() - for i, u := range cfg.LPUrls { + for i, u := range cfg.ListenPeerUrls { if u.Scheme == "http" { if !cfg.PeerTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) @@ -611,8 +619,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } sctxs = make(map[string]*serveCtx) - for _, u := range cfg.LCUrls { - sctx := newServeCtx(cfg.logger) + for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) { if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) @@ -624,24 +631,41 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String()) } + } - network := "tcp" - addr := u.Host - if u.Scheme == "unix" || u.Scheme == "unixs" { - network = "unix" - addr = u.Host + u.Path + for _, u := range cfg.ListenClientUrls { + addr, secure, network := resolveUrl(u) + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx } + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || !secure + sctx.scheme = u.Scheme + sctx.addr = addr sctx.network = network + } + for _, u := range cfg.ListenClientHttpUrls { + addr, secure, network := resolveUrl(u) - sctx.secure = u.Scheme == "https" || u.Scheme == "unixs" - sctx.insecure = !sctx.secure - if oldctx := sctxs[addr]; oldctx != nil { - oldctx.secure = oldctx.secure || sctx.secure - oldctx.insecure = oldctx.insecure || sctx.insecure - continue + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx + } else if !sctx.httpOnly { + return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String()) } + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || !secure + sctx.scheme = u.Scheme + sctx.addr = addr + sctx.network = network + sctx.httpOnly = true + } - if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme, + for _, sctx := range sctxs { + if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme, transport.WithSocketOpts(&cfg.SocketOpts), transport.WithSkipTLSInfoCheck(true), ); err != nil { @@ -649,7 +673,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking // hosts that disable ipv6. So, use the address given by the user. - sctx.addr = addr if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { if fdLimit <= reservedInternalFDNum { @@ -662,17 +685,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } - defer func(u url.URL) { - if err == nil { + defer func(sctx *serveCtx) { + if err == nil || sctx.l == nil { return } sctx.l.Close() cfg.logger.Warn( "closing peer listener", - zap.String("address", u.Host), + zap.String("address", sctx.addr), zap.Error(err), ) - }(u) + }(sctx) for k := range cfg.UserHandlers { sctx.userHandlers[k] = cfg.UserHandlers[k] } @@ -683,11 +706,21 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro if cfg.LogLevel == "debug" { sctx.registerTrace() } - sctxs[addr] = sctx } return sctxs, nil } +func resolveUrl(u url.URL) (addr string, secure bool, network string) { + addr = u.Host + network = "tcp" + if u.Scheme == "unix" || u.Scheme == "unixs" { + addr = u.Host + u.Path + network = "unix" + } + secure = u.Scheme == "https" || u.Scheme == "unixs" + return addr, secure, network +} + func (e *Etcd) serveClients() (err error) { if !e.cfg.ClientTLSInfo.Empty() { e.cfg.logger.Info( @@ -733,15 +766,69 @@ func (e *Etcd) serveClients() (err error) { })) } + splitHttp := false + for _, sctx := range e.sctxs { + if sctx.httpOnly { + splitHttp = true + } + } + // start client servers in each goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...)) }(sctx) } return nil } +func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { + if !e.cfg.EnableGRPCGateway { + return nil + } + sctx := e.pickGrpcGatewayServeContext(splitHttp) + addr := sctx.addr + if network := sctx.network; network == "unix" { + // explicitly define unix network for gRPC socket support + addr = fmt.Sprintf("%s://%s", network, addr) + } + + opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))} + if sctx.secure { + tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig() + if tlsErr != nil { + return func(ctx context.Context) (*grpc.ClientConn, error) { + return nil, tlsErr + } + } + dtls := tlscfg.Clone() + // trust local server + dtls.InsecureSkipVerify = true + bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) + opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials())) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + return func(ctx context.Context) (*grpc.ClientConn, error) { + conn, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err)) + return nil, err + } + return conn, err + } +} + +func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx { + for _, sctx := range e.sctxs { + if !splitHttp || !sctx.httpOnly { + return sctx + } + } + panic("Expect at least one context able to serve grpc") +} + func (e *Etcd) serveMetrics() (err error) { if e.cfg.Metrics == "extensive" { grpc_prometheus.EnableHandlingTimeHistogram() diff --git a/server/embed/serve.go b/server/embed/serve.go index 4989e1ca521..30c975c3768 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -19,14 +19,12 @@ import ( "fmt" "io/ioutil" defaultLog "log" - "math" "net" "net/http" "strings" etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw" "go.etcd.io/etcd/client/pkg/v3/transport" - "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" "go.etcd.io/etcd/server/v3/config" @@ -50,12 +48,15 @@ import ( ) type serveCtx struct { - lg *zap.Logger - l net.Listener + lg *zap.Logger + l net.Listener + + scheme string addr string network string secure bool insecure bool + httpOnly bool ctx context.Context cancel context.CancelFunc @@ -93,6 +94,8 @@ func (sctx *serveCtx) serve( tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error), + grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), + splitHttp bool, gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() @@ -100,129 +103,158 @@ func (sctx *serveCtx) serve( sctx.lg.Info("ready to serve client requests") m := cmux.New(sctx.l) + var server func() error + onlyGRPC := splitHttp && !sctx.httpOnly + onlyHttp := splitHttp && sctx.httpOnly + grpcEnabled := !onlyHttp + httpEnabled := !onlyGRPC + v3c := v3client.New(s) servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) - if sctx.insecure { - gs := v3rpc.Server(s, nil, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + // Make sure serversC is closed even if we prematurely exit the function. + defer close(sctx.serversC) + var gwmux *gw.ServeMux + if s.Cfg.EnableGRPCGateway { + // GRPC gateway connects to grpc server via connection provided by grpc dial. + gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) + if err != nil { + sctx.lg.Error("registerGateway failed", zap.Error(err)) + return err } + } + var traffic string + switch { + case onlyGRPC: + traffic = "grpc" + case onlyHttp: + traffic = "http" + default: + traffic = "grpc+http" + } - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + if sctx.insecure { + var gs *grpc.Server + var srv *http.Server + if httpEnabled { + httpmux := sctx.createMux(gwmux, handler) + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + ErrorLog: logger, // do not log user error } - }(gs) - - grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, grpcLis net.Listener) { - errHandler(gs.Serve(grpcLis)) - }(gs, grpcl) - - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()}) - if err != nil { + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) return err } } - - httpmux := sctx.createMux(gwmux, handler) - - srvhttp := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - ErrorLog: logger, // do not log user error + if grpcEnabled { + gs = v3rpc.Server(s, nil, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + defer func(gs *grpc.Server) { + if err != nil { + sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + } + }(gs) } - if err := configureHttpServer(srvhttp, s.Cfg); err != nil { - sctx.lg.Error("Configure http server failed", zap.Error(err)) - return err + if onlyGRPC { + server = func() error { + return gs.Serve(sctx.l) + } + } else { + server = m.Serve + + httpl := m.Match(cmux.HTTP1()) + go func(srvhttp *http.Server, tlsLis net.Listener) { + errHandler(srvhttp.Serve(tlsLis)) + }(srv, httpl) + + if grpcEnabled { + grpcl := m.Match(cmux.HTTP2()) + go func(gs *grpc.Server, l net.Listener) { + errHandler(gs.Serve(l)) + }(gs, grpcl) + } } - httpl := m.Match(cmux.HTTP1()) - go func(srvhttp *http.Server, httpLis net.Listener) { - errHandler(srvhttp.Serve(httpLis)) - }(srvhttp, httpl) - - sctx.serversC <- &servers{grpc: gs, http: srvhttp} + sctx.serversC <- &servers{grpc: gs, http: srv} sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } if sctx.secure { + var gs *grpc.Server + var srv *http.Server + tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } - gs := v3rpc.Server(s, tlscfg, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + if grpcEnabled { + gs = v3rpc.Server(s, tlscfg, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + defer func(gs *grpc.Server) { + if err != nil { + sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + } + }(gs) } + if httpEnabled { + if grpcEnabled { + handler = grpcHandlerFunc(gs, handler) + } + httpmux := sctx.createMux(gwmux, handler) - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + TLSConfig: tlscfg, + ErrorLog: logger, // do not log user error } - }(gs) - - handler = grpcHandlerFunc(gs, handler) - - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - dtls := tlscfg.Clone() - // trust local server - dtls.InsecureSkipVerify = true - bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) - opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())} - gwmux, err = sctx.registerGateway(opts) - if err != nil { + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) return err } } - var tlsl net.Listener - tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) - if err != nil { - return err - } - // TODO: add debug flag; enable logging when debug flag is set - httpmux := sctx.createMux(gwmux, handler) + if onlyGRPC { + server = func() error { return gs.Serve(sctx.l) } + } else { + server = m.Serve - srv := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure https server failed", zap.Error(err)) - return err + tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if err != nil { + return err + } + go func(srvhttp *http.Server, tlsl net.Listener) { + errHandler(srvhttp.Serve(tlsl)) + }(srv, tlsl) } - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, tlsl) - sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} sctx.lg.Info( "serving client traffic securely", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } - close(sctx.serversC) - return m.Serve() + return server() } func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { @@ -253,20 +285,10 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error -func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) { +func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.ClientConn, error)) (*gw.ServeMux, error) { ctx := sctx.ctx - addr := sctx.addr - if network := sctx.network; network == "unix" { - // explicitly define unix network for gRPC socket support - addr = fmt.Sprintf("%s://%s", network, addr) - } - - opts = append(opts, grpc.WithDefaultCallOptions([]grpc.CallOption{ - grpc.MaxCallRecvMsgSize(math.MaxInt32), - }...)) - - conn, err := grpc.DialContext(ctx, addr, opts...) + conn, err := dial(ctx) if err != nil { return nil, err } diff --git a/server/embed/serve_test.go b/server/embed/serve_test.go index aada585f07e..6a1991119ee 100644 --- a/server/embed/serve_test.go +++ b/server/embed/serve_test.go @@ -38,8 +38,8 @@ func TestStartEtcdWrongToken(t *testing.T) { urls := newEmbedURLs(2) curls := []url.URL{urls[0]} purls := []url.URL{urls[1]} - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range purls { cfg.InitialCluster += ",default=" + purls[i].String() diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index fddb81bcde5..bd7ca792948 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -146,7 +146,11 @@ func newConfig() *config { ) fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls", - "List of URLs to listen on for client traffic.", + "List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.", + ) + fs.Var( + flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls", + "List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.", ) fs.Var( flags.NewUniqueURLsWithExceptions("", ""), @@ -392,10 +396,11 @@ func (cfg *config) configFromCmdLine() error { lg.Info(fmt.Sprintf("raft-write-timeout increased to minimum value: %v", rafthttp.DefaultConnWriteTimeout)) } - cfg.ec.LPUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") - cfg.ec.APUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") - cfg.ec.LCUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") - cfg.ec.ACUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") + cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") + cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") + cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") + cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls") + cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls") cfg.ec.CORS = flags.UniqueURLsMapFromFlag(cfg.cf.flagSet, "cors") @@ -416,7 +421,7 @@ func (cfg *config) configFromCmdLine() error { // disable default advertise-client-urls if lcurls is set missingAC := flags.IsSet(cfg.cf.flagSet, "listen-client-urls") && !flags.IsSet(cfg.cf.flagSet, "advertise-client-urls") if !cfg.mayBeProxy() && missingAC { - cfg.ec.ACUrls = nil + cfg.ec.AdvertiseClientUrls = nil } // disable default initial-cluster if discovery is set diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index 0dd4db97ec2..faaf250a48f 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -36,6 +36,7 @@ func TestConfigParsingMemberFlags(t *testing.T) { "-snapshot-count=10", "-listen-peer-urls=http://localhost:8000,https://localhost:8001", "-listen-client-urls=http://localhost:7000,https://localhost:7001", + "-listen-client-http-urls=http://localhost:7002,https://localhost:7003", // it should be set if -listen-client-urls is set "-advertise-client-urls=http://localhost:7000,https://localhost:7001", } @@ -51,14 +52,15 @@ func TestConfigParsingMemberFlags(t *testing.T) { func TestConfigFileMemberFields(t *testing.T) { yc := struct { - Dir string `json:"data-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapshotCount uint64 `json:"snapshot-count"` - LPUrls string `json:"listen-peer-urls"` - LCUrls string `json:"listen-client-urls"` - AcurlsCfgFile string `json:"advertise-client-urls"` + Dir string `json:"data-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapshotCount uint64 `json:"snapshot-count"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` }{ "testdir", 10, @@ -67,6 +69,7 @@ func TestConfigFileMemberFields(t *testing.T) { 10, "http://localhost:8000,https://localhost:8001", "http://localhost:7000,https://localhost:7001", + "http://localhost:7002,https://localhost:7003", "http://localhost:7000,https://localhost:7001", } @@ -513,13 +516,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { func validateMemberFlags(t *testing.T, cfg *config) { wcfg := &embed.Config{ - Dir: "testdir", - LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, - MaxSnapFiles: 10, - MaxWalFiles: 10, - Name: "testname", - SnapshotCount: 10, + Dir: "testdir", + ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}}, + MaxSnapFiles: 10, + MaxWalFiles: 10, + Name: "testname", + SnapshotCount: 10, } if cfg.ec.Dir != wcfg.Dir { @@ -537,18 +541,21 @@ func validateMemberFlags(t *testing.T, cfg *config) { if cfg.ec.SnapshotCount != wcfg.SnapshotCount { t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount) } - if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) { - t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls) + if !reflect.DeepEqual(cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) { + t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.ListenPeerUrls, wcfg.ListenPeerUrls) } - if !reflect.DeepEqual(cfg.ec.LCUrls, wcfg.LCUrls) { - t.Errorf("listen-client-urls = %v, want %v", cfg.ec.LCUrls, wcfg.LCUrls) + if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) { + t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) + } + if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) { + t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) } } func validateClusteringFlags(t *testing.T, cfg *config) { wcfg := newConfig() - wcfg.ec.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} - wcfg.ec.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} + wcfg.ec.AdvertisePeerUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} + wcfg.ec.AdvertiseClientUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} wcfg.ec.ClusterState = embed.ClusterStateFlagExisting wcfg.cf.fallback.Set(fallbackFlagExit) wcfg.ec.InitialCluster = "0=http://localhost:8000" @@ -566,11 +573,11 @@ func validateClusteringFlags(t *testing.T, cfg *config) { if cfg.ec.InitialClusterToken != wcfg.ec.InitialClusterToken { t.Errorf("initialClusterToken = %v, want %v", cfg.ec.InitialClusterToken, wcfg.ec.InitialClusterToken) } - if !reflect.DeepEqual(cfg.ec.APUrls, wcfg.ec.APUrls) { - t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.APUrls, wcfg.ec.APUrls) + if !reflect.DeepEqual(cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) { + t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.ec.AdvertisePeerUrls, wcfg.ec.AdvertisePeerUrls) } - if !reflect.DeepEqual(cfg.ec.ACUrls, wcfg.ec.ACUrls) { - t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.ACUrls, wcfg.ec.ACUrls) + if !reflect.DeepEqual(cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) { + t.Errorf("advertise-client-urls = %v, want %v", cfg.ec.AdvertiseClientUrls, wcfg.ec.AdvertiseClientUrls) } } diff --git a/server/etcdmain/etcd.go b/server/etcdmain/etcd.go index 2ff8d9173a5..6a79f0a787c 100644 --- a/server/etcdmain/etcd.go +++ b/server/etcdmain/etcd.go @@ -193,7 +193,7 @@ func startEtcdOrProxyV2(args []string) { if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) { lg.Warn("forgot to set --initial-cluster?") } - if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs { + if types.URLs(cfg.ec.AdvertisePeerUrls).String() == embed.DefaultInitialAdvertisePeerURLs { lg.Warn("forgot to set --initial-advertise-peer-urls?") } if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 { @@ -389,11 +389,11 @@ func startProxy(cfg *config) error { // setup self signed certs when serving https cHosts, cTLS := []string{}, false - for _, u := range cfg.ec.LCUrls { + for _, u := range cfg.ec.ListenClientUrls { cHosts = append(cHosts, u.Host) cTLS = cTLS || u.Scheme == "https" } - for _, u := range cfg.ec.ACUrls { + for _, u := range cfg.ec.AdvertiseClientUrls { cHosts = append(cHosts, u.Host) cTLS = cTLS || u.Scheme == "https" } @@ -406,7 +406,7 @@ func startProxy(cfg *config) error { } // Start a proxy server goroutine for each listen address - for _, u := range cfg.ec.LCUrls { + for _, u := range cfg.ec.ListenClientUrls { l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS) if err != nil { return err diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index dd53938f9b3..3b4febd8aa8 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -63,7 +63,9 @@ Member: --listen-peer-urls 'http://localhost:2380' List of URLs to listen on for peer traffic. --listen-client-urls 'http://localhost:2379' - List of URLs to listen on for client traffic. + List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified. + --listen-client-http-urls '' + List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls. --max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `' Maximum number of snapshot files to retain (0 is unlimited). --max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `' diff --git a/tests/e2e/cluster_proxy_test.go b/tests/e2e/cluster_proxy_test.go index fd7924835b3..beca84cfdea 100644 --- a/tests/e2e/cluster_proxy_test.go +++ b/tests/e2e/cluster_proxy_test.go @@ -55,8 +55,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } -func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } -func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } +func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() } +func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() } +func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() } +func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() } func (p *proxyEtcdProcess) EndpointsMetrics() []string { panic("not implemented; proxy doesn't provide health information") } diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 6b293673f72..e63e9a5d88d 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -149,6 +149,7 @@ type etcdProcessClusterConfig struct { clientTLS clientConnType clientCertAuthEnabled bool + clientHttpSeparate bool isPeerTLS bool isPeerAutoTLS bool isClientAutoTLS bool @@ -245,18 +246,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { var curls []string - var curl, curltls string + var curl string port := cfg.basePort + 5*i - curlHost := fmt.Sprintf("localhost:%d", port) + clientPort := port + clientHttpPort := port + 4 - switch cfg.clientTLS { - case clientNonTLS, clientTLS: - curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() + if cfg.clientTLS == clientTLSAndNonTLS { + curl = clientURL(clientPort, clientNonTLS) + curls = []string{curl, clientURL(clientPort, clientTLS)} + } else { + curl = clientURL(clientPort, cfg.clientTLS) curls = []string{curl} - case clientTLSAndNonTLS: - curl = (&url.URL{Scheme: "http", Host: curlHost}).String() - curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() - curls = []string{curl, curltls} } purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} @@ -277,6 +277,11 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), } + var clientHttpUrl string + if cfg.clientHttpSeparate { + clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS) + args = append(args, "--listen-client-http-urls", clientHttpUrl) + } args = addV2Args(args) if cfg.forceNewCluster { args = append(args, "--force-new-cluster") @@ -336,18 +341,19 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* } etcdCfgs[i] = &etcdServerProcessConfig{ - lg: lg, - execPath: cfg.execPath, - args: args, - envVars: cfg.envVars, - tlsArgs: cfg.tlsArgs(), - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - purl: purl, - acurl: curl, - murl: murl, - initialToken: cfg.initialToken, + lg: lg, + execPath: cfg.execPath, + args: args, + envVars: cfg.envVars, + tlsArgs: cfg.tlsArgs(), + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + murl: murl, + initialToken: cfg.initialToken, + clientHttpUrl: clientHttpUrl, } } @@ -360,6 +366,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* return etcdCfgs } +func clientURL(port int, connType clientConnType) string { + curlHost := fmt.Sprintf("localhost:%d", port) + switch connType { + case clientNonTLS: + return (&url.URL{Scheme: "http", Host: curlHost}).String() + case clientTLS: + return (&url.URL{Scheme: "https", Host: curlHost}).String() + default: + panic(fmt.Sprintf("Unsupported connection type %v", connType)) + } +} + func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { if cfg.clientTLS != clientNonTLS { if cfg.isClientAutoTLS { diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index bf3f5ac43e9..22f95297d2a 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -37,8 +37,9 @@ import ( func TestConnectionMultiplexing(t *testing.T) { BeforeTest(t) for _, tc := range []struct { - name string - serverTLS clientConnType + name string + serverTLS clientConnType + separateHttpPort bool }{ { name: "ServerTLS", @@ -52,10 +53,20 @@ func TestConnectionMultiplexing(t *testing.T) { name: "ServerTLSAndNonTLS", serverTLS: clientTLSAndNonTLS, }, + { + name: "SeparateHTTP/ServerTLS", + serverTLS: clientTLS, + separateHttpPort: true, + }, + { + name: "SeparateHTTP/ServerNonTLS", + serverTLS: clientNonTLS, + separateHttpPort: true, + }, } { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true} + cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort} clus, err := newEtcdProcessCluster(t, &cfg) require.NoError(t, err) defer clus.Close() @@ -76,43 +87,45 @@ func TestConnectionMultiplexing(t *testing.T) { name = "ClientTLS" } t.Run(name, func(t *testing.T) { - testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType) + testConnectionMultiplexing(ctx, t, clus.procs[0], connType) }) } }) } - } -func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) { +func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) { + httpEndpoint := member.EndpointsHTTP()[0] + grpcEndpoint := member.EndpointsGRPC()[0] switch connType { case clientTLS: - endpoint = toTLS(endpoint) + httpEndpoint = toTLS(httpEndpoint) + grpcEndpoint = toTLS(grpcEndpoint) case clientNonTLS: default: panic(fmt.Sprintf("Unsupported conn type %v", connType)) } t.Run("etcdctl", func(t *testing.T) { t.Run("v2", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true) + etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true) err := etcdctl.Set("a", "1") assert.NoError(t, err) }) t.Run("v3", func(t *testing.T) { - etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false) + etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false) err := etcdctl.Put("a", "1") assert.NoError(t, err) }) }) t.Run("clientv2", func(t *testing.T) { - c, err := newClientV2(t, []string{endpoint}, connType, false) + c, err := newClientV2(t, []string{httpEndpoint}, connType, false) require.NoError(t, err) kv := clientv2.NewKeysAPI(c) _, err = kv.Set(ctx, "a", "1", nil) assert.NoError(t, err) }) t.Run("clientv3", func(t *testing.T) { - c := newClient(t, []string{endpoint}, connType, false) + c := newClient(t, []string{grpcEndpoint}, connType, false) _, err := c.Get(ctx, "a") assert.NoError(t, err) }) @@ -123,11 +136,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri tname = "default" } t.Run(tname, func(t *testing.T) { - assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType)) - assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType)) - assert.NoError(t, fetchVersion(endpoint, httpVersion, connType)) - assert.NoError(t, fetchHealth(endpoint, httpVersion, connType)) - assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType)) + assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType)) + assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType)) }) } }) diff --git a/tests/e2e/etcd_process.go b/tests/e2e/etcd_process.go index 027b7d6aa14..2c5a408e1af 100644 --- a/tests/e2e/etcd_process.go +++ b/tests/e2e/etcd_process.go @@ -35,6 +35,8 @@ var ( type etcdProcess interface { EndpointsV2() []string EndpointsV3() []string + EndpointsGRPC() []string + EndpointsHTTP() []string EndpointsMetrics() []string Start() error @@ -72,8 +74,9 @@ type etcdServerProcessConfig struct { purl url.URL - acurl string - murl string + acurl string + murl string + clientHttpUrl string initialToken string initialCluster string @@ -91,8 +94,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil } -func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} } -func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } +func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() } +func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() } +func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} } +func (ep *etcdServerProcess) EndpointsHTTP() []string { + if ep.cfg.clientHttpUrl == "" { + return []string{ep.cfg.acurl} + } + return []string{ep.cfg.clientHttpUrl} +} func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} } func (ep *etcdServerProcess) Start() error { diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go index 05e8c4f4a4d..854e61234c8 100644 --- a/tests/e2e/etcdctl.go +++ b/tests/e2e/etcdctl.go @@ -121,6 +121,7 @@ func (ctl *Etcdctl) cmdArgs(args ...string) []string { func (ctl *Etcdctl) flags() map[string]string { fmap := make(map[string]string) if ctl.v2 { + fmap["no-sync"] = "true" if ctl.connType == clientTLS { fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile fmap["cert-file"] = integration.TestTLSInfo.CertFile diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index 4023fc63e9a..d05b3ad4641 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -101,15 +101,17 @@ func tlsInfo(t testing.TB, connType clientConnType, isAutoTLS bool) (*transport. } } -func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error { +func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error { g := errgroup.Group{} concurrency := 10 + keyCount := 100 keysPerRoutine := keyCount / concurrency + valueSize := dbSize / keyCount for i := 0; i < concurrency; i++ { i := i g.Go(func() error { for j := 0; j < keysPerRoutine; j++ { - _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize)) + _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))) if err != nil { return err } diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go index c5bf0375ec1..b1ee9c8a0be 100644 --- a/tests/e2e/watch_delay_test.go +++ b/tests/e2e/watch_delay_test.go @@ -33,29 +33,48 @@ import ( const ( watchResponsePeriod = 100 * time.Millisecond watchTestDuration = 5 * time.Second - // TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed. - maxWatchDelay = 2 * time.Second - // Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402. - // Tweaked to pass on GitHub runner. For local runs please increase parameters. - // TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed. - numberOfPreexistingKeys = 100 - sizeOfPreexistingValues = 5000 - readLoadConcurrency = 10 + readLoadConcurrency = 10 ) type testCase struct { - name string - config etcdProcessClusterConfig + name string + config etcdProcessClusterConfig + maxWatchDelay time.Duration + dbSizeBytes int } +const ( + Kilo = 1000 + Mega = 1000 * Kilo +) + +// 10 MB is not a bottleneck of grpc server, but filling up etcd with data. +// Keeping it lower so tests don't take too long. +// If we implement reuse of db we could increase the dbSize. var tcs = []testCase{ { - name: "NoTLS", - config: etcdProcessClusterConfig{clusterSize: 1}, + name: "NoTLS", + config: etcdProcessClusterConfig{clusterSize: 1}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, + }, + { + name: "TLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + maxWatchDelay: 2 * time.Second, + dbSizeBytes: 500 * Kilo, + }, + { + name: "SeparateHttpNoTLS", + config: etcdProcessClusterConfig{clusterSize: 1, clientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, { - name: "ClientTLS", - config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS}, + name: "SeparateHttpTLS", + config: etcdProcessClusterConfig{clusterSize: 1, isClientAutoTLS: true, clientTLS: clientTLS, clientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, } @@ -69,13 +88,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() g := errgroup.Group{} continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify())) + validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -89,7 +108,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -107,7 +126,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { time.Sleep(watchResponsePeriod) } }) - validateWatchDelay(t, c.Watch(ctx, "fake-key")) + validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -121,7 +140,7 @@ func TestWatchDelayForEvent(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.clientTLS, tc.config.isClientAutoTLS) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -140,13 +159,13 @@ func TestWatchDelayForEvent(t *testing.T) { } }) continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "key")) + validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } } -func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) { +func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) { start := time.Now() var maxDelay time.Duration for range watch { @@ -177,15 +196,19 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr for i := 0; i < readLoadConcurrency; i++ { g.Go(func() error { for { - _, err := c.Get(ctx, "", clientv3.WithPrefix()) + resp, err := c.Get(ctx, "", clientv3.WithPrefix()) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { return nil } return err } + respSize := 0 + for _, kv := range resp.Kvs { + respSize += kv.Size() + } mux.Lock() - size += numberOfPreexistingKeys * sizeOfPreexistingValues + size += respSize mux.Unlock() } }) diff --git a/tests/integration/clientv3/snapshot/v3_snapshot_test.go b/tests/integration/clientv3/snapshot/v3_snapshot_test.go index 82b03214f5a..770a73430aa 100644 --- a/tests/integration/clientv3/snapshot/v3_snapshot_test.go +++ b/tests/integration/clientv3/snapshot/v3_snapshot_test.go @@ -66,8 +66,8 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { cfg := integration.NewEmbedConfig(t, "default") cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) srv, err := embed.StartEtcd(cfg) if err != nil { @@ -82,7 +82,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatalf("failed to start embed.Etcd for creating snapshots") } - ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + ccfg := clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}} cli, err := integration.NewClient(t, ccfg) if err != nil { t.Fatal(err) diff --git a/tests/integration/embed/embed_test.go b/tests/integration/embed/embed_test.go index c04bf97c961..27da5bf473b 100644 --- a/tests/integration/embed/embed_test.go +++ b/tests/integration/embed/embed_test.go @@ -78,7 +78,7 @@ func TestEmbedEtcd(t *testing.T) { tests[0].cfg.Durl = "abc" setupEmbedCfg(&tests[1].cfg, []url.URL{urls[0]}, []url.URL{urls[1]}) - tests[1].cfg.ACUrls = nil + tests[1].cfg.AdvertiseClientUrls = nil tests[2].cfg.TickMs = tests[2].cfg.ElectionMs - 1 tests[3].cfg.ElectionMs = 999999 setupEmbedCfg(&tests[4].cfg, []url.URL{urls[2]}, []url.URL{urls[3]}) @@ -86,8 +86,8 @@ func TestEmbedEtcd(t *testing.T) { setupEmbedCfg(&tests[6].cfg, []url.URL{urls[7], urls[8]}, []url.URL{urls[9]}) dnsURL, _ := url.Parse("http://whatever.test:12345") - tests[7].cfg.LCUrls = []url.URL{*dnsURL} - tests[8].cfg.LPUrls = []url.URL{*dnsURL} + tests[7].cfg.ListenClientUrls = []url.URL{*dnsURL} + tests[8].cfg.ListenPeerUrls = []url.URL{*dnsURL} dir := filepath.Join(t.TempDir(), fmt.Sprintf("embed-etcd")) @@ -202,8 +202,8 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) { cfg.LogOutputs = []string{"/dev/null"} cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range purls { cfg.InitialCluster += ",default=" + purls[i].String() diff --git a/tests/integration/snapshot/member_test.go b/tests/integration/snapshot/member_test.go index 076d928bbc3..61ce235f5c5 100644 --- a/tests/integration/snapshot/member_test.go +++ b/tests/integration/snapshot/member_test.go @@ -66,8 +66,8 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { cfg := integration.NewEmbedConfig(t, "3") cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = newCURLs, newCURLs - cfg.LPUrls, cfg.APUrls = newPURLs, newPURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = newCURLs, newCURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = newPURLs, newPURLs cfg.InitialCluster = "" for i := 0; i < clusterN; i++ { cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index 36886c40bf9..4de0f251d6b 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -48,8 +48,8 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { cfg := integration.NewEmbedConfig(t, "s1") cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) sp := snapshot.NewV3(zaptest.NewLogger(t)) @@ -82,7 +82,7 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { } var cli *clientv3.Client - cli, err = integration.NewClient(t, clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) + cli, err = integration.NewClient(t, clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}}) if err != nil { t.Fatal(err) } @@ -177,8 +177,8 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { cfg := integration.NewEmbedConfig(t, "default") cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = cURLs, cURLs - cfg.LPUrls, cfg.APUrls = pURLs, pURLs + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) srv, err := embed.StartEtcd(cfg) if err != nil { @@ -193,7 +193,7 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatalf("failed to start embed.Etcd for creating snapshots") } - ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} + ccfg := clientv3.Config{Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}} cli, err := integration.NewClient(t, ccfg) if err != nil { t.Fatal(err) @@ -237,8 +237,8 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) ( cfg := integration.NewEmbedConfig(t, fmt.Sprintf("m%d", i)) cfg.InitialClusterToken = testClusterTkn cfg.ClusterState = "existing" - cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} - cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} cfg.InitialCluster = ics sp := snapshot.NewV3( diff --git a/tools/etcd-dump-metrics/etcd.go b/tools/etcd-dump-metrics/etcd.go index 7997e283237..5b4db2c25ab 100644 --- a/tools/etcd-dump-metrics/etcd.go +++ b/tools/etcd-dump-metrics/etcd.go @@ -51,8 +51,8 @@ func setupEmbedCfg(cfg *embed.Config, curls, purls, ics []url.URL) { os.RemoveAll(cfg.Dir) cfg.ClusterState = "new" - cfg.LCUrls, cfg.ACUrls = curls, curls - cfg.LPUrls, cfg.APUrls = purls, purls + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = curls, curls + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = purls, purls cfg.InitialCluster = "" for i := range ics {