Skip to content

Commit

Permalink
wip: lifecycle method cleanup, rename httpGetter to httpClient, add P…
Browse files Browse the repository at this point in the history
…ost method, start graceful shutdown impl
  • Loading branch information
mikemorris committed Jun 6, 2023
1 parent a1c21c9 commit 68f206d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 16 deletions.
4 changes: 3 additions & 1 deletion pkg/consuldp/consul_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
Expand All @@ -31,8 +32,9 @@ type xdsServer struct {
exitedCh chan struct{}
}

type httpGetter interface {
type httpClient interface {
Get(string) (*http.Response, error)
Post(string, string, io.Reader) (*http.Response, error)
}

// ConsulDataplane represents the consul-dataplane process
Expand Down
62 changes: 52 additions & 10 deletions pkg/consuldp/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ package consuldp
import (
"context"
"fmt"
"io"
"net/http"
// "net/url"
"strconv"
"sync"
// "time"
"time"

// "github.com/hashicorp/consul-server-connection-manager/discovery"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -39,7 +38,7 @@ type lifecycleConfig struct {
// consuldp proxy lifecycle management config
gracefulPort int
gracefulShutdownPath string
client httpGetter // client that will dial the managed Envoy proxy
client httpClient // client that will dial the managed Envoy proxy

// consuldp proxy lifecycle management server
lifecycleServer *http.Server
Expand Down Expand Up @@ -103,8 +102,7 @@ func (m *lifecycleConfig) startLifecycleServer() {
}
}

// stopLifecycleServer stops the main merged metrics server and the consul
// dataplane metrics server
// stopLifecycleServer stops the consul dataplane proxy lifecycle server
func (m *lifecycleConfig) stopLifecycleServer() {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -115,15 +113,15 @@ func (m *lifecycleConfig) stopLifecycleServer() {
m.logger.Info("stopping the merged server")
err := m.lifecycleServer.Close()
if err != nil {
m.logger.Warn("error while closing metrics server", "error", err)
m.logger.Warn("error while closing lifecycle server", "error", err)
errs = multierror.Append(err, errs)
}
}
if m.lifecycleServer != nil {
m.logger.Info("stopping consul dp promtheus server")
err := m.lifecycleServer.Close()
if err != nil {
m.logger.Warn("error while closing metrics server", "error", err)
m.logger.Warn("error while closing lifecycle server", "error", err)
errs = multierror.Append(err, errs)
}
}
Expand All @@ -143,9 +141,53 @@ func (m *lifecycleConfig) stopLifecycleServer() {
// or, if configured, until all open connections to Envoy listeners have been
// drained.
func (m *lifecycleConfig) gracefulShutdown(rw http.ResponseWriter, _ *http.Request) {
// envoyShutdownUrl := fmt.Sprintf("http://%s:%v/quitquitquit", m.envoyAdminAddr, m.envoyAdminBindPort)

m.logger.Debug("initiating graceful shutdown")

// TODO: implement
// Create a context that is both manually cancellable and will signal
// a cancel at the specified duration.
// TODO: calculate timeout from m.shutdownGracePeriod
// TODO: should this use lifecycleManager ctx instead of context.Background?
timeout := 15 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Create a channel to received a signal that work is done.
// TODO: should this be a buffered channel instead?
shutdownCh := make(chan int)

// Ask the goroutine to do some work for us.
// If shutdownDrainListeners is enabled, initiatie graceful shutdown of Envoy
// and wait until all open connections have closed or shutdownGracePeriod
// seconds have elapsed.
go func() {
// envoyDrainListenersUrl := fmt.Sprintf("http://%s:%v/drain_listeners?inboundonly", m.envoyAdminAddr, m.envoyAdminBindPort)
// envoyShutdownUrl := fmt.Sprintf("http://%s:%v/quitquitquit", m.envoyAdminAddr, m.envoyAdminBindPort)

// TODO: actually initiate Envoy shutdown and loop checking for open
// connections
// By default, the Envoy server will close listeners immediately on server
// shutdown. To drain listeners for some duration of time prior to server
// shutdown, use drain_listeners before shutting down the server.
// We want to start draining connections from inbound listeners if
// configured, but still allow outbound traffic until gracfulShutdownPeriod
// has elapsed to facilitate a graceful application shutdown.
// resp, err := m.client.Post(envoyDrainListenersUrl)

time.Sleep(5 * time.Second)

// Report the work is done.
// TODO: is there actually any point to sending this signal if we always just
// want to wait unitl the shutdownGracePeriod has elapsed?
shutdownCh <- 0
}()

for {
select {
case _ = <-shutdownCh:
m.logger.Info("shutting down, all open Envoy connections have been drained")
case <-ctx.Done():
m.logger.Info("shutdown grace period timeout reached")
// resp, err := m.client.Post(envoyShutdownUrl)
}
}
}
7 changes: 3 additions & 4 deletions pkg/consuldp/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// "bytes"
"context"
// "errors"
"fmt"
// "fmt"
// "io"
"log"
// "net"
Expand All @@ -22,9 +22,8 @@ import (
)

var (
envoyAdminPort = 19000
envoyAdminAddr = "127.0.0.1"
envoyShutdownUrl = fmt.Sprintf("http://%s:%v/quitquitquit", envoyAdminAddr, envoyAdminPort)
envoyAdminPort = 19000
envoyAdminAddr = "127.0.0.1"
)

func TestLifecycleServerClosed(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/consuldp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type metricsConfig struct {

// merged metrics config
promScrapeServer *http.Server // the server that will serve all the merged metrics
client httpGetter // the client that will scrape the urls
client httpClient // the client that will scrape the urls
urls []string // the urls that will be scraped

// consuldp metrics server
Expand Down
6 changes: 6 additions & 0 deletions pkg/consuldp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ func (c *mockClient) Get(url string) (*http.Response, error) {
}, nil
}

func (c *mockClient) Post(url string, contentType string, body io.Reader) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
}, nil
}

func makeFakeMetric(url string) string {
return fmt.Sprintf(`fake_metric{url="%s"} 1\n`, url)
}
Expand Down

0 comments on commit 68f206d

Please sign in to comment.