Skip to content

Commit

Permalink
Backport of proxy-lifecycle: add HTTP Server with endpoints for proxy…
Browse files Browse the repository at this point in the history
… lifecycle shutdown into release/1.1.x (#181)

* backport of commit 937d893

* backport of commit 722f263

* backport of commit 2af6219

* backport of commit a1c21c9

* backport of commit 68f206d

* backport of commit 892392d

* backport of commit bb0f87a

* backport of commit cde897a

* backport of commit 471a087

* backport of commit 5b54f12

* backport of commit 2852040

* backport of commit bbb3785

* backport of commit c7e8f86

* backport of commit ae041fc

* backport of commit 52e5fd5

* backport of commit 095aaf0

* backport of commit 2b0f0ee

* backport of commit bf9acdb

* backport of commit 9833553

* backport of commit f0dfd78

* backport of commit 7f9b0f0

* backport of commit 8c8141c

* backport of commit f98ce24

* backport of commit 91a5b81

* backport of commit bfea751

* backport of commit aadfeed

* backport of commit 496d196

* backport of commit 4340c2f

* backport of commit 52b4557

* backport of commit 21595f0

* backport of commit b5e3aea

* backport of commit bf8f0c8

* backport of commit 790881e

* fix missing method

---------

Co-authored-by: Mike Morris <mikemorris@users.noreply.github.com>
Co-authored-by: Curt Bushko <cbushko@gmail.com>
  • Loading branch information
3 people authored Jun 27, 2023
1 parent 38bb3f0 commit 954f060
Show file tree
Hide file tree
Showing 11 changed files with 595 additions and 38 deletions.
3 changes: 3 additions & 0 deletions .changelog/115.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
Add HTTP server with configurable port and endpoint path for initiating graceful shutdown.
```
2 changes: 1 addition & 1 deletion .github/workflows/consul-dataplane-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- uses: actions/setup-go@v3
with:
go-version: ${{ needs.get-go-version.outputs.go-version }}
- run: go test ./...
- run: go test ./... -p 1 # disable parallelism to avoid port conflicts from default metrics and lifecycle server configuration
integration-tests:
name: integration-tests
needs:
Expand Down
16 changes: 11 additions & 5 deletions cmd/consul-dataplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ var (
promScrapePath string
promMergePort int

adminBindAddr string
adminBindPort int
readyBindAddr string
readyBindPort int
envoyConcurrency int
adminBindAddr string
adminBindPort int
readyBindAddr string
readyBindPort int
envoyConcurrency int
envoyDrainTimeSeconds int
envoyDrainStrategy string

xdsBindAddr string
xdsBindPort int
Expand Down Expand Up @@ -128,6 +130,8 @@ func init() {
StringVar(&readyBindAddr, "envoy-ready-bind-address", "", "DP_ENVOY_READY_BIND_ADDRESS", "The address on which Envoy's readiness probe is available.")
IntVar(&readyBindPort, "envoy-ready-bind-port", 0, "DP_ENVOY_READY_BIND_PORT", "The port on which Envoy's readiness probe is available.")
IntVar(&envoyConcurrency, "envoy-concurrency", 2, "DP_ENVOY_CONCURRENCY", "The number of worker threads that Envoy uses.")
IntVar(&envoyDrainTimeSeconds, "envoy-drain-time-seconds", 30, "DP_ENVOY_DRAIN_TIME", "The time in seconds for which Envoy will drain connections.")
StringVar(&envoyDrainStrategy, "envoy-drain-strategy", "immediate", "DP_ENVOY_DRAIN_STRATEGY", "The behaviour of Envoy during the drain sequence. Determines whether all open connections should be encouraged to drain immediately or to increase the percentage gradually as the drain time elapses.")

StringVar(&xdsBindAddr, "xds-bind-addr", "127.0.0.1", "DP_XDS_BIND_ADDR", "The address on which the Envoy xDS server is available.")
IntVar(&xdsBindPort, "xds-bind-port", 0, "DP_XDS_BIND_PORT", "The port on which the Envoy xDS server is available.")
Expand Down Expand Up @@ -232,6 +236,8 @@ func main() {
ReadyBindAddress: readyBindAddr,
ReadyBindPort: readyBindPort,
EnvoyConcurrency: envoyConcurrency,
EnvoyDrainTimeSeconds: envoyDrainTimeSeconds,
EnvoyDrainStrategy: envoyDrainStrategy,
ShutdownDrainListenersEnabled: shutdownDrainListenersEnabled,
ShutdownGracePeriodSeconds: shutdownGracePeriodSeconds,
GracefulShutdownPath: gracefulShutdownPath,
Expand Down
15 changes: 14 additions & 1 deletion pkg/consuldp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,20 @@ type EnvoyConfig struct {
ReadyBindPort int
// EnvoyConcurrency is the envoy concurrency https://www.envoyproxy.io/docs/envoy/latest/operations/cli#cmdoption-concurrency
EnvoyConcurrency int
// ShutdownDrainListenersEnabled configures whether to wait for all proxy listeners to drain before terminating the proxy container.
// EnvoyDrainTimeSeconds is the time in seconds for which Envoy will drain connections
// during a hot restart, when listeners are modified or removed via LDS, or when
// initiated manually via a request to the Envoy admin API.
// The Envoy HTTP connection manager filter will add “Connection: close” to HTTP1
// requests, send HTTP2 GOAWAY, and terminate connections on request completion
// (after the delayed close period).
// https://www.envoyproxy.io/docs/envoy/latest/operations/cli#cmdoption-drain-time-s
EnvoyDrainTimeSeconds int
// EnvoyDrainStrategy is the behaviour of Envoy during the drain sequence.
// Determines whether all open connections should be encouraged to drain
// immediately or to increase the percentage gradually as the drain time elapses.
// https://www.envoyproxy.io/docs/envoy/latest/operations/cli#cmdoption-drain-strategy
EnvoyDrainStrategy string
// ShutdownDrainListenersEnabled configures whether to start draining proxy listeners before terminating the proxy container. Drain time defaults to the value of ShutdownGracePeriodSeconds, but may be set explicitly with EnvoyDrainTimeSeconds.
ShutdownDrainListenersEnabled bool
// ShutdownGracePeriodSeconds is the amount of time to wait after receiving a SIGTERM before terminating the proxy container.
ShutdownGracePeriodSeconds int
Expand Down
59 changes: 47 additions & 12 deletions pkg/consuldp/consul_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
Expand All @@ -28,8 +29,9 @@ type xdsServer struct {
exitedCh chan struct{}
}

type httpGetter interface {
type httpClient interface {
Get(string) (*http.Response, error)
Post(string, string, io.Reader) (*http.Response, error)
}

// ConsulDataplane represents the consul-dataplane process
Expand All @@ -41,6 +43,7 @@ type ConsulDataplane struct {
xdsServer *xdsServer
aclToken string
metricsConfig *metricsConfig
lifecycleConfig *lifecycleConfig
}

// NewConsulDP creates a new instance of ConsulDataplane
Expand Down Expand Up @@ -206,6 +209,12 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error {
return err
}

cdp.lifecycleConfig = NewLifecycleConfig(cdp.cfg, proxy)
err = cdp.lifecycleConfig.startLifecycleManager(ctx)
if err != nil {
return err
}

doneCh := make(chan error)
go func() {
select {
Expand All @@ -214,12 +223,25 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error {
case <-proxy.Exited():
doneCh <- errors.New("envoy proxy exited unexpectedly")
case <-cdp.xdsServerExited():
if err := proxy.Stop(); err != nil {
cdp.logger.Error("failed to stop proxy", "error", err)
// Initiate graceful shutdown of Envoy, kill if error
if err := proxy.Quit(); err != nil {
cdp.logger.Error("failed to stop proxy, will attempt to kill", "error", err)
if err := proxy.Kill(); err != nil {
cdp.logger.Error("failed to kill proxy", "error", err)
}
}
doneCh <- errors.New("xDS server exited unexpectedly")
case <-cdp.metricsConfig.metricsServerExited():
doneCh <- errors.New("metrics server exited unexpectedly")
case <-cdp.lifecycleConfig.lifecycleServerExited():
// Initiate graceful shutdown of Envoy, kill if error
if err := proxy.Quit(); err != nil {
cdp.logger.Error("failed to stop proxy", "error", err)
if err := proxy.Kill(); err != nil {
cdp.logger.Error("failed to kill proxy", "error", err)
}
}
doneCh <- errors.New("proxy lifecycle management server exited unexpectedly")
}
}()
return <-doneCh
Expand Down Expand Up @@ -247,20 +269,33 @@ func (cdp *ConsulDataplane) startDNSProxy(ctx context.Context) error {
}

func (cdp *ConsulDataplane) envoyProxyConfig(cfg []byte) envoy.ProxyConfig {
setConcurrency := true
extraArgs := cdp.cfg.Envoy.ExtraArgs
// Users could set the concurrency as an extra args. Take that as priority for best ux
// experience.
for _, v := range extraArgs {
if v == "--concurrency" {
setConcurrency = false
}

envoyArgs := map[string]interface{}{
"--concurrency": cdp.cfg.Envoy.EnvoyConcurrency,
"--drain-time-s": cdp.cfg.Envoy.EnvoyDrainTimeSeconds,
"--drain-strategy": cdp.cfg.Envoy.EnvoyDrainStrategy,
}
if setConcurrency {
extraArgs = append(extraArgs, fmt.Sprintf("--concurrency %v", cdp.cfg.Envoy.EnvoyConcurrency))

// Users could set the Envoy concurrency, drain time, or drain strategy as
// extra args. Prioritize values set in that way over passthrough or defaults
// from consul-dataplane.
for envoyArg, cdpEnvoyValue := range envoyArgs {
for _, v := range extraArgs {
// If found in extraArgs, skip setting value from consul-dataplane Envoy
// config
if v == envoyArg {
break
}
}

// If not found, append value from consul-dataplane Envoy config to extraArgs
extraArgs = append(extraArgs, fmt.Sprintf("%s %v", envoyArg, cdpEnvoyValue))
}

return envoy.ProxyConfig{
AdminAddr: cdp.cfg.Envoy.AdminBindAddress,
AdminBindPort: cdp.cfg.Envoy.AdminBindPort,
Logger: cdp.logger,
LogJSON: cdp.cfg.Logging.LogJSON,
BootstrapConfig: cfg,
Expand Down
195 changes: 195 additions & 0 deletions pkg/consuldp/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package consuldp

import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul-dataplane/pkg/envoy"
)

// Defaults and fixed values for the proxy lifecycle management HTTP server.
const (
	// cdpLifecycleBindAddr is the loopback address the lifecycle server
	// listens on.
	cdpLifecycleBindAddr = "127.0.0.1"

	// cdpLifecycleUrl is the scheme-qualified base URL (without a port) of the
	// lifecycle server, derived from cdpLifecycleBindAddr.
	cdpLifecycleUrl = "http://" + cdpLifecycleBindAddr

	// defaultLifecycleBindPort is the port, kept as a string, on which the
	// proxy lifecycle HTTP endpoints are served when no port is configured.
	defaultLifecycleBindPort = "20300"

	// defaultLifecycleShutdownPath is the endpoint path that triggers a
	// graceful shutdown when no path is configured.
	defaultLifecycleShutdownPath = "/graceful_shutdown"
)

// lifecycleConfig bundles the settings and runtime state used to manage the
// Envoy proxy lifecycle, including the HTTP server that exposes lifecycle
// controls.
type lifecycleConfig struct {
	// logger is assigned in startLifecycleManager from the logger carried by
	// the context passed to it.
	logger hclog.Logger

	// shutdownDrainListenersEnabled makes gracefulShutdown call proxy.Drain
	// before waiting out the grace period.
	shutdownDrainListenersEnabled bool
	// shutdownGracePeriodSeconds is how long gracefulShutdown waits before it
	// calls proxy.Quit.
	shutdownGracePeriodSeconds int
	// gracefulPort is the lifecycle server port; zero selects
	// defaultLifecycleBindPort.
	gracefulPort int
	// gracefulShutdownPath is the shutdown endpoint path; empty selects
	// defaultLifecycleShutdownPath.
	gracefulShutdownPath string

	// proxy is the manager through which the Envoy process is drained and
	// stopped.
	proxy envoy.ProxyManager

	// lifecycleServer is the HTTP server created by startLifecycleManager.
	lifecycleServer *http.Server

	// errorExitCh is closed when the lifecycle server fails to serve or to
	// close, or when draining or quitting Envoy returns an error.
	errorExitCh chan struct{}
	// running records whether startLifecycleManager has already been invoked.
	running bool
	// mu is held by startLifecycleManager and stopLifecycleServer while they
	// touch running and lifecycleServer.
	mu sync.Mutex
}

// NewLifecycleConfig builds a lifecycleConfig for the given proxy manager,
// copying the shutdown and graceful-endpoint settings from the Envoy section
// of cfg. The returned value has no logger or server yet; both are set by
// startLifecycleManager.
func NewLifecycleConfig(cfg *Config, proxy envoy.ProxyManager) *lifecycleConfig {
	envoyCfg := cfg.Envoy

	// The mutex is left at its zero value, which is ready to use.
	lc := &lifecycleConfig{
		proxy:       proxy,
		errorExitCh: make(chan struct{}, 1),
	}
	lc.shutdownDrainListenersEnabled = envoyCfg.ShutdownDrainListenersEnabled
	lc.shutdownGracePeriodSeconds = envoyCfg.ShutdownGracePeriodSeconds
	lc.gracefulPort = envoyCfg.GracefulPort
	lc.gracefulShutdownPath = envoyCfg.GracefulShutdownPath

	return lc
}

// startLifecycleManager configures and launches the proxy lifecycle management
// HTTP server in the background. It is a no-op if the manager is already
// running. The server is stopped when ctx is cancelled.
//
// The shutdown endpoint path and the bind port come from the configured values
// when set, and otherwise fall back to defaultLifecycleShutdownPath and
// defaultLifecycleBindPort. It currently always returns nil; serve failures are
// reported asynchronously via lifecycleServerExited.
func (m *lifecycleConfig) startLifecycleManager(ctx context.Context) error {
	// readHeaderTimeout bounds how long a client may take to send request
	// headers. No write timeout is set because the graceful shutdown handler
	// intentionally blocks for the whole grace period before responding.
	const readHeaderTimeout = 5 * time.Second

	m.mu.Lock()
	defer m.mu.Unlock()
	if m.running {
		return nil
	}

	m.logger = hclog.FromContext(ctx).Named("lifecycle")
	m.running = true
	go func() {
		<-ctx.Done()
		m.stopLifecycleServer()
	}()

	// Start the server which will expose HTTP endpoints for proxy lifecycle
	// management control.
	mux := http.NewServeMux()

	// Resolve the shutdown endpoint path, which can be set as a flag. The
	// resolved value is stored back on the config so that the effective path
	// can be inspected (e.g. by tests).
	if m.gracefulShutdownPath == "" {
		m.gracefulShutdownPath = defaultLifecycleShutdownPath
	}

	m.logger.Info("setting graceful shutdown path", "path", m.gracefulShutdownPath)
	mux.HandleFunc(m.gracefulShutdownPath, m.gracefulShutdown)

	// Resolve the bind port, which can be set as a flag.
	bindPort := defaultLifecycleBindPort
	if m.gracefulPort != 0 {
		bindPort = strconv.Itoa(m.gracefulPort)
	}
	m.lifecycleServer = &http.Server{
		Addr:              fmt.Sprintf("%s:%s", cdpLifecycleBindAddr, bindPort),
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// Start the proxy lifecycle management server.
	go m.startLifecycleServer()

	return nil
}

// startLifecycleServer runs the proxy lifecycle management server and blocks
// until it stops serving. A stop caused by the server being closed is treated
// as normal; any other error is logged and reported by closing errorExitCh.
func (m *lifecycleConfig) startLifecycleServer() {
	m.logger.Info("starting proxy lifecycle management server", "address", m.lifecycleServer.Addr)
	err := m.lifecycleServer.ListenAndServe()
	if err == nil || err == http.ErrServerClosed {
		return
	}
	m.logger.Error("failed to serve proxy lifecycle management requests", "error", err)

	// Other failure paths also close errorExitCh, and closing an already
	// closed channel panics. The channel is only ever closed (never sent on)
	// in this file, so a successful receive means it is already closed.
	m.mu.Lock()
	defer m.mu.Unlock()
	select {
	case <-m.errorExitCh:
		// Already closed by an earlier failure; nothing more to signal.
	default:
		close(m.errorExitCh)
	}
}

// stopLifecycleServer marks the manager as not running and closes the proxy
// lifecycle management server if one was created. A failure to close the
// server is logged and reported by closing errorExitCh.
func (m *lifecycleConfig) stopLifecycleServer() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.running = false

	if m.lifecycleServer == nil {
		return
	}

	m.logger.Info("stopping the lifecycle management server")
	if err := m.lifecycleServer.Close(); err != nil {
		m.logger.Warn("error while closing lifecycle server", "error", err)

		// Other failure paths also close errorExitCh, and closing an already
		// closed channel panics. The channel is only ever closed (never sent
		// on) in this file, so a successful receive means it is already
		// closed.
		select {
		case <-m.errorExitCh:
			// Already closed by an earlier failure; nothing more to signal.
		default:
			close(m.errorExitCh)
		}
	}
}

// lifecycleServerExited exposes errorExitCh as a receive-only channel. The
// channel is closed when the lifecycle server, or a proxy operation it
// triggered, has failed.
func (m *lifecycleConfig) lifecycleServerExited() <-chan struct{} {
	var exited <-chan struct{} = m.errorExitCh
	return exited
}

// gracefulShutdown is the HTTP handler for the graceful shutdown endpoint. It
// blocks until shutdownGracePeriodSeconds seconds have elapsed and then asks
// the Envoy proxy to quit. If shutdownDrainListenersEnabled is set, it starts
// draining Envoy listeners before waiting.
//
// Errors from draining or quitting are logged and reported by closing
// errorExitCh. The handler responds with HTTP 200 once the sequence has
// finished, whether or not those operations succeeded.
func (m *lifecycleConfig) gracefulShutdown(rw http.ResponseWriter, _ *http.Request) {
	m.logger.Info("initiating shutdown")

	// signalErrorExit closes errorExitCh at most once. Both the drain and the
	// quit step may fail in the same request, and other failure paths close
	// the channel too; closing an already closed channel panics. The channel
	// is only ever closed (never sent on) in this file, so a successful
	// receive means it is already closed.
	signalErrorExit := func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		select {
		case <-m.errorExitCh:
			// Already closed by an earlier failure; nothing more to signal.
		default:
			close(m.errorExitCh)
		}
	}

	// Create a context that will signal a cancel at the specified duration.
	// TODO: should this use lifecycleManager ctx instead of context.Background?
	timeout := time.Duration(m.shutdownGracePeriodSeconds) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	m.logger.Info(fmt.Sprintf("waiting %d seconds before terminating dataplane proxy", m.shutdownGracePeriodSeconds))

	// If shutdownDrainListenersEnabled, initiate graceful shutdown of Envoy.
	// We want to start draining connections from inbound listeners if
	// configured, but still allow outbound traffic until the grace period has
	// elapsed to facilitate a graceful application shutdown.
	if m.shutdownDrainListenersEnabled {
		if err := m.proxy.Drain(); err != nil {
			m.logger.Warn("error while draining Envoy listeners", "error", err)
			signalErrorExit()
		}
	}

	// Block until the grace period has elapsed.
	<-ctx.Done()

	// Finish graceful shutdown, quit Envoy proxy.
	m.logger.Info("shutdown grace period timeout reached")
	if err := m.proxy.Quit(); err != nil {
		m.logger.Warn("error while shutting down Envoy", "error", err)
		signalErrorExit()
	}

	// Return HTTP 200 Success
	rw.WriteHeader(http.StatusOK)
}
Loading

0 comments on commit 954f060

Please sign in to comment.