diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 5633df48beb3..ce08ef87a7ee 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -16,6 +16,8 @@ package etcdmain import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "io/ioutil" "log" @@ -114,7 +116,7 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address") cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "domain name to query for SRV records describing cluster endpoints") cmd.Flags().StringVar(&grpcProxyDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery") - cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface") + cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for endpoint /metrics requests on an additional interface") cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records") cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints") cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)") @@ -184,7 +186,6 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsinfo))) } m := mustListenCMux(lg, tlsinfo) - grpcl := m.Match(cmux.HTTP2()) defer func() { grpcl.Close() @@ -192,6 +193,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { }() client := mustNewClient(lg) + httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client) errc := make(chan error) @@ -202,7 +204,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { mhttpl := mustMetricsListener(lg, tlsinfo) go func() { mux := http.NewServeMux() - etcdhttp.HandlePrometheus(mux) + grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints()) grpcproxy.HandleHealth(mux, client) lg.Info("gRPC proxy server metrics URL serving") herr := http.Serve(mhttpl, mux) @@ -406,6 +408,48 @@ func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c return srvhttp, m.Match(cmux.Any()) } +func mustNewHTTPClient(lg *zap.Logger) *http.Client { + srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery, grpcProxyDNSClusterServiceName) + eps := srvs.Endpoints + if len(eps) == 0 { + eps = grpcProxyEndpoints + } + transport, err := newHTTPTransport(grpcProxyCA, grpcProxyCert, grpcProxyKey) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + return &http.Client{Transport: transport} +} + +func newHTTPTransport(ca, cert, key string) (*http.Transport, error) { + tr := &http.Transport{} + + if ca != "" && cert != "" && key != "" { + caCert, err := ioutil.ReadFile(ca) + if err != nil { + return nil, err + } + keyPair, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return nil, err + } + caPool := x509.NewCertPool() + caPool.AppendCertsFromPEM(caCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{keyPair}, + RootCAs: caPool, + } + tlsConfig.BuildNameToCertificate() + tr.TLSClientConfig = tlsConfig + } else if grpcProxyInsecureSkipTLSVerify { + tlsConfig := &tls.Config{InsecureSkipVerify: grpcProxyInsecureSkipTLSVerify} + tr.TLSClientConfig = tlsConfig + } + return tr, nil +} + func mustMetricsListener(lg *zap.Logger, tlsinfo *transport.TLSInfo) net.Listener { murl, err := url.Parse(grpcProxyMetricsListenAddr) if err != nil { diff --git a/etcdserver/api/etcdhttp/metrics.go b/etcdserver/api/etcdhttp/metrics.go index 4d058e46719b..f455e40a7406 100644 --- a/etcdserver/api/etcdhttp/metrics.go +++ b/etcdserver/api/etcdhttp/metrics.go @@ -29,19 +29,19 @@ import ( ) const ( - pathMetrics = "/metrics" + PathMetrics = "/metrics" PathHealth = "/health" ) // HandleMetricsHealth registers metrics and health handlers. func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) { - mux.Handle(pathMetrics, promhttp.Handler()) + mux.Handle(PathMetrics, promhttp.Handler()) mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) })) } // HandlePrometheus registers prometheus handler on '/metrics'. func HandlePrometheus(mux *http.ServeMux) { - mux.Handle(pathMetrics, promhttp.Handler()) + mux.Handle(PathMetrics, promhttp.Handler()) } // NewHealthHandler handles '/health' requests. diff --git a/proxy/grpcproxy/metrics.go b/proxy/grpcproxy/metrics.go index 864fa1609a0e..2a848db29ed5 100644 --- a/proxy/grpcproxy/metrics.go +++ b/proxy/grpcproxy/metrics.go @@ -14,7 +14,17 @@ package grpcproxy -import "github.com/prometheus/client_golang/prometheus" +import ( + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.etcd.io/etcd/etcdserver/api/etcdhttp" +) var ( watchersCoalescing = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -56,3 +66,43 @@ func init() { prometheus.MustRegister(cacheHits) prometheus.MustRegister(cachedMisses) } + +// HandleMetrics handler adds passthrough to endpoint '/metrics'. +func HandleMetrics(mux *http.ServeMux, c *http.Client, eps []string) { + // random shuffle endpoints + r := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) + eps = shuffleEndpoints(r, eps) + + pathMetrics := etcdhttp.PathMetrics + target := fmt.Sprintf("%s%s", eps[0], pathMetrics) + if !strings.HasPrefix(target, "http") { + target = fmt.Sprintf("%s%s", "http://", target) + } + + mux.HandleFunc(pathMetrics, func(w http.ResponseWriter, r *http.Request) { + resp, err := c.Get(target) + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + } + defer resp.Body.Close() + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + body, _ := ioutil.ReadAll(resp.Body) + fmt.Fprintf(w, "%s", body) + }) +} + +func shuffleEndpoints(r *rand.Rand, eps []string) []string { + // copied from Go 1.9<= rand.Rand.Perm + n := len(eps) + p := make([]int, n) + for i := 0; i < n; i++ { + j := r.Intn(i + 1) + p[i] = p[j] + p[j] = i + } + neps := make([]string, n) + for i, k := range p { + neps[i] = eps[k] + } + return neps +}