Skip to content

Commit

Permalink
feat: add support for health-check flag (GoogleCloudPlatform#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
enocom authored Jul 28, 2022
1 parent 1e7b33e commit e0b95b9
Show file tree
Hide file tree
Showing 7 changed files with 596 additions and 51 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ and from a AlloyDB instance. The `ALL_PROXY` environment variable supports
`socks5h` protocol.

The `HTTPS_PROXY` (or `HTTP_PROXY`) specifies the proxy for all HTTP(S) traffic
to the SQL Admin API. Specifying `HTTPS_PROXY` or `HTTP_PROXY` is only necessary
to the AlloyDB Admin API. Specifying `HTTPS_PROXY` or `HTTP_PROXY` is only necessary
when you want to proxy this traffic. Otherwise, it is optional. See
[`http.ProxyFromEnvironment`](https://pkg.go.dev/net/http@go1.17.3#ProxyFromEnvironment)
for possible values.
Expand Down
98 changes: 61 additions & 37 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"contrib.go.opencensus.io/exporter/prometheus"
"contrib.go.opencensus.io/exporter/stackdriver"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/alloydb"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/healthcheck"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/log"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/proxy"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -88,6 +89,7 @@ type Command struct {
telemetryProject string
telemetryPrefix string
prometheusNamespace string
healthCheck bool
httpPort string
}

Expand Down Expand Up @@ -186,6 +188,10 @@ the maximum time has passed. Defaults to 0s.`)
"Enable Prometheus for metric collection using the provided namespace")
cmd.PersistentFlags().StringVar(&c.httpPort, "http-port", "9090",
"Port for the Prometheus server to use")
cmd.PersistentFlags().BoolVar(&c.healthCheck, "health-check", false,
`Enables HTTP endpoints /startup, /liveness, and /readiness
that report on the proxy's health. Endpoints are available on localhost
only. Uses the port specified by the http-port flag.`)

// Global and per instance flags
cmd.PersistentFlags().StringVarP(&c.conf.Addr, "address", "a", "127.0.0.1",
Expand Down Expand Up @@ -241,18 +247,18 @@ func parseConfig(cmd *Command, conf *proxy.Config, args []string) error {
cmd.logger.Infof("Using API Endpoint %v", conf.APIEndpointURL)
}

if userHasSet("http-port") && !userHasSet("prometheus-namespace") {
return newBadCommandError("cannot specify --http-port without --prometheus-namespace")
if userHasSet("http-port") && !userHasSet("prometheus-namespace") && !userHasSet("health-check") {
cmd.logger.Infof("Ignoring --http-port because --prometheus-namespace or --health-check was not set")
}

if !userHasSet("telemetry-project") && userHasSet("telemetry-prefix") {
cmd.logger.Infof("Ignoring telementry-prefix as telemetry-project was not set")
cmd.logger.Infof("Ignoring --telementry-prefix as --telemetry-project was not set")
}
if !userHasSet("telemetry-project") && userHasSet("disable-metrics") {
cmd.logger.Infof("Ignoring disable-metrics as telemetry-project was not set")
cmd.logger.Infof("Ignoring --disable-metrics as --telemetry-project was not set")
}
if !userHasSet("telemetry-project") && userHasSet("disable-traces") {
cmd.logger.Infof("Ignoring disable-traces as telemetry-project was not set")
cmd.logger.Infof("Ignoring --disable-traces as --telemetry-project was not set")
}

var ics []proxy.InstanceConnConfig
Expand Down Expand Up @@ -328,9 +334,8 @@ func runSignalWrapper(cmd *Command) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Configure Cloud Trace and/or Cloud Monitoring based on command
// invocation. If a project has not been enabled, no traces or metrics are
// enabled.
// Configure collectors before the proxy has started to ensure we are
// collecting metrics before *ANY* AlloyDB Admin API calls are made.
enableMetrics := !cmd.disableMetrics
enableTraces := !cmd.disableTraces
if cmd.telemetryProject != "" && (enableMetrics || enableTraces) {
Expand Down Expand Up @@ -358,40 +363,22 @@ func runSignalWrapper(cmd *Command) error {
}()
}

shutdownCh := make(chan error)

var (
needsHTTPServer bool
mux = http.NewServeMux()
)
if cmd.prometheusNamespace != "" {
needsHTTPServer = true
e, err := prometheus.NewExporter(prometheus.Options{
Namespace: cmd.prometheusNamespace,
})
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/metrics", e)
addr := fmt.Sprintf("localhost:%s", cmd.httpPort)
server := &http.Server{Addr: addr, Handler: mux}
go func() {
select {
case <-ctx.Done():
// Give the HTTP server a second to shutdown cleanly.
ctx2, _ := context.WithTimeout(context.Background(), time.Second)
if err := server.Shutdown(ctx2); err != nil {
cmd.logger.Errorf("failed to shutdown Prometheus HTTP server: %v\n", err)
}
}
}()
go func() {
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return
}
if err != nil {
shutdownCh <- fmt.Errorf("failed to start prometheus HTTP server: %v", err)
}
}()
}

shutdownCh := make(chan error)
// watch for sigterm / sigint signals
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
Expand Down Expand Up @@ -429,18 +416,55 @@ func runSignalWrapper(cmd *Command) error {
cmd.logger.Errorf("The proxy has encountered a terminal error: %v", err)
return err
case p = <-startCh:
cmd.logger.Infof("The proxy has started successfully and is ready for new connections!")
}
cmd.logger.Infof("The proxy has started successfully and is ready for new connections!")
defer p.Close()
defer func() {
if cErr := p.Close(); cErr != nil {
cmd.logger.Errorf("error during shutdown: %v", cErr)
}
}()

go func() {
shutdownCh <- p.Serve(ctx)
}()
notify := func() {}
if cmd.healthCheck {
needsHTTPServer = true
hc := healthcheck.NewCheck(p, cmd.logger)
mux.HandleFunc("/startup", hc.HandleStartup)
mux.HandleFunc("/readiness", hc.HandleReadiness)
mux.HandleFunc("/liveness", hc.HandleLiveness)
notify = hc.NotifyStarted
}

// Start the HTTP server if anything requiring HTTP is specified.
if needsHTTPServer {
server := &http.Server{
Addr: fmt.Sprintf("localhost:%s", cmd.httpPort),
Handler: mux,
}
// Start the HTTP server.
go func() {
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return
}
if err != nil {
shutdownCh <- fmt.Errorf("failed to start HTTP server: %v", err)
}
}()
// Handle shutdown of the HTTP server gracefully.
go func() {
select {
case <-ctx.Done():
// Give the HTTP server a second to shutdown cleanly.
ctx2, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := server.Shutdown(ctx2); err != nil {
cmd.logger.Errorf("failed to shutdown Prometheus HTTP server: %v\n", err)
}
}
}()
}

go func() { shutdownCh <- p.Serve(ctx, notify) }()

err := <-shutdownCh
switch {
Expand Down
4 changes: 0 additions & 4 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,6 @@ func TestNewCommandWithErrors(t *testing.T) {
desc: "using the unix socket and port query params",
args: []string{"projects/proj/locations/region/clusters/clust/instances/inst?unix-socket=/path&port=5000"},
},
{
desc: "enabling a Prometheus port without a namespace",
args: []string{"--http-port", "1111", "proj:region:inst"},
},
{
desc: "using an invalid url for host flag",
args: []string{"--host", "https://invalid:url[/]", "proj:region:inst"},
Expand Down
110 changes: 110 additions & 0 deletions internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package healthcheck tests and communicates the health of the AlloyDB Auth
// proxy.
package healthcheck

import (
"context"
"errors"
"fmt"
"net/http"
"sync"

"github.com/GoogleCloudPlatform/alloydb-auth-proxy/alloydb"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/proxy"
)

// Check provides HTTP handlers for use as healthchecks typically in a
// Kubernetes context.
type Check struct {
once *sync.Once
started chan struct{}
proxy *proxy.Client
logger alloydb.Logger
}

// NewCheck is the initializer for Check.
func NewCheck(p *proxy.Client, l alloydb.Logger) *Check {
return &Check{
once: &sync.Once{},
started: make(chan struct{}),
proxy: p,
logger: l,
}
}

// NotifyStarted notifies the check that the proxy has started up successfully.
func (c *Check) NotifyStarted() {
c.once.Do(func() { close(c.started) })
}

// HandleStartup reports whether the Check has been notified of startup.
func (c *Check) HandleStartup(w http.ResponseWriter, _ *http.Request) {
select {
case <-c.started:
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
default:
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("error"))
}
}

var errNotStarted = errors.New("proxy is not started")

// HandleReadiness ensures the Check has been notified of successful startup,
// that the proxy has not reached maximum connections, and that all connections
// are healthy.
func (c *Check) HandleReadiness(w http.ResponseWriter, _ *http.Request) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

select {
case <-c.started:
default:
c.logger.Errorf("[Health Check] Readiness failed: %v", errNotStarted)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(errNotStarted.Error()))
return
}

if open, max := c.proxy.ConnCount(); max > 0 && open == max {
err := fmt.Errorf("max connections reached (open = %v, max = %v)", open, max)
c.logger.Errorf("[Health Check] Readiness failed: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(err.Error()))
return
}

err := c.proxy.CheckConnections(ctx)
if err != nil {
c.logger.Errorf("[Health Check] Readiness failed: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(err.Error()))
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}

// HandleLiveness indicates the process is up and responding to HTTP requests.
// If this check fails (because it's not reachable), the process is in a bad
// state and should be restarted.
func (c *Check) HandleLiveness(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}
Loading

0 comments on commit e0b95b9

Please sign in to comment.