Skip to content

Commit

Permalink
lifecycle: get blocking gracefulShutdown working
Browse files Browse the repository at this point in the history
  • Loading branch information
mikemorris committed Jun 6, 2023
1 parent 5b54f12 commit 2852040
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 74 deletions.
91 changes: 41 additions & 50 deletions pkg/consuldp/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ import (
"context"
"fmt"
"net/http"
// "net/url"
"strconv"
"sync"
"time"

// "github.com/hashicorp/consul-server-connection-manager/discovery"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/consul-dataplane/internal/bootstrap"
)
Expand All @@ -25,6 +22,8 @@ const (
defaultLifecycleBindPort = "20300"
cdpLifecycleBindAddr = "127.0.0.1"
cdpLifecycleUrl = "http://" + cdpLifecycleBindAddr

defaultLifecycleShutdownPath = "/shutdown"
)

// lifecycleConfig handles all configuration related to managing the Envoy proxy
Expand All @@ -49,9 +48,9 @@ type lifecycleConfig struct {
lifecycleServer *http.Server

// consuldp proxy lifecycle server control
errorExitCh chan struct{}
running bool
mu sync.Mutex
doneCh chan struct{}
running bool
mu sync.Mutex
}

func NewLifecycleConfig(cfg *Config) *lifecycleConfig {
Expand All @@ -69,8 +68,8 @@ func NewLifecycleConfig(cfg *Config) *lifecycleConfig {
Timeout: 10 * time.Second,
},

errorExitCh: make(chan struct{}),
mu: sync.Mutex{},
doneCh: make(chan struct{}, 1),
mu: sync.Mutex{},
}
}

Expand All @@ -91,12 +90,15 @@ func (m *lifecycleConfig) startLifecycleManager(ctx context.Context, bcfg *boots
// Start the server which will expose HTTP endpoints for proxy lifecycle
// management control
mux := http.NewServeMux()
fmt.Printf("graceful shutdown path: %s\n", m.gracefulShutdownPath)
// TODO: set a default value in lifecycle manager init instead of empty string
// to avoid panic here
m.gracefulShutdownPath = "/shutdown"

mux.HandleFunc(m.gracefulShutdownPath, m.gracefulShutdown)
// Determine what HTTP endpoint paths to configure for the proxy lifecycle
// management server bind port is. These can be set as flags.
cdpLifecycleShutdownPath := defaultLifecycleShutdownPath
if m.gracefulShutdownPath != "" {
cdpLifecycleShutdownPath = m.gracefulShutdownPath
}
fmt.Printf("setting graceful shutdown path: %s\n", cdpLifecycleShutdownPath)
mux.HandleFunc(cdpLifecycleShutdownPath, m.gracefulShutdown)

// Determine what the proxy lifecycle management server bind port is. It can be
// set as a flag.
Expand All @@ -122,59 +124,54 @@ func (m *lifecycleConfig) startLifecycleServer() {
err := m.lifecycleServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
m.logger.Error("failed to serve proxy lifecycle managerments requests", "error", err)
close(m.errorExitCh)
close(m.doneCh)
}
}

// stopLifecycleServer stops the consul dataplane proxy lifecycle server
func (m *lifecycleConfig) stopLifecycleServer() {
m.mu.Lock()
defer m.mu.Unlock()

defer close(m.doneCh)
m.running = false
var errs error

if m.lifecycleServer != nil {
m.logger.Info("stopping the lifecycle management server")
err := m.lifecycleServer.Close()
if err != nil {
m.logger.Warn("error while closing lifecycle server", "error", err)
errs = multierror.Append(err, errs)
}
}

// Check if there were errors and then close the error channel
if errs != nil {
close(m.errorExitCh)
}
}

// lifecycleServerExited is used to signal that the lifecycle server
// recieved a signal to initiate shutdown.
// func (m *lifecycleConfig) lifecycleServerExited() <-chan struct{} {
// return m.errorExitCh
// }
func (m *lifecycleConfig) lifecycleServerExited() <-chan struct{} {
return m.doneCh
}

// gracefulShutdown blocks until at most shutdownGracePeriod seconds have elapsed,
// or, if configured, until all open connections to Envoy listeners have been
// drained.
// gracefulShutdown blocks until shutdownGracePeriod seconds have elapsed, and, if
// configured, will drain inbound connections to Envoy listeners during that time.
func (m *lifecycleConfig) gracefulShutdown(rw http.ResponseWriter, _ *http.Request) {
envoyDrainListenersUrl := fmt.Sprintf("http://%s:%v/drain_listeners?inboundonly&graceful", m.envoyAdminAddr, m.envoyAdminBindPort)
envoyShutdownUrl := fmt.Sprintf("http://%s:%v/quitquitquit", m.envoyAdminAddr, m.envoyAdminBindPort)

m.logger.Info("initiating shutdown")

// Wait until shutdownGracePeriod seconds have elapsed before actually
// terminating the Envoy proxy process.
m.logger.Info(fmt.Sprintf("waiting %d seconds before terminating dataplane proxy", m.shutdownGracePeriod))
// Create a context that will signal a cancel at the specified duration.
// TODO: should this use lifecycleManager ctx instead of context.Background?
timeout := time.Duration(m.shutdownGracePeriod) * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)

// Create a context that is both manually cancellable and will signal
// a cancel at the specified duration.
// TODO: should this use lifecycleManager ctx instead of context.Background?
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
m.logger.Info(fmt.Sprintf("waiting %d seconds before terminating dataplane proxy", m.shutdownGracePeriod))

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

// If shutdownDrainListeners enabled, initiatie graceful shutdown of Envoy.
// We want to start draining connections from inbound listeners if
// configured, but still allow outbound traffic until gracefulShutdownPeriod
Expand All @@ -183,28 +180,22 @@ func (m *lifecycleConfig) gracefulShutdown(rw http.ResponseWriter, _ *http.Reque
_, err := m.client.Post(envoyDrainListenersUrl, "text/plain", nil)
if err != nil {
m.logger.Error("envoy: failed to initiate listener drain", "error", err)
close(m.errorExitCh)
}
}

for {
select {
case <-ctx.Done():
m.logger.Info("shutdown grace period timeout reached")
_, err := m.client.Post(envoyShutdownUrl, "text/plain", nil)
if err != nil {
m.logger.Error("envoy: failed to initiate listener drain", "error", err)
close(m.errorExitCh)
}
select {
case <-ctx.Done():
m.logger.Info("shutdown grace period timeout reached")
_, err := m.client.Post(envoyShutdownUrl, "text/plain", nil)
if err != nil {
m.logger.Error("envoy: failed to quit", "error", err)
}
// TODO: is there a need to handle context cancelation here if not
// able to shutdown cleanly?
}

// TODO: is there actually any point to sending a signal if we always just
// want to wait unitl the shutdownGracePeriod has elapsed?
}()

// Wait for context timeout to elapse
wg.Wait()

// Return HTTP 200 Success
rw.WriteHeader(http.StatusOK)
}
92 changes: 68 additions & 24 deletions pkg/consuldp/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestLifecycleServerClosed(t *testing.T) {
mu: sync.Mutex{},
envoyAdminAddr: envoyAdminAddr,
envoyAdminBindPort: envoyAdminPort,
errorExitCh: make(chan struct{}),
doneCh: make(chan struct{}, 1),

client: &http.Client{
Timeout: 10 * time.Second,
Expand All @@ -51,44 +51,70 @@ func TestLifecycleServerClosed(t *testing.T) {

func TestLifecycleServerEnabled(t *testing.T) {
cases := map[string]struct {
shutdownDrainListeners string
shutdownDrainListeners bool
shutdownGracePeriod int
gracefulShutdownPath string
gracefulPort int
}{
"connection draining disabled without grace period": {},
"connection draining enabled without grace period": {
// TODO: long-timeout connection held open
"connection draining disabled without grace period": {
// All inbound and outbound connections are terminated immediately.
},
"connection draining disabled with grace period": {},
"connection draining enabled with grace period": {
// TODO: decide if grace period should be a minimum time to wait before
// shutdown even if all connections have drained, and/or a maximum time
// even if some connections are still open, test both
"connection draining enabled without grace period": {
// This should immediately send "Connection: close" to inbound HTTP1
// connections, GOAWAY to inbound HTTP2, and terminate connections on
// request completion. Outbound connections should start being rejected
// immediately.
shutdownDrainListeners: true,
},
"connection draining disabled with grace period": {
// This should immediately terminate any open inbound connections.
// Outbound connections should be allowed until the grace period has
// elapsed.
shutdownGracePeriod: 5,
},
"connection draining enabled with grace period": {
// This should immediately send "Connection: close" to inbound HTTP1
// connections, GOAWAY to inbound HTTP2, and terminate connections on
// request completion.
// Outbound connections should be allowed until the grace period has
// elapsed, then any remaining open connections should be closed and new
// outbound connections should start being rejected until pod termination.
shutdownDrainListeners: true,
shutdownGracePeriod: 5,
},
"custom graceful shutdown path and port": {
shutdownDrainListeners: true,
shutdownGracePeriod: 5,
gracefulShutdownPath: "/quit-nicely",
// TODO: should this be random or use freeport? logic disallows passing
// zero value explicitly
gracefulPort: 23108,
},
"custom graceful path": {},
"custom graceful port": {},
}
for name, c := range cases {
c := c
log.Printf("config = %v", c)

t.Run(name, func(t *testing.T) {

m := &lifecycleConfig{
mu: sync.Mutex{},
envoyAdminAddr: envoyAdminAddr,
envoyAdminBindPort: envoyAdminPort,
errorExitCh: make(chan struct{}),
envoyAdminAddr: envoyAdminAddr,
envoyAdminBindPort: envoyAdminPort,
shutdownDrainListeners: c.shutdownDrainListeners,
shutdownGracePeriod: c.shutdownGracePeriod,
gracefulShutdownPath: c.gracefulShutdownPath,
gracefulPort: c.gracefulPort,

client: &http.Client{
Timeout: 10 * time.Second,
},

doneCh: make(chan struct{}, 1),
mu: sync.Mutex{},
}

require.NotNil(t, m)
require.NotNil(t, m.client)
require.NotNil(t, m.errorExitCh)
require.NotNil(t, m.doneCh)
require.IsType(t, &http.Client{}, m.client)
require.Greater(t, m.client.(*http.Client).Timeout, time.Duration(0))

Expand All @@ -99,13 +125,12 @@ func TestLifecycleServerEnabled(t *testing.T) {
defer cancel()
err := m.startLifecycleManager(ctx, &bootstrap.BootstrapConfig{})
require.NoError(t, err)
// require.Equal(t, c.bindAddr, m.promScrapeServer.Addr)

// Have consul-dataplane's lifecycle server start on an open port
// and figure out what port was used so we can make requests to it.
// Conveniently, this seems to wait until the server is ready for requests.
portCh := make(chan int, 1)
m.lifecycleServer.Addr = "127.0.0.1:0"
// m.lifecycleServer.Addr = "127.0.0.1:0"
m.lifecycleServer.BaseContext = func(l net.Listener) context.Context {
portCh <- l.Addr().(*net.TCPAddr).Port
return context.Background()
Expand All @@ -117,11 +142,30 @@ func TestLifecycleServerEnabled(t *testing.T) {
case <-time.After(5 * time.Second):
}

require.NotEqual(t, port, 0, "test failed to figure out lifecycle server port")
log.Printf("port = %v", port)
// Check lifecycle server graceful port configuration
if c.gracefulPort != 0 {
require.Equal(t, port, c.gracefulPort, "failed to set lifecycle server port")
} else {
require.Equal(t, port, 20300, "failed to figure out default lifecycle server port")
}
log.Println(fmt.Sprintf("port = %v", port))

// Check lifecycle server graceful shutdown path configuration
if c.gracefulShutdownPath != "" {
require.Equal(t, m.gracefulShutdownPath, c.gracefulShutdownPath, "failed to set lifecycle server graceful shutdown HTTP endpoint path")
}

// TODO: open long-timeout connection and watch for response

// Check lifecycle server graceful shutdown path configuration
url := fmt.Sprintf("http://127.0.0.1:%d%s", port, m.gracefulShutdownPath)
log.Println(fmt.Sprintf("sending request to %s", url))

resp, err := http.Get(url)

// TODO: use mock client to check envoyAdminAddr and envoyAdminPort
// m.client.Expect(address, port)

url := fmt.Sprintf("http://127.0.0.1:%d/graceful_shutdown", port)
resp, err := http.Get(url) // TODO: longer timeout if needed
require.NoError(t, err)
require.NotNil(t, resp)

Expand Down

0 comments on commit 2852040

Please sign in to comment.