Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for health-check flag #85

Merged
merged 2 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ and from a AlloyDB instance. The `ALL_PROXY` environment variable supports
`socks5h` protocol.

The `HTTPS_PROXY` (or `HTTP_PROXY`) specifies the proxy for all HTTP(S) traffic
to the SQL Admin API. Specifying `HTTPS_PROXY` or `HTTP_PROXY` is only necessary
to the AlloyDB Admin API. Specifying `HTTPS_PROXY` or `HTTP_PROXY` is only necessary
when you want to proxy this traffic. Otherwise, it is optional. See
[`http.ProxyFromEnvironment`](https://pkg.go.dev/net/http@go1.17.3#ProxyFromEnvironment)
for possible values.
Expand Down
98 changes: 61 additions & 37 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"contrib.go.opencensus.io/exporter/prometheus"
"contrib.go.opencensus.io/exporter/stackdriver"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/alloydb"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/healthcheck"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/log"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/proxy"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -88,6 +89,7 @@ type Command struct {
telemetryProject string
telemetryPrefix string
prometheusNamespace string
healthCheck bool
httpPort string
}

Expand Down Expand Up @@ -186,6 +188,10 @@ the maximum time has passed. Defaults to 0s.`)
"Enable Prometheus for metric collection using the provided namespace")
cmd.PersistentFlags().StringVar(&c.httpPort, "http-port", "9090",
"Port for the Prometheus server to use")
cmd.PersistentFlags().BoolVar(&c.healthCheck, "health-check", false,
`Enables HTTP endpoints /startup, /liveness, and /readiness
that report on the proxy's health. Endpoints are available on localhost
only. Uses the port specified by the http-port flag.`)

// Global and per instance flags
cmd.PersistentFlags().StringVarP(&c.conf.Addr, "address", "a", "127.0.0.1",
Expand Down Expand Up @@ -241,18 +247,18 @@ func parseConfig(cmd *Command, conf *proxy.Config, args []string) error {
cmd.logger.Infof("Using API Endpoint %v", conf.APIEndpointURL)
}

if userHasSet("http-port") && !userHasSet("prometheus-namespace") {
return newBadCommandError("cannot specify --http-port without --prometheus-namespace")
if userHasSet("http-port") && !userHasSet("prometheus-namespace") && !userHasSet("health-check") {
cmd.logger.Infof("Ignoring --http-port because --prometheus-namespace or --health-check was not set")
}

if !userHasSet("telemetry-project") && userHasSet("telemetry-prefix") {
cmd.logger.Infof("Ignoring telementry-prefix as telemetry-project was not set")
cmd.logger.Infof("Ignoring --telementry-prefix as --telemetry-project was not set")
}
if !userHasSet("telemetry-project") && userHasSet("disable-metrics") {
cmd.logger.Infof("Ignoring disable-metrics as telemetry-project was not set")
cmd.logger.Infof("Ignoring --disable-metrics as --telemetry-project was not set")
}
if !userHasSet("telemetry-project") && userHasSet("disable-traces") {
cmd.logger.Infof("Ignoring disable-traces as telemetry-project was not set")
cmd.logger.Infof("Ignoring --disable-traces as --telemetry-project was not set")
}

var ics []proxy.InstanceConnConfig
Expand Down Expand Up @@ -328,9 +334,8 @@ func runSignalWrapper(cmd *Command) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

// Configure Cloud Trace and/or Cloud Monitoring based on command
// invocation. If a project has not been enabled, no traces or metrics are
// enabled.
// Configure collectors before the proxy has started to ensure we are
// collecting metrics before *ANY* AlloyDB Admin API calls are made.
enableMetrics := !cmd.disableMetrics
enableTraces := !cmd.disableTraces
if cmd.telemetryProject != "" && (enableMetrics || enableTraces) {
Expand Down Expand Up @@ -358,40 +363,22 @@ func runSignalWrapper(cmd *Command) error {
}()
}

shutdownCh := make(chan error)

var (
needsHTTPServer bool
mux = http.NewServeMux()
)
if cmd.prometheusNamespace != "" {
needsHTTPServer = true
e, err := prometheus.NewExporter(prometheus.Options{
Namespace: cmd.prometheusNamespace,
})
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/metrics", e)
addr := fmt.Sprintf("localhost:%s", cmd.httpPort)
server := &http.Server{Addr: addr, Handler: mux}
go func() {
select {
case <-ctx.Done():
// Give the HTTP server a second to shutdown cleanly.
ctx2, _ := context.WithTimeout(context.Background(), time.Second)
if err := server.Shutdown(ctx2); err != nil {
cmd.logger.Errorf("failed to shutdown Prometheus HTTP server: %v\n", err)
}
}
}()
go func() {
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return
}
if err != nil {
shutdownCh <- fmt.Errorf("failed to start prometheus HTTP server: %v", err)
}
}()
}

shutdownCh := make(chan error)
// watch for sigterm / sigint signals
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
Expand Down Expand Up @@ -429,18 +416,55 @@ func runSignalWrapper(cmd *Command) error {
cmd.logger.Errorf("The proxy has encountered a terminal error: %v", err)
return err
case p = <-startCh:
cmd.logger.Infof("The proxy has started successfully and is ready for new connections!")
}
cmd.logger.Infof("The proxy has started successfully and is ready for new connections!")
defer p.Close()
defer func() {
if cErr := p.Close(); cErr != nil {
cmd.logger.Errorf("error during shutdown: %v", cErr)
}
}()

go func() {
shutdownCh <- p.Serve(ctx)
}()
notify := func() {}
if cmd.healthCheck {
needsHTTPServer = true
hc := healthcheck.NewCheck(p, cmd.logger)
mux.HandleFunc("/startup", hc.HandleStartup)
mux.HandleFunc("/readiness", hc.HandleReadiness)
mux.HandleFunc("/liveness", hc.HandleLiveness)
notify = hc.NotifyStarted
}

// Start the HTTP server if anything requiring HTTP is specified.
if needsHTTPServer {
server := &http.Server{
Addr: fmt.Sprintf("localhost:%s", cmd.httpPort),
Handler: mux,
}
// Start the HTTP server.
go func() {
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return
}
if err != nil {
shutdownCh <- fmt.Errorf("failed to start HTTP server: %v", err)
}
}()
// Handle shutdown of the HTTP server gracefully.
go func() {
select {
case <-ctx.Done():
// Give the HTTP server a second to shutdown cleanly.
ctx2, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := server.Shutdown(ctx2); err != nil {
cmd.logger.Errorf("failed to shutdown Prometheus HTTP server: %v\n", err)
}
}
}()
}

go func() { shutdownCh <- p.Serve(ctx, notify) }()

err := <-shutdownCh
switch {
Expand Down
4 changes: 0 additions & 4 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,6 @@ func TestNewCommandWithErrors(t *testing.T) {
desc: "using the unix socket and port query params",
args: []string{"projects/proj/locations/region/clusters/clust/instances/inst?unix-socket=/path&port=5000"},
},
{
desc: "enabling a Prometheus port without a namespace",
args: []string{"--http-port", "1111", "proj:region:inst"},
},
{
desc: "using an invalid url for host flag",
args: []string{"--host", "https://invalid:url[/]", "proj:region:inst"},
Expand Down
110 changes: 110 additions & 0 deletions internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package healthcheck tests and communicates the health of the AlloyDB Auth
// proxy.
package healthcheck

import (
"context"
"errors"
"fmt"
"net/http"
"sync"

"github.com/GoogleCloudPlatform/alloydb-auth-proxy/alloydb"
"github.com/GoogleCloudPlatform/alloydb-auth-proxy/internal/proxy"
)

// Check holds the state shared by the health check HTTP handlers. The
// handlers are typically wired up as probes in a Kubernetes context.
type Check struct {
	// proxy is the client that the readiness handler asks for connection
	// counts and connection health.
	proxy *proxy.Client
	// logger receives the reason whenever a readiness check fails.
	logger alloydb.Logger

	// started is closed by NotifyStarted. A closed channel means startup
	// has been reported; an open one means it has not.
	started chan struct{}
	// once ensures started is closed at most one time, no matter how often
	// NotifyStarted is called.
	once *sync.Once
}

// NewCheck builds a Check around the proxy client p that reports failures
// through the logger l. The returned Check has not yet been notified of
// startup; call NotifyStarted once the proxy is serving.
func NewCheck(p *proxy.Client, l alloydb.Logger) *Check {
	check := new(Check)
	check.proxy = p
	check.logger = l
	// Unbuffered and never sent on: the channel is only ever closed.
	check.started = make(chan struct{})
	check.once = new(sync.Once)
	return check
}

// NotifyStarted records that the proxy has started up successfully. It may
// be called any number of times; only the first call has an effect.
func (c *Check) NotifyStarted() {
	// Closing an already closed channel panics, so the close is funneled
	// through once.
	markStarted := func() {
		close(c.started)
	}
	c.once.Do(markStarted)
}

// HandleStartup answers 200 "ok" once the Check has been notified of startup
// and 503 "error" before that.
func (c *Check) HandleStartup(w http.ResponseWriter, _ *http.Request) {
	// Assume the proxy has not started until the closed channel proves
	// otherwise.
	status, body := http.StatusServiceUnavailable, "error"

	// Non-blocking receive: it only succeeds once started has been closed.
	select {
	case <-c.started:
		status, body = http.StatusOK, "ok"
	default:
	}

	w.WriteHeader(status)
	w.Write([]byte(body))
}

// errNotStarted is reported by the readiness check until the Check has been
// notified of startup.
var errNotStarted = errors.New("proxy is not started")

// HandleReadiness answers 200 "ok" only when all of the following hold:
//
//   - the Check has been notified of successful startup,
//   - the proxy has not reached its maximum number of connections (when a
//     maximum is configured, i.e. max > 0), and
//   - the proxy's CheckConnections call succeeds.
//
// Otherwise it logs the reason and answers 503 with the error text as the
// response body.
func (c *Check) HandleReadiness(w http.ResponseWriter, req *http.Request) {
	// Derive the context from the request so that a probe which times out
	// or disconnects also cancels any in-flight connection checks instead
	// of leaving them running against a detached background context.
	ctx, cancel := context.WithCancel(req.Context())
	defer cancel()

	// fail logs the reason and reports the proxy as not ready.
	fail := func(err error) {
		c.logger.Errorf("[Health Check] Readiness failed: %v", err)
		w.WriteHeader(http.StatusServiceUnavailable)
		w.Write([]byte(err.Error()))
	}

	// Non-blocking receive: it only succeeds once started has been closed.
	select {
	case <-c.started:
	default:
		fail(errNotStarted)
		return
	}

	// A limit of zero means no maximum is enforced. Use >= rather than ==
	// so a count above the limit is also treated as saturated.
	// NOTE(review): whether ConnCount can ever report open > max is not
	// visible from this file; >= is the defensive choice either way.
	if open, limit := c.proxy.ConnCount(); limit > 0 && open >= limit {
		fail(fmt.Errorf("max connections reached (open = %v, max = %v)", open, limit))
		return
	}

	if err := c.proxy.CheckConnections(ctx); err != nil {
		fail(err)
		return
	}

	w.WriteHeader(http.StatusOK)
	w.Write([]byte("ok"))
}

// HandleLiveness always answers 200 "ok". Receiving any response shows the
// process is up and serving HTTP; if the endpoint cannot be reached at all,
// the process is in a bad state and should be restarted.
func (c *Check) HandleLiveness(w http.ResponseWriter, _ *http.Request) {
	const body = "ok"

	w.WriteHeader(http.StatusOK)
	w.Write([]byte(body))
}
Loading