Skip to content

Commit

Permalink
lifecycle: wire up lifecycle mgmt server into consul-dataplane main c…
Browse files Browse the repository at this point in the history
…onfig
  • Loading branch information
mikemorris committed Jun 6, 2023
1 parent ae041fc commit 52e5fd5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 39 deletions.
9 changes: 9 additions & 0 deletions pkg/consuldp/consul_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ConsulDataplane struct {
xdsServer *xdsServer
aclToken string
metricsConfig *metricsConfig
lifecycleConfig *lifecycleConfig
}

// NewConsulDP creates a new instance of ConsulDataplane
Expand Down Expand Up @@ -211,6 +212,12 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error {
return err
}

cdp.lifecycleConfig = NewLifecycleConfig(cdp.cfg)
err = cdp.lifecycleConfig.startLifecycleManager(ctx, bootstrapCfg)
if err != nil {
return err
}

doneCh := make(chan error)
go func() {
select {
Expand All @@ -225,6 +232,8 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error {
doneCh <- errors.New("xDS server exited unexpectedly")
case <-cdp.metricsConfig.metricsServerExited():
doneCh <- errors.New("metrics server exited unexpectedly")
case <-cdp.lifecycleConfig.lifecycleServerExited():
doneCh <- errors.New("proxy lifecycle maangement server exited unexpectedly")
}
}()
return <-doneCh
Expand Down
21 changes: 10 additions & 11 deletions pkg/consuldp/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ type lifecycleConfig struct {
lifecycleServer *http.Server

// consuldp proxy lifecycle server control
doneCh chan struct{}
running bool
mu sync.Mutex
errorExitCh chan struct{}
running bool
mu sync.Mutex
}

func NewLifecycleConfig(cfg *Config) *lifecycleConfig {
Expand All @@ -68,8 +68,8 @@ func NewLifecycleConfig(cfg *Config) *lifecycleConfig {
Timeout: 10 * time.Second,
},

doneCh: make(chan struct{}, 1),
mu: sync.Mutex{},
errorExitCh: make(chan struct{}, 1),
mu: sync.Mutex{},
}
}

Expand Down Expand Up @@ -128,32 +128,31 @@ func (m *lifecycleConfig) startLifecycleServer() {
err := m.lifecycleServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
m.logger.Error("failed to serve proxy lifecycle managerments requests", "error", err)
close(m.doneCh)
close(m.errorExitCh)
}
}

// stopLifecycleServer stops the consul dataplane proxy lifecycle server
func (m *lifecycleConfig) stopLifecycleServer() {
m.mu.Lock()
defer m.mu.Unlock()

defer close(m.doneCh)
m.running = false

if m.lifecycleServer != nil {
m.logger.Info("stopping the lifecycle management server")
err := m.lifecycleServer.Close()
if err != nil {
m.logger.Warn("error while closing lifecycle server", "error", err)
close(m.errorExitCh)
}
}
}

// lifecycleServerExited is used to signal that the lifecycle server
// recieved a signal to initiate shutdown.
// func (m *lifecycleConfig) lifecycleServerExited() <-chan struct{} {
// return m.doneCh
// }
func (m *lifecycleConfig) lifecycleServerExited() <-chan struct{} {
return m.errorExitCh
}

// gracefulShutdown blocks until shutdownGracePeriod seconds have elapsed, and, if
// configured, will drain inbound connections to Envoy listeners during that time.
Expand Down
44 changes: 16 additions & 28 deletions pkg/consuldp/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
package consuldp

import (
// "bytes"
"context"
// "errors"
"fmt"
"io"
"log"
"net"
"net/http"
// "strings"
"sync"
"testing"
"time"

Expand All @@ -27,16 +23,13 @@ var (
)

func TestLifecycleServerClosed(t *testing.T) {
m := &lifecycleConfig{
mu: sync.Mutex{},
envoyAdminAddr: envoyAdminAddr,
envoyAdminBindPort: envoyAdminPort,
doneCh: make(chan struct{}, 1),

client: &http.Client{
Timeout: 10 * time.Second,
cfg := Config{
Envoy: &EnvoyConfig{
AdminBindAddress: envoyAdminAddr,
AdminBindPort: envoyAdminPort,
},
}
m := NewLifecycleConfig(&cfg)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -98,25 +91,21 @@ func TestLifecycleServerEnabled(t *testing.T) {
log.Printf("config = %v", c)

t.Run(name, func(t *testing.T) {
m := &lifecycleConfig{
envoyAdminAddr: envoyAdminAddr,
envoyAdminBindPort: envoyAdminPort,
shutdownDrainListeners: c.shutdownDrainListeners,
shutdownGracePeriod: c.shutdownGracePeriod,
gracefulShutdownPath: c.gracefulShutdownPath,
gracefulPort: c.gracefulPort,

client: &http.Client{
Timeout: 10 * time.Second,
cfg := Config{
Envoy: &EnvoyConfig{
AdminBindAddress: envoyAdminAddr,
AdminBindPort: envoyAdminPort,
ShutdownDrainListeners: c.shutdownDrainListeners,
ShutdownGracePeriod: c.shutdownGracePeriod,
GracefulShutdownPath: c.gracefulShutdownPath,
GracefulPort: c.gracefulPort,
},

doneCh: make(chan struct{}, 1),
mu: sync.Mutex{},
}
m := NewLifecycleConfig(&cfg)

require.NotNil(t, m)
require.NotNil(t, m.client)
require.NotNil(t, m.doneCh)
require.NotNil(t, m.errorExitCh)
require.IsType(t, &http.Client{}, m.client)
require.Greater(t, m.client.(*http.Client).Timeout, time.Duration(0))

Expand All @@ -132,7 +121,6 @@ func TestLifecycleServerEnabled(t *testing.T) {
// and figure out what port was used so we can make requests to it.
// Conveniently, this seems to wait until the server is ready for requests.
portCh := make(chan int, 1)
// m.lifecycleServer.Addr = "127.0.0.1:0"
m.lifecycleServer.BaseContext = func(l net.Listener) context.Context {
portCh <- l.Addr().(*net.TCPAddr).Port
return context.Background()
Expand Down Expand Up @@ -163,7 +151,7 @@ func TestLifecycleServerEnabled(t *testing.T) {

resp, err := http.Get(url)

// TODO: use mock client to check envoyAdminAddr and envoyAdminPort?
// TODO: use mock client to check expected requests to envoyAdminAddr and envoyAdminPort?
// m.client.Expect(address, port)

require.NoError(t, err)
Expand Down

0 comments on commit 52e5fd5

Please sign in to comment.