From 47d33ac50b7b978c50134ced9b49feafb49bd6ac Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 10 Jul 2016 11:06:08 -0700 Subject: [PATCH] etcdmain, embed: export Config and StartEtcd into embed/ Lets programs embed etcd. Fixes #5430 --- embed/config.go | 304 +++++++++++++++++++ embed/config_test.go | 69 +++++ etcdmain/const_unix.go => embed/doc.go | 32 +- embed/etcd.go | 300 +++++++++++++++++++ {etcdmain => embed}/serve.go | 44 +-- etcdmain/config.go | 396 ++++++++----------------- etcdmain/config_test.go | 158 ++++------ etcdmain/const_windows.go | 26 -- etcdmain/etcd.go | 306 +++---------------- etcdmain/help.go | 10 +- 10 files changed, 942 insertions(+), 703 deletions(-) create mode 100644 embed/config.go create mode 100644 embed/config_test.go rename etcdmain/const_unix.go => embed/doc.go (56%) create mode 100644 embed/etcd.go rename {etcdmain => embed}/serve.go (75%) delete mode 100644 etcdmain/const_windows.go diff --git a/embed/config.go b/embed/config.go new file mode 100644 index 000000000000..e938827f3a27 --- /dev/null +++ b/embed/config.go @@ -0,0 +1,304 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "fmt" + "io/ioutil" + "net/url" + "strings" + + "github.com/coreos/etcd/discovery" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/pkg/cors" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/pkg/types" + "github.com/ghodss/yaml" +) + +const ( + ClusterStateFlagNew = "new" + ClusterStateFlagExisting = "existing" + + DefaultName = "default" + DefaultInitialAdvertisePeerURLs = "http://localhost:2380" + DefaultAdvertiseClientURLs = "http://localhost:2379" + DefaultListenPeerURLs = "http://localhost:2380" + DefaultListenClientURLs = "http://localhost:2379" + DefaultMaxSnapshots = 5 + DefaultMaxWALs = 5 + + // maxElectionMs specifies the maximum value of election timeout. + // More details are listed in ../Documentation/tuning.md#time-parameters. + maxElectionMs = 50000 +) + +var ( + ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " + + "Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"") + ErrUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly") +) + +// Config holds the arguments for configuring an etcd server. +type Config struct { + // member + + CorsInfo *cors.CORSInfo + LPUrls, LCUrls []url.URL + Dir string `json:"data-dir"` + WalDir string `json:"wal-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapCount uint64 `json:"snapshot-count"` + AutoCompactionRetention int `json:"auto-compaction-retention"` + + // TickMs is the number of milliseconds between heartbeat ticks. + // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). + // make ticks a cluster wide configuration. + TickMs uint `json:"heartbeat-interval"` + ElectionMs uint `json:"election-timeout"` + QuotaBackendBytes int64 `json:"quota-backend-bytes"` + + // clustering + + APUrls, ACUrls []url.URL + ClusterState string `json:"initial-cluster-state"` + DNSCluster string `json:"discovery-srv"` + Dproxy string `json:"discovery-proxy"` + Durl string `json:"discovery"` + InitialCluster string `json:"initial-cluster"` + InitialClusterToken string `json:"initial-cluster-token"` + StrictReconfigCheck bool `json:"strict-reconfig-check"` + + // security + + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool + + // debug + + Debug bool `json:"debug"` + LogPkgLevels string `json:"log-package-levels"` + EnablePprof bool + + // ForceNewCluster starts a new cluster even if previously started; unsafe. + ForceNewCluster bool `json:"force-new-cluster"` +} + +// config holds the config suitable for yaml parsing +type config struct { + Config + configJSON +} + +// configJSON has file options that are translated into Config options +type configJSON struct { + LPUrlsJSON string `json:"listen-peer-urls"` + LCUrlsJSON string `json:"listen-client-urls"` + CorsJSON string `json:"cors"` + APUrlsJSON string `json:"initial-advertise-peer-urls"` + ACUrlsJSON string `json:"advertise-client-urls"` + ClientSecurityJSON securityConfig `json:"client-transport-security"` + PeerSecurityJSON securityConfig `json:"peer-transport-security"` +} + +type securityConfig struct { + CAFile string `json:"ca-file"` + CertFile string `json:"cert-file"` + KeyFile string `json:"key-file"` + CertAuth bool `json:"client-cert-auth"` + TrustedCAFile string `json:"trusted-ca-file"` + AutoTLS bool `json:"auto-tls"` +} + +// NewConfig creates a new Config populated with default values. +func NewConfig() *Config { + apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs) + acurl, _ := url.Parse(DefaultAdvertiseClientURLs) + cfg := &Config{ + CorsInfo: &cors.CORSInfo{}, + MaxSnapFiles: DefaultMaxSnapshots, + MaxWalFiles: DefaultMaxWALs, + Name: DefaultName, + SnapCount: etcdserver.DefaultSnapCount, + TickMs: 100, + ElectionMs: 1000, + APUrls: []url.URL{*apurl}, + ACUrls: []url.URL{*acurl}, + ClusterState: ClusterStateFlagNew, + InitialClusterToken: "etcd-cluster", + } + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + return cfg +} + +func ConfigFromFile(path string) (*Config, error) { + cfg := &config{} + if err := cfg.configFromFile(path); err != nil { + return nil, err + } + return &cfg.Config, nil +} + +func (cfg *config) configFromFile(path string) error { + b, err := ioutil.ReadFile(path) + if err != nil { + return err + } + + err = yaml.Unmarshal(b, cfg) + if err != nil { + return err + } + + if cfg.LPUrlsJSON != "" { + u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ",")) + if err != nil { + plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err) + } + cfg.LPUrls = []url.URL(u) + } + + if cfg.LCUrlsJSON != "" { + u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ",")) + if err != nil { + plog.Fatalf("unexpected error setting up listen-client-urls: %v", err) + } + cfg.LCUrls = []url.URL(u) + } + + if cfg.CorsJSON != "" { + if err := cfg.CorsInfo.Set(cfg.CorsJSON); err != nil { + plog.Panicf("unexpected error setting up cors: %v", err) + } + } + + if cfg.APUrlsJSON != "" { + u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ",")) + if err != nil { + plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err) + } + cfg.APUrls = []url.URL(u) + } + + if cfg.ACUrlsJSON != "" { + u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ",")) + if err != nil { + plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err) + } + cfg.ACUrls = []url.URL(u) + } + + if cfg.ClusterState == "" { + cfg.ClusterState = ClusterStateFlagNew + } + + copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) { + tls.CAFile = ysc.CAFile + tls.CertFile = ysc.CertFile + tls.KeyFile = ysc.KeyFile + tls.ClientCertAuth = ysc.CertAuth + tls.TrustedCAFile = ysc.TrustedCAFile + } + copySecurityDetails(&cfg.ClientTLSInfo, &cfg.ClientSecurityJSON) + copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON) + cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS + cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS + + return cfg.Validate() +} + +func (cfg *Config) Validate() error { + // Check if conflicting flags are passed. + nSet := 0 + for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != ""} { + if v { + nSet++ + } + } + + if cfg.ClusterState != ClusterStateFlagNew && cfg.ClusterState != ClusterStateFlagExisting { + return fmt.Errorf("unexpected clusterState %q", cfg.ClusterState) + } + + if nSet > 1 { + return ErrConflictBootstrapFlags + } + + if 5*cfg.TickMs > cfg.ElectionMs { + return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs) + } + if cfg.ElectionMs > maxElectionMs { + return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs) + } + + // check this last since proxying in etcdmain may make this OK + if cfg.LCUrls != nil && cfg.ACUrls == nil { + return ErrUnsetAdvertiseClientURLsFlag + } + + return nil +} + +// PeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery. +func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, token string, err error) { + switch { + case cfg.Durl != "": + urlsmap = types.URLsMap{} + // If using discovery, generate a temporary cluster based on + // self's advertised peer URLs + urlsmap[cfg.Name] = cfg.APUrls + token = cfg.Durl + case cfg.DNSCluster != "": + var clusterStr string + clusterStr, token, err = discovery.SRVGetCluster(cfg.Name, cfg.DNSCluster, cfg.InitialClusterToken, cfg.APUrls) + if err != nil { + return nil, "", err + } + urlsmap, err = types.NewURLsMap(clusterStr) + // only etcd member must belong to the discovered cluster. + // proxy does not need to belong to the discovered cluster. + if which == "etcd" { + if _, ok := urlsmap[cfg.Name]; !ok { + return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name) + } + } + default: + // We're statically configured, and cluster has appropriately been set. + urlsmap, err = types.NewURLsMap(cfg.InitialCluster) + token = cfg.InitialClusterToken + } + return urlsmap, token, err +} + +func (cfg Config) InitialClusterFromName(name string) (ret string) { + if len(cfg.APUrls) == 0 { + return "" + } + n := name + if name == "" { + n = DefaultName + } + for i := range cfg.APUrls { + ret = ret + "," + n + "=" + cfg.APUrls[i].String() + } + return ret[1:] +} + +func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew } +func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) } diff --git a/embed/config_test.go b/embed/config_test.go new file mode 100644 index 000000000000..b39d7503b1f3 --- /dev/null +++ b/embed/config_test.go @@ -0,0 +1,69 @@ +package embed + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/coreos/etcd/pkg/transport" + "github.com/ghodss/yaml" +) + +func TestConfigFileOtherFields(t *testing.T) { + ctls := securityConfig{CAFile: "cca", CertFile: "ccert", KeyFile: "ckey"} + ptls := securityConfig{CAFile: "pca", CertFile: "pcert", KeyFile: "pkey"} + yc := struct { + ClientSecurityCfgFile securityConfig `json:"client-transport-security"` + PeerSecurityCfgFile securityConfig `json:"peer-transport-security"` + ForceNewCluster bool `json:"force-new-cluster"` + }{ + ctls, + ptls, + true, + } + + b, err := yaml.Marshal(&yc) + if err != nil { + t.Fatal(err) + } + + tmpfile := mustCreateCfgFile(t, b) + defer os.Remove(tmpfile.Name()) + + cfg, err := ConfigFromFile(tmpfile.Name()) + if err != nil { + t.Fatal(err) + } + + if !cfg.ForceNewCluster { + t.Errorf("ForceNewCluster = %v, want %v", cfg.ForceNewCluster, true) + } + + if !ctls.equals(&cfg.ClientTLSInfo) { + t.Errorf("ClientTLS = %v, want %v", cfg.ClientTLSInfo, ctls) + } + if !ptls.equals(&cfg.PeerTLSInfo) { + t.Errorf("PeerTLS = %v, want %v", cfg.PeerTLSInfo, ptls) + } +} + +func (s *securityConfig) equals(t *transport.TLSInfo) bool { + return s.CAFile == t.CAFile && + s.CertFile == t.CertFile && + s.CertAuth == t.ClientCertAuth && + s.TrustedCAFile == t.TrustedCAFile +} + +func mustCreateCfgFile(t *testing.T, b []byte) *os.File { + tmpfile, err := ioutil.TempFile("", "servercfg") + if err != nil { + t.Fatal(err) + } + if _, err = tmpfile.Write(b); err != nil { + t.Fatal(err) + } + if err = tmpfile.Close(); err != nil { + t.Fatal(err) + } + return tmpfile +} diff --git a/etcdmain/const_unix.go b/embed/doc.go similarity index 56% rename from etcdmain/const_unix.go rename to embed/doc.go index 419aead6ce33..362fee9bcfe6 100644 --- a/etcdmain/const_unix.go +++ b/embed/doc.go @@ -1,4 +1,4 @@ -// Copyright 2015 The etcd Authors +// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,16 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows,!plan9 +/* +Package embed provides bindings for embedding an etcd server in a program. -package etcdmain +Launch an embedded etcd server using the configuration defaults: -import ( - // import procfs FIX godeps. - _ "github.com/prometheus/procfs" -) + import ( + "log" -const ( - defaultMaxSnapshots = 5 - defaultMaxWALs = 5 -) + "github.com/coreos/etcd/embed" + ) + + func main() { + cfg := embed.NewConfig() + cfg.Dir = "default.etcd" + e, err := embed.StartEtcd(cfg) + if err != nil { + log.Fatal(err) + } + defer e.Close() + log.Fatal(<-e.Err()) + } +*/ +package embed diff --git a/embed/etcd.go b/embed/etcd.go new file mode 100644 index 000000000000..bc57cd3fff23 --- /dev/null +++ b/embed/etcd.go @@ -0,0 +1,300 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "path" + + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v2http" + "github.com/coreos/etcd/pkg/cors" + runtimeutil "github.com/coreos/etcd/pkg/runtime" + "github.com/coreos/etcd/pkg/transport" + "github.com/coreos/etcd/rafthttp" + "github.com/coreos/pkg/capnslog" +) + +var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed") + +const ( + // internal fd usage includes disk usage and transport usage. + // To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs + // at most 2 to read/lock/write WALs. One case that it needs to 2 is to + // read all logs after some snapshot index, which locates at the end of + // the second last and the head of the last. For purging, it needs to read + // directory, so it needs 1. For fd monitor, it needs 1. + // For transport, rafthttp builds two long-polling connections and at most + // four temporary connections with each member. There are at most 9 members + // in a cluster, so it should reserve 96. + // For the safety, we set the total reserved number to 150. + reservedInternalFDNum = 150 +) + +// Etcd contains a running etcd server and its listeners. +type Etcd struct { + Peers []net.Listener + Clients []net.Listener + Server *etcdserver.EtcdServer + + cfg Config + errc chan error + sctxs map[string]*serveCtx +} + +// StartEtcd launches the etcd server and HTTP handlers for client/server communication. +func StartEtcd(cfg *Config) (e *Etcd, err error) { + if err = cfg.Validate(); err != nil { + return nil, err + } + e = &Etcd{cfg: *cfg} + defer func() { + if err != nil { + e.Close() + e = nil + } + }() + + if e.Peers, err = startPeerListeners(cfg); err != nil { + return + } + if e.sctxs, err = startClientListeners(cfg); err != nil { + return + } + for _, sctx := range e.sctxs { + e.Clients = append(e.Clients, sctx.l) + } + + urlsmap, token, uerr := cfg.PeerURLsMapAndToken("etcd") + if uerr != nil { + err = fmt.Errorf("error setting up initial cluster: %v", uerr) + return + } + + srvcfg := &etcdserver.ServerConfig{ + Name: cfg.Name, + ClientURLs: cfg.ACUrls, + PeerURLs: cfg.APUrls, + DataDir: cfg.Dir, + DedicatedWALDir: cfg.WalDir, + SnapCount: cfg.SnapCount, + MaxSnapFiles: cfg.MaxSnapFiles, + MaxWALFiles: cfg.MaxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.Durl, + DiscoveryProxy: cfg.Dproxy, + NewCluster: cfg.IsNewCluster(), + ForceNewCluster: cfg.ForceNewCluster, + PeerTLSInfo: cfg.PeerTLSInfo, + TickMs: cfg.TickMs, + ElectionTicks: cfg.ElectionTicks(), + AutoCompactionRetention: cfg.AutoCompactionRetention, + QuotaBackendBytes: cfg.QuotaBackendBytes, + StrictReconfigCheck: cfg.StrictReconfigCheck, + EnablePprof: cfg.EnablePprof, + } + + if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { + return + } + + // buffer channel so goroutines on closed connections won't wait forever + e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) + + e.Server.Start() + e.serve() + <-e.Server.ReadyNotify() + return +} + +func (e *Etcd) Close() { + for _, sctx := range e.sctxs { + sctx.cancel() + } + for i := range e.Peers { + if e.Peers[i] != nil { + e.Peers[i].Close() + } + } + for i := range e.Clients { + if e.Clients[i] != nil { + e.Clients[i].Close() + } + } + if e.Server != nil { + e.Server.Stop() + } +} + +func (e *Etcd) Err() <-chan error { return e.errc } + +func startPeerListeners(cfg *Config) (plns []net.Listener, err error) { + if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() { + phosts := make([]string, 0, len(cfg.LPUrls)) + for i, u := range cfg.LPUrls { + phosts[i] = u.Host + } + cfg.PeerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts) + if err != nil { + plog.Fatalf("could not get certs (%v)", err) + } + } else if cfg.PeerAutoTLS { + plog.Warningf("ignoring peer auto TLS since certs given") + } + + if !cfg.PeerTLSInfo.Empty() { + plog.Infof("peerTLS: %s", cfg.PeerTLSInfo) + } + + plns = make([]net.Listener, len(cfg.LPUrls)) + defer func() { + if err == nil { + return + } + for i := range plns { + if plns[i] == nil { + continue + } + plns[i].Close() + plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String()) + } + }() + + for i, u := range cfg.LPUrls { + var tlscfg *tls.Config + if u.Scheme == "http" { + if !cfg.PeerTLSInfo.Empty() { + plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) + } + if cfg.PeerTLSInfo.ClientCertAuth { + plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } + } + if !cfg.PeerTLSInfo.Empty() { + if tlscfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil { + return nil, err + } + } + if plns[i], err = rafthttp.NewListener(u, tlscfg); err != nil { + return nil, err + } + plog.Info("listening for peers on ", u.String()) + } + return plns, nil +} + +func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { + if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() { + chosts := make([]string, 0, len(cfg.LCUrls)) + for i, u := range cfg.LCUrls { + chosts[i] = u.Host + } + cfg.ClientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts) + if err != nil { + plog.Fatalf("could not get certs (%v)", err) + } + } else if cfg.ClientAutoTLS { + plog.Warningf("ignoring client auto TLS since certs given") + } + + sctxs = make(map[string]*serveCtx) + for _, u := range cfg.LCUrls { + sctx := newServeCtx() + + if u.Scheme == "http" { + if !cfg.ClientTLSInfo.Empty() { + plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) + } + if cfg.ClientTLSInfo.ClientCertAuth { + plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } + } + if u.Scheme == "https" && cfg.ClientTLSInfo.Empty() { + return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) + } + + sctx.secure = u.Scheme == "https" + sctx.insecure = !sctx.secure + if oldctx := sctxs[u.Host]; oldctx != nil { + oldctx.secure = oldctx.secure || sctx.secure + oldctx.insecure = oldctx.insecure || sctx.insecure + continue + } + + if sctx.l, err = net.Listen("tcp", u.Host); err != nil { + return nil, err + } + + if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { + if fdLimit <= reservedInternalFDNum { + plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + } + sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) + } + + if sctx.l, err = transport.NewKeepAliveListener(sctx.l, "tcp", nil); err != nil { + return nil, err + } + + plog.Info("listening for client requests on ", u.Host) + defer func() { + if err != nil { + sctx.l.Close() + plog.Info("stopping listening for client requests on ", u.Host) + } + }() + sctxs[u.Host] = sctx + } + return sctxs, nil +} + +func (e *Etcd) serve() (err error) { + var ctlscfg *tls.Config + if !e.cfg.ClientTLSInfo.Empty() { + plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) + if ctlscfg, err = e.cfg.ClientTLSInfo.ServerConfig(); err != nil { + return err + } + } + + if e.cfg.CorsInfo.String() != "" { + plog.Infof("cors = %s", e.cfg.CorsInfo) + } + ch := http.Handler(&cors.CORSHandler{ + Handler: v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout()), + Info: e.cfg.CorsInfo, + }) + ph := v2http.NewPeerHandler(e.Server) + + // Start the peer server in a goroutine + for _, l := range e.Peers { + go func(l net.Listener) { + e.errc <- servePeerHTTP(l, ph) + }(l) + } + // Start a client server goroutine for each listen address + for _, sctx := range e.sctxs { + // read timeout does not work with http close notify + // TODO: https://github.com/golang/go/issues/9524 + go func(s *serveCtx) { + e.errc <- s.serve(e.Server, ctlscfg, ch, e.errc) + }(sctx) + } + return nil +} diff --git a/etcdmain/serve.go b/embed/serve.go similarity index 75% rename from etcdmain/serve.go rename to embed/serve.go index d485126fc520..e27d7c9816a9 100644 --- a/etcdmain/serve.go +++ b/embed/serve.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdmain +package embed import ( "crypto/tls" @@ -37,17 +37,23 @@ import ( type serveCtx struct { l net.Listener - host string secure bool insecure bool + + ctx context.Context + cancel context.CancelFunc +} + +func newServeCtx() *serveCtx { + ctx, cancel := context.WithCancel(context.Background()) + return &serveCtx{ctx: ctx, cancel: cancel} } // serve accepts incoming connections on the listener l, // creating a new service goroutine for each. The service goroutines // read requests and then call handler to reply to them. -func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error { +func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errc chan<- error) error { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) - <-s.ReadyNotify() plog.Info("ready to serve client requests") @@ -56,12 +62,12 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler if sctx.insecure { gs := v3rpc.Server(s, nil) grpcl := m.Match(cmux.HTTP2()) - go func() { plog.Fatal(gs.Serve(grpcl)) }() + go func() { errc <- gs.Serve(grpcl) }() opts := []grpc.DialOption{ grpc.WithInsecure(), } - gwmux, err := registerGateway(sctx.l.Addr().String(), opts) + gwmux, err := sctx.registerGateway(opts) if err != nil { return err } @@ -74,8 +80,8 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler ErrorLog: logger, // do not log user error } httpl := m.Match(cmux.HTTP1()) - go func() { plog.Fatal(srvhttp.Serve(httpl)) }() - plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host) + go func() { errc <- srvhttp.Serve(httpl) }() + plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String()) } if sctx.secure { @@ -87,7 +93,7 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler dtls.InsecureSkipVerify = true creds := credentials.NewTLS(dtls) opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)} - gwmux, err := registerGateway(sctx.l.Addr().String(), opts) + gwmux, err := sctx.registerGateway(opts) if err != nil { return err } @@ -102,9 +108,9 @@ func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } - go func() { plog.Fatal(srv.Serve(tlsl)) }() + go func() { errc <- srv.Serve(tlsl) }() - plog.Infof("serving client requests on %s", sctx.host) + plog.Infof("serving client requests on %s", sctx.l.Addr().String()) } return m.Serve() @@ -133,30 +139,32 @@ func servePeerHTTP(l net.Listener, handler http.Handler) error { return srv.Serve(l) } -func registerGateway(addr string, opts []grpc.DialOption) (*gw.ServeMux, error) { +func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) { + ctx := sctx.ctx + addr := sctx.l.Addr().String() gwmux := gw.NewServeMux() - err := pb.RegisterKVHandlerFromEndpoint(context.Background(), gwmux, addr, opts) + err := pb.RegisterKVHandlerFromEndpoint(ctx, gwmux, addr, opts) if err != nil { return nil, err } - err = pb.RegisterWatchHandlerFromEndpoint(context.Background(), gwmux, addr, opts) + err = pb.RegisterWatchHandlerFromEndpoint(ctx, gwmux, addr, opts) if err != nil { return nil, err } - err = pb.RegisterLeaseHandlerFromEndpoint(context.Background(), gwmux, addr, opts) + err = pb.RegisterLeaseHandlerFromEndpoint(ctx, gwmux, addr, opts) if err != nil { return nil, err } - err = pb.RegisterClusterHandlerFromEndpoint(context.Background(), gwmux, addr, opts) + err = pb.RegisterClusterHandlerFromEndpoint(ctx, gwmux, addr, opts) if err != nil { return nil, err } - err = pb.RegisterMaintenanceHandlerFromEndpoint(context.Background(), gwmux, addr, opts) + err = pb.RegisterMaintenanceHandlerFromEndpoint(ctx, gwmux, addr, opts) if err != nil { return nil, err } - err = pb.RegisterAuthHandlerFromEndpoint(context.Background(), gwmux, addr, opts) + err = pb.RegisterAuthHandlerFromEndpoint(ctx, gwmux, addr, opts) if err != nil { return nil, err } diff --git a/etcdmain/config.go b/etcdmain/config.go index dbcf84959bc9..dd128dee8f5b 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -20,21 +20,17 @@ import ( "flag" "fmt" "io/ioutil" - "net/url" "os" "runtime" "strings" - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/pkg/cors" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/pkg/flags" - "github.com/coreos/etcd/pkg/transport" - "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/version" "github.com/ghodss/yaml" ) -const ( +var ( proxyFlagOff = "off" proxyFlagReadonly = "readonly" proxyFlagOn = "on" @@ -42,21 +38,6 @@ const ( fallbackFlagExit = "exit" fallbackFlagProxy = "proxy" - clusterStateFlagNew = "new" - clusterStateFlagExisting = "existing" - - defaultName = "default" - defaultInitialAdvertisePeerURLs = "http://localhost:2380" - defaultAdvertiseClientURLs = "http://localhost:2379" - defaultListenPeerURLs = "http://localhost:2380" - defaultListenClientURLs = "http://localhost:2379" - - // maxElectionMs specifies the maximum value of election timeout. - // More details are listed in ../Documentation/tuning.md#time-parameters. - maxElectionMs = 50000 -) - -var ( ignored = []string{ "cluster-active-size", "cluster-remove-delay", @@ -72,105 +53,60 @@ var ( "v", "vv", } - - ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " + - "Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"") - errUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly") ) -type config struct { - *flag.FlagSet - - // member - corsInfo *cors.CORSInfo - lpurls, lcurls []url.URL - Dir string `json:"data-dir"` - WalDir string `json:"wal-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapCount uint64 `json:"snapshot-count"` - LPUrlsCfgFile string `json:"listen-peer-urls"` - LCUrlsCfgFile string `json:"listen-client-urls"` - CorsCfgFile string `json:"cors"` - - // TickMs is the number of milliseconds between heartbeat ticks. - // TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1). - // make ticks a cluster wide configuration. - TickMs uint `json:"heartbeat-interval"` - ElectionMs uint `json:"election-timeout"` - QuotaBackendBytes int64 `json:"quota-backend-bytes"` - - // clustering - apurls, acurls []url.URL - clusterState *flags.StringsFlag - DNSCluster string `json:"discovery-srv"` - Dproxy string `json:"discovery-proxy"` - Durl string `json:"discovery"` - fallback *flags.StringsFlag - InitialCluster string `json:"initial-cluster"` - InitialClusterToken string `json:"initial-cluster-token"` - StrictReconfigCheck bool `json:"strict-reconfig-check"` - ApurlsCfgFile string `json:"initial-advertise-peer-urls"` - AcurlsCfgFile string `json:"advertise-client-urls"` - ClusterStateCfgFile string `json:"initial-cluster-state"` - FallbackCfgFile string `json:"discovery-fallback"` - - // proxy - proxy *flags.StringsFlag - ProxyFailureWaitMs uint `json:"proxy-failure-wait"` - ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"` - ProxyDialTimeoutMs uint `json:"proxy-dial-timeout"` - ProxyWriteTimeoutMs uint `json:"proxy-write-timeout"` - ProxyReadTimeoutMs uint `json:"proxy-read-timeout"` - ProxyCfgFile string `json:"proxy"` - - // security - clientTLSInfo, peerTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerAutoTLS bool - ClientSecurityCfgFile securityConfig `json:"client-transport-security"` - PeerSecurityCfgFile securityConfig `json:"peer-transport-security"` - - // Debug logging - Debug bool `json:"debug"` - LogPkgLevels string `json:"log-package-levels"` - - // ForceNewCluster is unsafe - ForceNewCluster bool `json:"force-new-cluster"` +type configProxy struct { + ProxyFailureWaitMs uint `json:"proxy-failure-wait"` + ProxyRefreshIntervalMs uint `json:"proxy-refresh-interval"` + ProxyDialTimeoutMs uint `json:"proxy-dial-timeout"` + ProxyWriteTimeoutMs uint `json:"proxy-write-timeout"` + ProxyReadTimeoutMs uint `json:"proxy-read-timeout"` + Fallback string + Proxy string + ProxyJSON string `json:"proxy"` + FallbackJSON string `json:"discovery-fallback"` +} +// config holds the config for a command line invocation of etcd +type config struct { + embed.Config + configProxy + configFlags + configFile string printVersion bool - - autoCompactionRetention int - - enablePprof bool - - configFile string - - ignored []string + ignored []string } -type securityConfig struct { - CAFile string `json:"ca-file"` - CertFile string `json:"cert-file"` - KeyFile string `json:"key-file"` - CertAuth bool `json:"client-cert-auth"` - TrustedCAFile string `json:"trusted-ca-file"` - AutoTLS bool `json:"auto-tls"` +// configFlags has the set of flags used for command line parsing a Config +type configFlags struct { + *flag.FlagSet + clusterState *flags.StringsFlag + fallback *flags.StringsFlag + proxy *flags.StringsFlag } -func NewConfig() *config { +func newConfig() *config { cfg := &config{ - corsInfo: &cors.CORSInfo{}, + Config: *embed.NewConfig(), + configProxy: configProxy{ + Proxy: proxyFlagOff, + ProxyFailureWaitMs: 5000, + ProxyRefreshIntervalMs: 30000, + ProxyDialTimeoutMs: 1000, + ProxyWriteTimeoutMs: 5000, + }, + ignored: ignored, + } + cfg.configFlags = configFlags{ + FlagSet: flag.NewFlagSet("etcd", flag.ContinueOnError), clusterState: flags.NewStringsFlag( - clusterStateFlagNew, - clusterStateFlagExisting, + embed.ClusterStateFlagNew, + embed.ClusterStateFlagExisting, ), fallback: flags.NewStringsFlag( fallbackFlagExit, fallbackFlagProxy, ), - ignored: ignored, proxy: flags.NewStringsFlag( proxyFlagOff, proxyFlagReadonly, @@ -178,7 +114,6 @@ func NewConfig() *config { ), } - cfg.FlagSet = flag.NewFlagSet("etcd", flag.ContinueOnError) fs := cfg.FlagSet fs.Usage = func() { fmt.Println(usageline) @@ -187,38 +122,38 @@ func NewConfig() *config { fs.StringVar(&cfg.configFile, "config-file", "", "Path to the server configuration file") // member - fs.Var(cfg.corsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).") - fs.StringVar(&cfg.Dir, "data-dir", "", "Path to the data directory.") - fs.StringVar(&cfg.WalDir, "wal-dir", "", "Path to the dedicated wal directory.") - fs.Var(flags.NewURLsValue(defaultListenPeerURLs), "listen-peer-urls", "List of URLs to listen on for peer traffic.") - fs.Var(flags.NewURLsValue(defaultListenClientURLs), "listen-client-urls", "List of URLs to listen on for client traffic.") - fs.UintVar(&cfg.MaxSnapFiles, "max-snapshots", defaultMaxSnapshots, "Maximum number of snapshot files to retain (0 is unlimited).") - fs.UintVar(&cfg.MaxWalFiles, "max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited).") - fs.StringVar(&cfg.Name, "name", defaultName, "Human-readable name for this member.") - fs.Uint64Var(&cfg.SnapCount, "snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot to disk.") - fs.UintVar(&cfg.TickMs, "heartbeat-interval", 100, "Time (in milliseconds) of a heartbeat interval.") - fs.UintVar(&cfg.ElectionMs, "election-timeout", 1000, "Time (in milliseconds) for an election to timeout.") - fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", 0, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.") + fs.Var(cfg.CorsInfo, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).") + fs.StringVar(&cfg.Dir, "data-dir", cfg.Dir, "Path to the data directory.") + fs.StringVar(&cfg.WalDir, "wal-dir", cfg.WalDir, "Path to the dedicated wal directory.") + fs.Var(flags.NewURLsValue(embed.DefaultListenPeerURLs), "listen-peer-urls", "List of URLs to listen on for peer traffic.") + fs.Var(flags.NewURLsValue(embed.DefaultListenClientURLs), "listen-client-urls", "List of URLs to listen on for client traffic.") + fs.UintVar(&cfg.MaxSnapFiles, "max-snapshots", cfg.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).") + fs.UintVar(&cfg.MaxWalFiles, "max-wals", cfg.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).") + fs.StringVar(&cfg.Name, "name", cfg.Name, "Human-readable name for this member.") + fs.Uint64Var(&cfg.SnapCount, "snapshot-count", cfg.SnapCount, "Number of committed transactions to trigger a snapshot to disk.") + fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.") + fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.") + fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.") // clustering - fs.Var(flags.NewURLsValue(defaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.") - fs.Var(flags.NewURLsValue(defaultAdvertiseClientURLs), "advertise-client-urls", "List of this member's client URLs to advertise to the public.") - fs.StringVar(&cfg.Durl, "discovery", "", "Discovery URL used to bootstrap the cluster.") + fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.") + fs.Var(flags.NewURLsValue(embed.DefaultAdvertiseClientURLs), "advertise-client-urls", "List of this member's client URLs to advertise to the public.") + fs.StringVar(&cfg.Durl, "discovery", cfg.Durl, "Discovery URL used to bootstrap the cluster.") fs.Var(cfg.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(cfg.fallback.Values, ", "))) if err := cfg.fallback.Set(fallbackFlagProxy); err != nil { // Should never happen. plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err) } - fs.StringVar(&cfg.Dproxy, "discovery-proxy", "", "HTTP proxy to use for traffic to discovery service.") - fs.StringVar(&cfg.DNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster.") - fs.StringVar(&cfg.InitialCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for bootstrapping.") - fs.StringVar(&cfg.InitialClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap.") + fs.StringVar(&cfg.Dproxy, "discovery-proxy", cfg.Dproxy, "HTTP proxy to use for traffic to discovery service.") + fs.StringVar(&cfg.DNSCluster, "discovery-srv", cfg.DNSCluster, "DNS domain used to bootstrap initial cluster.") + fs.StringVar(&cfg.InitialCluster, "initial-cluster", cfg.InitialCluster, "Initial cluster configuration for bootstrapping.") + fs.StringVar(&cfg.InitialClusterToken, "initial-cluster-token", cfg.InitialClusterToken, "Initial cluster token for the etcd cluster during bootstrap.") fs.Var(cfg.clusterState, "initial-cluster-state", "Initial cluster state ('new' or 'existing').") - if err := cfg.clusterState.Set(clusterStateFlagNew); err != nil { + if err := cfg.clusterState.Set(embed.ClusterStateFlagNew); err != nil { // Should never happen. plog.Panicf("unexpected error setting up clusterStateFlag: %v", err) } - fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", false, "Reject reconfiguration requests that would cause quorum loss.") + fs.BoolVar(&cfg.StrictReconfigCheck, "strict-reconfig-check", cfg.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.") // proxy fs.Var(cfg.proxy, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(cfg.proxy.Values, ", "))) @@ -226,24 +161,24 @@ func NewConfig() *config { // Should never happen. plog.Panicf("unexpected error setting up proxyFlag: %v", err) } - fs.UintVar(&cfg.ProxyFailureWaitMs, "proxy-failure-wait", 5000, "Time (in milliseconds) an endpoint will be held in a failed state.") - fs.UintVar(&cfg.ProxyRefreshIntervalMs, "proxy-refresh-interval", 30000, "Time (in milliseconds) of the endpoints refresh interval.") - fs.UintVar(&cfg.ProxyDialTimeoutMs, "proxy-dial-timeout", 1000, "Time (in milliseconds) for a dial to timeout.") - fs.UintVar(&cfg.ProxyWriteTimeoutMs, "proxy-write-timeout", 5000, "Time (in milliseconds) for a write to timeout.") - fs.UintVar(&cfg.ProxyReadTimeoutMs, "proxy-read-timeout", 0, "Time (in milliseconds) for a read to timeout.") + fs.UintVar(&cfg.ProxyFailureWaitMs, "proxy-failure-wait", cfg.ProxyFailureWaitMs, "Time (in milliseconds) an endpoint will be held in a failed state.") + fs.UintVar(&cfg.ProxyRefreshIntervalMs, "proxy-refresh-interval", cfg.ProxyRefreshIntervalMs, "Time (in milliseconds) of the endpoints refresh interval.") + fs.UintVar(&cfg.ProxyDialTimeoutMs, "proxy-dial-timeout", cfg.ProxyDialTimeoutMs, "Time (in milliseconds) for a dial to timeout.") + fs.UintVar(&cfg.ProxyWriteTimeoutMs, "proxy-write-timeout", cfg.ProxyWriteTimeoutMs, "Time (in milliseconds) for a write to timeout.") + fs.UintVar(&cfg.ProxyReadTimeoutMs, "proxy-read-timeout", cfg.ProxyReadTimeoutMs, "Time (in milliseconds) for a read to timeout.") // security - fs.StringVar(&cfg.clientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.") - fs.StringVar(&cfg.clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.") - fs.StringVar(&cfg.clientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.") - fs.BoolVar(&cfg.clientTLSInfo.ClientCertAuth, "client-cert-auth", false, "Enable client cert authentication.") - fs.StringVar(&cfg.clientTLSInfo.TrustedCAFile, "trusted-ca-file", "", "Path to the client server TLS trusted CA key file.") + fs.StringVar(&cfg.ClientTLSInfo.CAFile, "ca-file", "", "DEPRECATED: Path to the client server TLS CA file.") + fs.StringVar(&cfg.ClientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.") + fs.StringVar(&cfg.ClientTLSInfo.KeyFile, "key-file", "", "Path to the client server TLS key file.") + fs.BoolVar(&cfg.ClientTLSInfo.ClientCertAuth, "client-cert-auth", false, "Enable client cert authentication.") + fs.StringVar(&cfg.ClientTLSInfo.TrustedCAFile, "trusted-ca-file", "", "Path to the client server TLS trusted CA key file.") fs.BoolVar(&cfg.ClientAutoTLS, "auto-tls", false, "Client TLS using generated certificates") - fs.StringVar(&cfg.peerTLSInfo.CAFile, "peer-ca-file", "", "DEPRECATED: Path to the peer server TLS CA file.") - fs.StringVar(&cfg.peerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.") - fs.StringVar(&cfg.peerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.") - fs.BoolVar(&cfg.peerTLSInfo.ClientCertAuth, "peer-client-cert-auth", false, "Enable peer client cert authentication.") - fs.StringVar(&cfg.peerTLSInfo.TrustedCAFile, "peer-trusted-ca-file", "", "Path to the peer server TLS trusted CA file.") + fs.StringVar(&cfg.PeerTLSInfo.CAFile, "peer-ca-file", "", "DEPRECATED: Path to the peer server TLS CA file.") + fs.StringVar(&cfg.PeerTLSInfo.CertFile, "peer-cert-file", "", "Path to the peer server TLS cert file.") + fs.StringVar(&cfg.PeerTLSInfo.KeyFile, "peer-key-file", "", "Path to the peer server TLS key file.") + fs.BoolVar(&cfg.PeerTLSInfo.ClientCertAuth, "peer-client-cert-auth", false, "Enable peer client cert authentication.") + fs.StringVar(&cfg.PeerTLSInfo.TrustedCAFile, "peer-trusted-ca-file", "", "Path to the peer server TLS trusted CA file.") fs.BoolVar(&cfg.PeerAutoTLS, "peer-auto-tls", false, "Peer TLS using generated certificates") // logging @@ -256,10 +191,10 @@ func NewConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") - fs.IntVar(&cfg.autoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.") + fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.") // pprof profiler via HTTP - fs.BoolVar(&cfg.enablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"") + fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"") // ignored for _, f := range cfg.ignored { @@ -268,7 +203,7 @@ func NewConfig() *config { return cfg } -func (cfg *config) Parse(arguments []string) error { +func (cfg *config) parse(arguments []string) error { perr := cfg.FlagSet.Parse(arguments) switch perr { case nil: @@ -293,11 +228,10 @@ func (cfg *config) Parse(arguments []string) error { var err error if cfg.configFile != "" { plog.Infof("Loading server configuration from %q", cfg.configFile) - err = cfg.configFromFile() + err = cfg.configFromFile(cfg.configFile) } else { err = cfg.configFromCmdLine() } - return err } @@ -307,152 +241,72 @@ func (cfg *config) configFromCmdLine() error { plog.Fatalf("%v", err) } - cfg.lpurls = flags.URLsFromFlag(cfg.FlagSet, "listen-peer-urls") - cfg.apurls = flags.URLsFromFlag(cfg.FlagSet, "initial-advertise-peer-urls") - cfg.lcurls = flags.URLsFromFlag(cfg.FlagSet, "listen-client-urls") - cfg.acurls = flags.URLsFromFlag(cfg.FlagSet, "advertise-client-urls") - - return cfg.validateConfig(func(field string) bool { - return flags.IsSet(cfg.FlagSet, field) - }) -} - -func (cfg *config) configFromFile() error { - b, err := ioutil.ReadFile(cfg.configFile) - if err != nil { - return err + cfg.LPUrls = flags.URLsFromFlag(cfg.FlagSet, "listen-peer-urls") + cfg.APUrls = flags.URLsFromFlag(cfg.FlagSet, "initial-advertise-peer-urls") + cfg.LCUrls = flags.URLsFromFlag(cfg.FlagSet, "listen-client-urls") + cfg.ACUrls = flags.URLsFromFlag(cfg.FlagSet, "advertise-client-urls") + cfg.ClusterState = cfg.clusterState.String() + cfg.Fallback = cfg.fallback.String() + cfg.Proxy = cfg.proxy.String() + + // disable default advertise-client-urls if lcurls is set + missingAC := flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") + if !cfg.mayBeProxy() && missingAC { + cfg.ACUrls = nil } - err = yaml.Unmarshal(b, cfg) - if err != nil { - return err + // disable default initial-cluster if discovery is set + if (cfg.Durl != "" || cfg.DNSCluster != "") && !flags.IsSet(cfg.FlagSet, "initial-cluster") { + cfg.InitialCluster = "" } - if cfg.LPUrlsCfgFile != "" { - u, err := types.NewURLs(strings.Split(cfg.LPUrlsCfgFile, ",")) - if err != nil { - plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err) - } - cfg.lpurls = []url.URL(u) - } - - if cfg.LCUrlsCfgFile != "" { - u, err := types.NewURLs(strings.Split(cfg.LCUrlsCfgFile, ",")) - if err != nil { - plog.Fatalf("unexpected error setting up listen-client-urls: %v", err) - } - cfg.lcurls = []url.URL(u) - } - - if cfg.CorsCfgFile != "" { - if err := cfg.corsInfo.Set(cfg.CorsCfgFile); err != nil { - plog.Panicf("unexpected error setting up cors: %v", err) - } - } + return cfg.validate() +} - if cfg.ApurlsCfgFile != "" { - u, err := types.NewURLs(strings.Split(cfg.ApurlsCfgFile, ",")) - if err != nil { - plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err) - } - cfg.apurls = []url.URL(u) +func (cfg *config) configFromFile(path string) error { + eCfg, err := embed.ConfigFromFile(path) + if err != nil { + return err } + cfg.Config = *eCfg - if cfg.AcurlsCfgFile != "" { - u, err := types.NewURLs(strings.Split(cfg.AcurlsCfgFile, ",")) - if err != nil { - plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err) - } - cfg.acurls = []url.URL(u) + // load extra config information + b, rerr := ioutil.ReadFile(path) + if rerr != nil { + return rerr } - - if cfg.ClusterStateCfgFile != "" { - if err := cfg.clusterState.Set(cfg.ClusterStateCfgFile); err != nil { - plog.Panicf("unexpected error setting up clusterStateFlag: %v", err) - } + if yerr := yaml.Unmarshal(b, &cfg.configProxy); yerr != nil { + return yerr } - - if cfg.FallbackCfgFile != "" { - if err := cfg.fallback.Set(cfg.FallbackCfgFile); err != nil { + if cfg.FallbackJSON != "" { + if err := cfg.fallback.Set(cfg.FallbackJSON); err != nil { plog.Panicf("unexpected error setting up discovery-fallback flag: %v", err) } + cfg.Fallback = cfg.fallback.String() } - - if cfg.ProxyCfgFile != "" { - if err := cfg.proxy.Set(cfg.ProxyCfgFile); err != nil { + if cfg.ProxyJSON != "" { + if err := cfg.proxy.Set(cfg.ProxyJSON); err != nil { plog.Panicf("unexpected error setting up proxyFlag: %v", err) } + cfg.Proxy = cfg.proxy.String() } - - copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) { - tls.CAFile = ysc.CAFile - tls.CertFile = ysc.CertFile - tls.KeyFile = ysc.KeyFile - tls.ClientCertAuth = ysc.CertAuth - tls.TrustedCAFile = ysc.TrustedCAFile - } - copySecurityDetails(&cfg.clientTLSInfo, &cfg.ClientSecurityCfgFile) - copySecurityDetails(&cfg.peerTLSInfo, &cfg.PeerSecurityCfgFile) - cfg.ClientAutoTLS = cfg.ClientSecurityCfgFile.AutoTLS - cfg.PeerAutoTLS = cfg.PeerSecurityCfgFile.AutoTLS - - fieldsToBeChecked := map[string]bool{ - "discovery": (cfg.Durl != ""), - "listen-client-urls": (cfg.LCUrlsCfgFile != ""), - "advertise-client-urls": (cfg.AcurlsCfgFile != ""), - "initial-cluster": (cfg.InitialCluster != ""), - "discovery-srv": (cfg.DNSCluster != ""), - } - - return cfg.validateConfig(func(field string) bool { - return fieldsToBeChecked[field] - }) + return nil } -func (cfg *config) validateConfig(isSet func(field string) bool) error { - // when etcd runs in member mode user needs to set --advertise-client-urls if --listen-client-urls is set. - // TODO(yichengq): check this for joining through discovery service case - mayFallbackToProxy := isSet("discovery") && cfg.fallback.String() == fallbackFlagProxy - mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy - if !mayBeProxy { - if isSet("listen-client-urls") && !isSet("advertise-client-urls") { - return errUnsetAdvertiseClientURLsFlag - } - } - - // Check if conflicting flags are passed. - nSet := 0 - for _, v := range []bool{isSet("discovery"), isSet("initial-cluster"), isSet("discovery-srv")} { - if v { - nSet++ - } - } - - if nSet > 1 { - return ErrConflictBootstrapFlags - } - - if 5*cfg.TickMs > cfg.ElectionMs { - return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs) - } - if cfg.ElectionMs > maxElectionMs { - return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs) - } - - return nil +func (cfg *config) mayBeProxy() bool { + mayFallbackToProxy := cfg.Durl != "" && cfg.Fallback == fallbackFlagProxy + return cfg.Proxy != proxyFlagOff || mayFallbackToProxy } -func initialClusterFromName(name string) string { - n := name - if name == "" { - n = defaultName +func (cfg *config) validate() error { + err := cfg.Config.Validate() + // TODO(yichengq): check this for joining through discovery service case + if err == embed.ErrUnsetAdvertiseClientURLsFlag && cfg.mayBeProxy() { + return nil } - return fmt.Sprintf("%s=http://localhost:2380", n) + return err } -func (cfg config) isNewCluster() bool { return cfg.clusterState.String() == clusterStateFlagNew } func (cfg config) isProxy() bool { return cfg.proxy.String() != proxyFlagOff } func (cfg config) isReadonlyProxy() bool { return cfg.proxy.String() == proxyFlagReadonly } func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy } - -func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) } diff --git a/etcdmain/config_test.go b/etcdmain/config_test.go index ef402352fece..1acbc1f46112 100644 --- a/etcdmain/config_test.go +++ b/etcdmain/config_test.go @@ -23,6 +23,7 @@ import ( "strings" "testing" + "github.com/coreos/etcd/embed" "github.com/ghodss/yaml" ) @@ -39,8 +40,8 @@ func TestConfigParsingMemberFlags(t *testing.T) { "-advertise-client-urls=http://localhost:7000,https://localhost:7001", } - cfg := NewConfig() - err := cfg.Parse(args) + cfg := newConfig() + err := cfg.parse(args) if err != nil { t.Fatal(err) } @@ -81,9 +82,8 @@ func TestConfigFileMemberFields(t *testing.T) { fmt.Sprintf("--config-file=%s", tmpfile.Name()), } - cfg := NewConfig() - err = cfg.Parse(args) - if err != nil { + cfg := newConfig() + if err = cfg.parse(args); err != nil { t.Fatal(err) } @@ -100,9 +100,8 @@ func TestConfigParsingClusteringFlags(t *testing.T) { "-discovery-fallback=exit", } - cfg := NewConfig() - err := cfg.Parse(args) - if err != nil { + cfg := newConfig() + if err := cfg.parse(args); err != nil { t.Fatal(err) } @@ -137,8 +136,8 @@ func TestConfigFileClusteringFields(t *testing.T) { args := []string{ fmt.Sprintf("--config-file=%s", tmpfile.Name()), } - cfg := NewConfig() - err = cfg.Parse(args) + cfg := newConfig() + err = cfg.parse(args) if err != nil { t.Fatal(err) } @@ -147,19 +146,10 @@ func TestConfigFileClusteringFields(t *testing.T) { } func TestConfigParsingOtherFlags(t *testing.T) { - args := []string{ - "-proxy=readonly", - "-ca-file=cafile", - "-cert-file=certfile", - "-key-file=keyfile", - "-peer-ca-file=peercafile", - "-peer-cert-file=peercertfile", - "-peer-key-file=peerkeyfile", - "-force-new-cluster=true", - } - - cfg := NewConfig() - err := cfg.Parse(args) + args := []string{"-proxy=readonly"} + + cfg := newConfig() + err := cfg.parse(args) if err != nil { t.Fatal(err) } @@ -169,23 +159,9 @@ func TestConfigParsingOtherFlags(t *testing.T) { func TestConfigFileOtherFields(t *testing.T) { yc := struct { - ProxyCfgFile string `json:"proxy"` - ClientSecurityCfgFile securityConfig `json:"client-transport-security"` - PeerSecurityCfgFile securityConfig `json:"peer-transport-security"` - ForceNewCluster bool `json:"force-new-cluster"` + ProxyCfgFile string `json:"proxy"` }{ "readonly", - securityConfig{ - CAFile: "cafile", - CertFile: "certfile", - KeyFile: "keyfile", - }, - securityConfig{ - CAFile: "peercafile", - CertFile: "peercertfile", - KeyFile: "peerkeyfile", - }, - true, } b, err := yaml.Marshal(&yc) @@ -200,8 +176,8 @@ func TestConfigFileOtherFields(t *testing.T) { fmt.Sprintf("--config-file=%s", tmpfile.Name()), } - cfg := NewConfig() - err = cfg.Parse(args) + cfg := newConfig() + err = cfg.parse(args) if err != nil { t.Fatal(err) } @@ -231,10 +207,9 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) { } for i, tt := range conflictArgs { - cfg := NewConfig() - err := cfg.Parse(tt) - if err != ErrConflictBootstrapFlags { - t.Errorf("%d: err = %v, want %v", i, err, ErrConflictBootstrapFlags) + cfg := newConfig() + if err := cfg.parse(tt); err != embed.ErrConflictBootstrapFlags { + t.Errorf("%d: err = %v, want %v", i, err, embed.ErrConflictBootstrapFlags) } } } @@ -277,10 +252,9 @@ func TestConfigFileConflictClusteringFlags(t *testing.T) { fmt.Sprintf("--config-file=%s", tmpfile.Name()), } - cfg := NewConfig() - err = cfg.Parse(args) - if err != ErrConflictBootstrapFlags { - t.Errorf("%d: err = %v, want %v", i, err, ErrConflictBootstrapFlags) + cfg := newConfig() + if err := cfg.parse(args); err != embed.ErrConflictBootstrapFlags { + t.Errorf("%d: err = %v, want %v", i, err, embed.ErrConflictBootstrapFlags) } } } @@ -295,14 +269,14 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) { "-initial-cluster=infra1=http://127.0.0.1:2380", "-listen-client-urls=http://127.0.0.1:2379", }, - errUnsetAdvertiseClientURLsFlag, + embed.ErrUnsetAdvertiseClientURLsFlag, }, { []string{ "-discovery-srv=example.com", "-listen-client-urls=http://127.0.0.1:2379", }, - errUnsetAdvertiseClientURLsFlag, + embed.ErrUnsetAdvertiseClientURLsFlag, }, { []string{ @@ -310,13 +284,13 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) { "-discovery-fallback=exit", "-listen-client-urls=http://127.0.0.1:2379", }, - errUnsetAdvertiseClientURLsFlag, + embed.ErrUnsetAdvertiseClientURLsFlag, }, { []string{ "-listen-client-urls=http://127.0.0.1:2379", }, - errUnsetAdvertiseClientURLsFlag, + embed.ErrUnsetAdvertiseClientURLsFlag, }, { []string{ @@ -342,9 +316,8 @@ func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) { } for i, tt := range tests { - cfg := NewConfig() - err := cfg.Parse(tt.args) - if err != tt.werr { + cfg := newConfig() + if err := cfg.parse(tt.args); err != tt.werr { t.Errorf("%d: err = %v, want %v", i, err, tt.werr) } } @@ -355,15 +328,16 @@ func TestConfigIsNewCluster(t *testing.T) { state string wIsNew bool }{ - {clusterStateFlagExisting, false}, - {clusterStateFlagNew, true}, + {embed.ClusterStateFlagExisting, false}, + {embed.ClusterStateFlagNew, true}, } for i, tt := range tests { - cfg := NewConfig() - if err := cfg.clusterState.Set(tt.state); err != nil { + cfg := newConfig() + args := []string{"--initial-cluster-state", tests[i].state} + if err := cfg.parse(args); err != nil { t.Fatalf("#%d: unexpected clusterState.Set error: %v", i, err) } - if g := cfg.isNewCluster(); g != tt.wIsNew { + if g := cfg.IsNewCluster(); g != tt.wIsNew { t.Errorf("#%d: isNewCluster = %v, want %v", i, g, tt.wIsNew) } } @@ -379,7 +353,7 @@ func TestConfigIsProxy(t *testing.T) { {proxyFlagOn, true}, } for i, tt := range tests { - cfg := NewConfig() + cfg := newConfig() if err := cfg.proxy.Set(tt.proxy); err != nil { t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err) } @@ -399,7 +373,7 @@ func TestConfigIsReadonlyProxy(t *testing.T) { {proxyFlagOn, false}, } for i, tt := range tests { - cfg := NewConfig() + cfg := newConfig() if err := cfg.proxy.Set(tt.proxy); err != nil { t.Fatalf("#%d: unexpected proxy.Set error: %v", i, err) } @@ -418,7 +392,7 @@ func TestConfigShouldFallbackToProxy(t *testing.T) { {fallbackFlagExit, false}, } for i, tt := range tests { - cfg := NewConfig() + cfg := newConfig() if err := cfg.fallback.Set(tt.fallback); err != nil { t.Fatalf("#%d: unexpected fallback.Set error: %v", i, err) } @@ -458,9 +432,8 @@ func TestConfigFileElectionTimeout(t *testing.T) { fmt.Sprintf("--config-file=%s", tmpfile.Name()), } - cfg := NewConfig() - err = cfg.Parse(args) - if !strings.Contains(err.Error(), tt.errStr) { + cfg := newConfig() + if err := cfg.parse(args); err == nil || !strings.Contains(err.Error(), tt.errStr) { t.Errorf("%d: Wrong err = %v", i, err) } } @@ -485,10 +458,10 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { } func validateMemberFlags(t *testing.T, cfg *config) { - wcfg := &config{ + wcfg := &embed.Config{ Dir: "testdir", - lpurls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - lcurls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, MaxSnapFiles: 10, MaxWalFiles: 10, Name: "testname", @@ -510,25 +483,25 @@ func validateMemberFlags(t *testing.T, cfg *config) { if cfg.SnapCount != wcfg.SnapCount { t.Errorf("snapcount = %v, want %v", cfg.SnapCount, wcfg.SnapCount) } - if !reflect.DeepEqual(cfg.lpurls, wcfg.lpurls) { - t.Errorf("listen-peer-urls = %v, want %v", cfg.lpurls, wcfg.lpurls) + if !reflect.DeepEqual(cfg.LPUrls, wcfg.LPUrls) { + t.Errorf("listen-peer-urls = %v, want %v", cfg.LPUrls, wcfg.LPUrls) } - if !reflect.DeepEqual(cfg.lcurls, wcfg.lcurls) { - t.Errorf("listen-client-urls = %v, want %v", cfg.lcurls, wcfg.lcurls) + if !reflect.DeepEqual(cfg.LCUrls, wcfg.LCUrls) { + t.Errorf("listen-client-urls = %v, want %v", cfg.LCUrls, wcfg.LCUrls) } } func validateClusteringFlags(t *testing.T, cfg *config) { - wcfg := NewConfig() - wcfg.apurls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} - wcfg.acurls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} - wcfg.clusterState.Set(clusterStateFlagExisting) + wcfg := newConfig() + wcfg.APUrls = []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}} + wcfg.ACUrls = []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}} + wcfg.ClusterState = embed.ClusterStateFlagExisting wcfg.fallback.Set(fallbackFlagExit) wcfg.InitialCluster = "0=http://localhost:8000" wcfg.InitialClusterToken = "etcdtest" - if cfg.clusterState.String() != wcfg.clusterState.String() { - t.Errorf("clusterState = %v, want %v", cfg.clusterState, wcfg.clusterState) + if cfg.ClusterState != wcfg.ClusterState { + t.Errorf("clusterState = %v, want %v", cfg.ClusterState, wcfg.ClusterState) } if cfg.fallback.String() != wcfg.fallback.String() { t.Errorf("fallback = %v, want %v", cfg.fallback, wcfg.fallback) @@ -539,35 +512,18 @@ func validateClusteringFlags(t *testing.T, cfg *config) { if cfg.InitialClusterToken != wcfg.InitialClusterToken { t.Errorf("initialClusterToken = %v, want %v", cfg.InitialClusterToken, wcfg.InitialClusterToken) } - if !reflect.DeepEqual(cfg.apurls, wcfg.apurls) { - t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.lpurls, wcfg.lpurls) + if !reflect.DeepEqual(cfg.APUrls, wcfg.APUrls) { + t.Errorf("initial-advertise-peer-urls = %v, want %v", cfg.LPUrls, wcfg.LPUrls) } - if !reflect.DeepEqual(cfg.acurls, wcfg.acurls) { - t.Errorf("advertise-client-urls = %v, want %v", cfg.lcurls, wcfg.lcurls) + if !reflect.DeepEqual(cfg.ACUrls, wcfg.ACUrls) { + t.Errorf("advertise-client-urls = %v, want %v", cfg.LCUrls, wcfg.LCUrls) } } func validateOtherFlags(t *testing.T, cfg *config) { - wcfg := NewConfig() + wcfg := newConfig() wcfg.proxy.Set(proxyFlagReadonly) - wcfg.clientTLSInfo.CAFile = "cafile" - wcfg.clientTLSInfo.CertFile = "certfile" - wcfg.clientTLSInfo.KeyFile = "keyfile" - wcfg.peerTLSInfo.CAFile = "peercafile" - wcfg.peerTLSInfo.CertFile = "peercertfile" - wcfg.peerTLSInfo.KeyFile = "peerkeyfile" - wcfg.ForceNewCluster = true - if cfg.proxy.String() != wcfg.proxy.String() { t.Errorf("proxy = %v, want %v", cfg.proxy, wcfg.proxy) } - if cfg.clientTLSInfo.String() != wcfg.clientTLSInfo.String() { - t.Errorf("clientTLS = %v, want %v", cfg.clientTLSInfo, wcfg.clientTLSInfo) - } - if cfg.peerTLSInfo.String() != wcfg.peerTLSInfo.String() { - t.Errorf("peerTLS = %v, want %v", cfg.peerTLSInfo, wcfg.peerTLSInfo) - } - if cfg.ForceNewCluster != wcfg.ForceNewCluster { - t.Errorf("forceNewCluster = %t, want %t", cfg.ForceNewCluster, wcfg.ForceNewCluster) - } } diff --git a/etcdmain/const_windows.go b/etcdmain/const_windows.go deleted file mode 100644 index 0cbfe9fe38d8..000000000000 --- a/etcdmain/const_windows.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build windows - -package etcdmain - -// TODO(barakmich): So because file locking on Windows is untested, the -// temporary fix is to default to unlimited snapshots and WAL files, with manual -// removal. Perhaps not the most elegant solution, but it's at least safe and -// we'd totally love a PR to fix the story around locking. -const ( - defaultMaxSnapshots = 0 - defaultMaxWALs = 0 -) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 6293f9c4893b..adba4cd98e03 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "net" "net/http" - _ "net/http/pprof" "os" "path" "reflect" @@ -30,17 +29,15 @@ import ( "time" "github.com/coreos/etcd/discovery" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/api/v2http" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/fileutil" pkgioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/osutil" - runtimeutil "github.com/coreos/etcd/pkg/runtime" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/proxy/httpproxy" - "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/version" "github.com/coreos/go-systemd/daemon" systemdutil "github.com/coreos/go-systemd/util" @@ -52,23 +49,6 @@ type dirType string var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdmain") -const ( - // the owner can make/remove files inside the directory - privateDirMode = 0700 - - // internal fd usage includes disk usage and transport usage. - // To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs - // at most 2 to read/lock/write WALs. One case that it needs to 2 is to - // read all logs after some snapshot index, which locates at the end of - // the second last and the head of the last. For purging, it needs to read - // directory, so it needs 1. For fd monitor, it needs 1. - // For transport, rafthttp builds two long-polling connections and at most - // four temporary connections with each member. There are at most 9 members - // in a cluster, so it should reserve 96. - // For the safety, we set the total reserved number to 150. - reservedInternalFDNum = 150 -) - var ( dirMember = dirType("member") dirProxy = dirType("proxy") @@ -76,12 +56,12 @@ var ( ) func startEtcdOrProxyV2() { - cfg := NewConfig() - err := cfg.Parse(os.Args[1:]) + cfg := newConfig() + err := cfg.parse(os.Args[1:]) if err != nil { plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err) switch err { - case errUnsetAdvertiseClientURLsFlag: + case embed.ErrUnsetAdvertiseClientURLsFlag: plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.") } os.Exit(1) @@ -89,6 +69,7 @@ func startEtcdOrProxyV2() { setupLogging(cfg) var stopped <-chan struct{} + var errc <-chan error plog.Infof("etcd Version: %s\n", version.Version) plog.Infof("Git SHA: %s\n", version.GitSHA) @@ -99,8 +80,8 @@ func startEtcdOrProxyV2() { plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU()) // TODO: check whether fields are set instead of whether fields have default value - if cfg.Name != defaultName && cfg.InitialCluster == initialClusterFromName(defaultName) { - cfg.InitialCluster = initialClusterFromName(cfg.Name) + if cfg.Name != embed.DefaultName && cfg.InitialCluster == cfg.InitialClusterFromName(embed.DefaultName) { + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) } if cfg.Dir == "" { @@ -113,7 +94,7 @@ func startEtcdOrProxyV2() { plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which) switch which { case dirMember: - stopped, err = startEtcd(cfg) + stopped, errc, err = startEtcd(&cfg.Config) case dirProxy: err = startProxy(cfg) default: @@ -122,7 +103,7 @@ func startEtcdOrProxyV2() { } else { shouldProxy := cfg.isProxy() if !shouldProxy { - stopped, err = startEtcd(cfg) + stopped, errc, err = startEtcd(&cfg.Config) if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster { if cfg.shouldFallbackToProxy() { plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy) @@ -157,13 +138,13 @@ func startEtcdOrProxyV2() { if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") { plog.Infof("%v", err) - if cfg.InitialCluster == initialClusterFromName(cfg.Name) { + if cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) { plog.Infof("forgot to set --initial-cluster flag?") } - if types.URLs(cfg.apurls).String() == defaultInitialAdvertisePeerURLs { + if types.URLs(cfg.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs { plog.Infof("forgot to set --initial-advertise-peer-urls flag?") } - if cfg.InitialCluster == initialClusterFromName(cfg.Name) && len(cfg.Durl) == 0 { + if cfg.InitialCluster == cfg.InitialClusterFromName(cfg.Name) && len(cfg.Durl) == 0 { plog.Infof("if you want to use discovery service, please set --discovery flag.") } os.Exit(1) @@ -188,233 +169,43 @@ func startEtcdOrProxyV2() { } } - <-stopped - osutil.Exit(0) -} - -// startEtcd launches the etcd server and HTTP handlers for client/server communication. -func startEtcd(cfg *config) (<-chan struct{}, error) { - urlsmap, token, err := getPeerURLsMapAndToken(cfg, "etcd") - if err != nil { - return nil, fmt.Errorf("error setting up initial cluster: %v", err) - } - - if cfg.PeerAutoTLS && cfg.peerTLSInfo.Empty() { - var phosts []string - for _, u := range cfg.lpurls { - phosts = append(phosts, u.Host) - } - cfg.peerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts) - if err != nil { - plog.Fatalf("could not get certs (%v)", err) - } - } else if cfg.PeerAutoTLS { - plog.Warningf("ignoring peer auto TLS since certs given") - } - - if !cfg.peerTLSInfo.Empty() { - plog.Infof("peerTLS: %s", cfg.peerTLSInfo) + select { + case lerr := <-errc: + // fatal out on listener errors + plog.Fatal(lerr) + case <-stopped: } - var plns []net.Listener - for _, u := range cfg.lpurls { - if u.Scheme == "http" { - if !cfg.peerTLSInfo.Empty() { - plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) - } - if cfg.peerTLSInfo.ClientCertAuth { - plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) - } - } - var ( - l net.Listener - tlscfg *tls.Config - ) - - if !cfg.peerTLSInfo.Empty() { - tlscfg, err = cfg.peerTLSInfo.ServerConfig() - if err != nil { - return nil, err - } - } - - l, err = rafthttp.NewListener(u, tlscfg) - if err != nil { - return nil, err - } - - urlStr := u.String() - plog.Info("listening for peers on ", urlStr) - defer func() { - if err != nil { - l.Close() - plog.Info("stopping listening for peers on ", urlStr) - } - }() - plns = append(plns, l) - } - - if cfg.ClientAutoTLS && cfg.clientTLSInfo.Empty() { - var chosts []string - for _, u := range cfg.lcurls { - chosts = append(chosts, u.Host) - } - cfg.clientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts) - if err != nil { - plog.Fatalf("could not get certs (%v)", err) - } - } else if cfg.ClientAutoTLS { - plog.Warningf("ignoring client auto TLS since certs given") - } - - var ctlscfg *tls.Config - if !cfg.clientTLSInfo.Empty() { - plog.Infof("clientTLS: %s", cfg.clientTLSInfo) - ctlscfg, err = cfg.clientTLSInfo.ServerConfig() - if err != nil { - return nil, err - } - } - - sctxs := make(map[string]*serveCtx) - for _, u := range cfg.lcurls { - if u.Scheme == "http" { - if !cfg.clientTLSInfo.Empty() { - plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) - } - if cfg.clientTLSInfo.ClientCertAuth { - plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) - } - } - if u.Scheme == "https" && ctlscfg == nil { - return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) - } - - ctx := &serveCtx{host: u.Host} - - if u.Scheme == "https" { - ctx.secure = true - } else { - ctx.insecure = true - } - - if sctxs[u.Host] != nil { - if ctx.secure { - sctxs[u.Host].secure = true - } - if ctx.insecure { - sctxs[u.Host].insecure = true - } - continue - } - - var l net.Listener - - l, err = net.Listen("tcp", u.Host) - if err != nil { - return nil, err - } - - var fdLimit uint64 - if fdLimit, err = runtimeutil.FDLimit(); err == nil { - if fdLimit <= reservedInternalFDNum { - plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) - } - l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) - } - - l, err = transport.NewKeepAliveListener(l, "tcp", nil) - ctx.l = l - if err != nil { - return nil, err - } - - plog.Info("listening for client requests on ", u.Host) - defer func() { - if err != nil { - l.Close() - plog.Info("stopping listening for client requests on ", u.Host) - } - }() - sctxs[u.Host] = ctx - } + osutil.Exit(0) +} - srvcfg := &etcdserver.ServerConfig{ - Name: cfg.Name, - ClientURLs: cfg.acurls, - PeerURLs: cfg.apurls, - DataDir: cfg.Dir, - DedicatedWALDir: cfg.WalDir, - SnapCount: cfg.SnapCount, - MaxSnapFiles: cfg.MaxSnapFiles, - MaxWALFiles: cfg.MaxWalFiles, - InitialPeerURLsMap: urlsmap, - InitialClusterToken: token, - DiscoveryURL: cfg.Durl, - DiscoveryProxy: cfg.Dproxy, - NewCluster: cfg.isNewCluster(), - ForceNewCluster: cfg.ForceNewCluster, - PeerTLSInfo: cfg.peerTLSInfo, - TickMs: cfg.TickMs, - ElectionTicks: cfg.electionTicks(), - AutoCompactionRetention: cfg.autoCompactionRetention, - QuotaBackendBytes: cfg.QuotaBackendBytes, - StrictReconfigCheck: cfg.StrictReconfigCheck, - EnablePprof: cfg.enablePprof, - } - var s *etcdserver.EtcdServer - s, err = etcdserver.NewServer(srvcfg) +// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd. +func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) { + e, err := embed.StartEtcd(cfg) if err != nil { - return nil, err - } - s.Start() - osutil.RegisterInterruptHandler(s.Stop) - - if cfg.corsInfo.String() != "" { - plog.Infof("cors = %s", cfg.corsInfo) - } - ch := http.Handler(&cors.CORSHandler{ - Handler: v2http.NewClientHandler(s, srvcfg.ReqTimeout()), - Info: cfg.corsInfo, - }) - ph := v2http.NewPeerHandler(s) - - // Start the peer server in a goroutine - for _, l := range plns { - go func(l net.Listener) { - plog.Fatal(servePeerHTTP(l, ph)) - }(l) - } - // Start a client server goroutine for each listen address - for _, sctx := range sctxs { - go func(sctx *serveCtx) { - // read timeout does not work with http close notify - // TODO: https://github.com/golang/go/issues/9524 - plog.Fatal(serve(sctx, s, ctlscfg, ch)) - }(sctx) + return nil, nil, err } - - <-s.ReadyNotify() - return s.StopNotify(), nil + osutil.RegisterInterruptHandler(e.Server.Stop) + return e.Server.StopNotify(), e.Err(), nil } // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. func startProxy(cfg *config) error { plog.Notice("proxy: this proxy supports v2 API only!") - pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) + pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err } pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost - tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) + tr, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err } cfg.Dir = path.Join(cfg.Dir, "proxy") - err = os.MkdirAll(cfg.Dir, privateDirMode) + err = os.MkdirAll(cfg.Dir, fileutil.PrivateDirMode) if err != nil { return err } @@ -440,7 +231,7 @@ func startProxy(cfg *config) error { plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile) case os.IsNotExist(err): var urlsmap types.URLsMap - urlsmap, _, err = getPeerURLsMapAndToken(cfg, "proxy") + urlsmap, _, err = cfg.PeerURLsMapAndToken("proxy") if err != nil { return fmt.Errorf("error setting up initial cluster: %v", err) } @@ -502,20 +293,20 @@ func startProxy(cfg *config) error { ph := httpproxy.NewHandler(pt, uf, time.Duration(cfg.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.ProxyRefreshIntervalMs)*time.Millisecond) ph = &cors.CORSHandler{ Handler: ph, - Info: cfg.corsInfo, + Info: cfg.CorsInfo, } if cfg.isReadonlyProxy() { ph = httpproxy.NewReadonlyHandler(ph) } // Start a proxy server goroutine for each listen address - for _, u := range cfg.lcurls { + for _, u := range cfg.LCUrls { var ( l net.Listener tlscfg *tls.Config ) - if !cfg.clientTLSInfo.Empty() { - tlscfg, err = cfg.clientTLSInfo.ServerConfig() + if !cfg.ClientTLSInfo.Empty() { + tlscfg, err = cfg.ClientTLSInfo.ServerConfig() if err != nil { return err } @@ -538,37 +329,6 @@ func startProxy(cfg *config) error { return nil } -// getPeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery. -func getPeerURLsMapAndToken(cfg *config, which string) (urlsmap types.URLsMap, token string, err error) { - switch { - case cfg.Durl != "": - urlsmap = types.URLsMap{} - // If using discovery, generate a temporary cluster based on - // self's advertised peer URLs - urlsmap[cfg.Name] = cfg.apurls - token = cfg.Durl - case cfg.DNSCluster != "": - var clusterStr string - clusterStr, token, err = discovery.SRVGetCluster(cfg.Name, cfg.DNSCluster, cfg.InitialClusterToken, cfg.apurls) - if err != nil { - return nil, "", err - } - urlsmap, err = types.NewURLsMap(clusterStr) - // only etcd member must belong to the discovered cluster. - // proxy does not need to belong to the discovered cluster. - if which == "etcd" { - if _, ok := urlsmap[cfg.Name]; !ok { - return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name) - } - } - default: - // We're statically configured, and cluster has appropriately been set. - urlsmap, err = types.NewURLsMap(cfg.InitialCluster) - token = cfg.InitialClusterToken - } - return urlsmap, token, err -} - // identifyDataDirOrDie returns the type of the data dir. // Dies if the datadir is invalid. func identifyDataDirOrDie(dir string) dirType { diff --git a/etcdmain/help.go b/etcdmain/help.go index 1be8245b9059..e425a90cd36a 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -14,7 +14,11 @@ package etcdmain -import "strconv" +import ( + "strconv" + + "github.com/coreos/etcd/embed" +) var ( usageline = `usage: etcd [flags] @@ -48,9 +52,9 @@ member flags: list of URLs to listen on for peer traffic. --listen-client-urls 'http://localhost:2379' list of URLs to listen on for client traffic. - --max-snapshots '` + strconv.Itoa(defaultMaxSnapshots) + `' + --max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `' maximum number of snapshot files to retain (0 is unlimited). - --max-wals '` + strconv.Itoa(defaultMaxWALs) + `' + --max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `' maximum number of wal files to retain (0 is unlimited). --cors '' comma-separated whitelist of origins for CORS (cross-origin resource sharing).