Skip to content

Commit

Permalink
consul/connect: add support for bridge networks with connect native t…
Browse files Browse the repository at this point in the history
…asks

Before, Connect Native Tasks needed one of these to work:

- To be run in host networking mode
- To have the Consul agent configured to listen to a unix socket
- To have the Consul agent configured to listen to a public interface

None of these are a great experience, though running in host networking is
still the best solution for non-Linux hosts. This PR establishes a connection
proxy between the Consul HTTP listener and a unix socket inside the alloc fs,
bypassing the network namespace for any Connect Native task. Similar to and
re-uses a bunch of code from the gRPC listener version for envoy sidecar proxies.

Proxy is established only if the alloc is configured for bridge networking and
there is at least one Connect Native task in the Task Group.

Fixes #8290
  • Loading branch information
shoenig committed Jul 15, 2020
1 parent f6f697a commit 7adb1c5
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 89 deletions.
4 changes: 4 additions & 0 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ var (
// AllocGRPCSocket is the path relative to the task dir root for the
// unix socket connected to Consul's gRPC endpoint.
AllocGRPCSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_grpc.sock")

// AllocHTTPSocket is the path relative to the task dir root for the unix
// socket connected to Consul's HTTP endpoint.
AllocHTTPSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_http.sock")
)

// AllocDir allows creating, destroying, and accessing an allocation's
Expand Down
3 changes: 2 additions & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
logger: hookLogger,
}),
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newCSIHook(ar, hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,57 @@ import (
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/pkg/errors"
)

// consulSockHook creates Unix sockets to allow communication from inside a
// netns to Consul.
//
// Noop for allocations without a group Connect stanza.
type consulSockHook struct {
alloc *structs.Allocation
const (
consulGRPCSockHookName = "consul_grpc_socket"
)

var (
errSocketProxyTimeout = errors.New("timed out waiting for socket proxy to exit")
)

proxy *sockProxy
// consulGRPCSocketHook creates Unix sockets to allow communication from inside a
// netns to Consul gRPC endpoint.
//
// Noop for allocations without a group Connect stanza using bridge networking.
type consulGRPCSocketHook struct {
logger hclog.Logger
alloc *structs.Allocation
proxy *grpcSocketProxy

// mu synchronizes group & cancel as they may be mutated and accessed
// mu synchronizes proxy as they may be mutated and accessed
// concurrently via Prerun, Update, Postrun.
mu sync.Mutex

logger hclog.Logger
}

func newConsulSockHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulSockHook {
h := &consulSockHook{
alloc: alloc,
proxy: newSockProxy(logger, allocDir, config),
func newConsulGRPCSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulGRPCSocketHook {
return &consulGRPCSocketHook{
alloc: alloc,
proxy: newGRPCSocketProxy(logger, allocDir, config),
logger: logger.Named(consulGRPCSockHookName),
}
h.logger = logger.Named(h.Name())
return h
}

func (*consulSockHook) Name() string {
return "consul_socket"
func (*consulGRPCSocketHook) Name() string {
return consulGRPCSockHookName
}

// shouldRun returns true if the Unix socket should be created and proxied.
// Requires the mutex to be held.
func (h *consulSockHook) shouldRun() bool {
func (h *consulGRPCSocketHook) shouldRun() bool {
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
for _, s := range tg.Services {
if s.Connect.HasSidecar() {
if s.Connect.HasSidecar() { // todo(shoenig) check we are in bridge mode
return true
}
}

return false
}

func (h *consulSockHook) Prerun() error {
func (h *consulGRPCSocketHook) Prerun() error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -73,7 +79,7 @@ func (h *consulSockHook) Prerun() error {

// Update creates a gRPC socket file and proxy if there are any Connect
// services.
func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
func (h *consulGRPCSocketHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -86,7 +92,7 @@ func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.proxy.run(h.alloc)
}

func (h *consulSockHook) Postrun() error {
func (h *consulGRPCSocketHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -98,21 +104,20 @@ func (h *consulSockHook) Postrun() error {
return nil
}

type sockProxy struct {
type grpcSocketProxy struct {
logger hclog.Logger
allocDir *allocdir.AllocDir
config *config.ConsulConfig

ctx context.Context
cancel func()
doneCh chan struct{}
runOnce bool

logger hclog.Logger
}

func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *sockProxy {
func newGRPCSocketProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *grpcSocketProxy {
ctx, cancel := context.WithCancel(context.Background())
return &sockProxy{
return &grpcSocketProxy{
allocDir: allocDir,
config: config,
ctx: ctx,
Expand All @@ -126,50 +131,55 @@ func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *conf
// hasn't been told to stop.
//
// NOT safe for concurrent use.
func (s *sockProxy) run(alloc *structs.Allocation) error {
func (p *grpcSocketProxy) run(alloc *structs.Allocation) error {
// Only run once.
if s.runOnce {
if p.runOnce {
return nil
}

// Only run once. Never restart.
select {
case <-s.doneCh:
s.logger.Trace("socket proxy already shutdown; exiting")
case <-p.doneCh:
p.logger.Trace("socket proxy already shutdown; exiting")
return nil
case <-s.ctx.Done():
s.logger.Trace("socket proxy already done; exiting")
case <-p.ctx.Done():
p.logger.Trace("socket proxy already done; exiting")
return nil
default:
}

destAddr := s.config.GRPCAddr
// make sure either grpc or http consul address has been configured
if p.config.GRPCAddr == "" && p.config.Addr == "" {
return errors.New("consul address must be set on nomad client")
}

destAddr := p.config.GRPCAddr
if destAddr == "" {
// No GRPCAddr defined. Use Addr but replace port with the gRPC
// default of 8502.
host, _, err := net.SplitHostPort(s.config.Addr)
host, _, err := net.SplitHostPort(p.config.Addr)
if err != nil {
return fmt.Errorf("error parsing Consul address %q: %v",
s.config.Addr, err)
p.config.Addr, err)
}

destAddr = net.JoinHostPort(host, "8502")
}

hostGRPCSockPath := filepath.Join(s.allocDir.AllocDir, allocdir.AllocGRPCSocket)
hostGRPCSocketPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocGRPCSocket)

// if the socket already exists we'll try to remove it, but if not then any
// other errors will bubble up to the caller here or when we try to listen
_, err := os.Stat(hostGRPCSockPath)
_, err := os.Stat(hostGRPCSocketPath)
if err == nil {
err := os.Remove(hostGRPCSockPath)
err := os.Remove(hostGRPCSocketPath)
if err != nil {
return fmt.Errorf(
"unable to remove existing unix socket for Consul gRPC endpoint: %v", err)
}
}

listener, err := net.Listen("unix", hostGRPCSockPath)
listener, err := net.Listen("unix", hostGRPCSocketPath)
if err != nil {
return fmt.Errorf("unable to create unix socket for Consul gRPC endpoint: %v", err)
}
Expand All @@ -179,55 +189,55 @@ func (s *sockProxy) run(alloc *structs.Allocation) error {
// socket permissions when creating the file, so we must manually call
// chmod afterwards.
// https://github.com/golang/go/issues/11822
if err := os.Chmod(hostGRPCSockPath, os.ModePerm); err != nil {
if err := os.Chmod(hostGRPCSocketPath, os.ModePerm); err != nil {
return fmt.Errorf("unable to set permissions on unix socket for Consul gRPC endpoint: %v", err)
}

go func() {
proxy(s.ctx, s.logger, destAddr, listener)
s.cancel()
close(s.doneCh)
proxy(p.ctx, p.logger, destAddr, listener)
p.cancel()
close(p.doneCh)
}()

s.runOnce = true
p.runOnce = true
return nil
}

// stop the proxy and blocks until the proxy has stopped. Returns an error if
// the proxy does not exit in a timely fashion.
func (s *sockProxy) stop() error {
s.cancel()
func (p *grpcSocketProxy) stop() error {
p.cancel()

// If proxy was never run, don't wait for anything to shutdown.
if !s.runOnce {
if !p.runOnce {
return nil
}

select {
case <-s.doneCh:
case <-p.doneCh:
return nil
case <-time.After(3 * time.Second):
return fmt.Errorf("timed out waiting for proxy to exit")
return errSocketProxyTimeout
}
}

// Proxy between a listener and dest
func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener) {
// Proxy between a listener and destination.
func proxy(ctx context.Context, logger hclog.Logger, destAddr string, l net.Listener) {
// Wait for all connections to be done before exiting to prevent
// goroutine leaks.
wg := sync.WaitGroup{}
ctx, cancel := context.WithCancel(ctx)
defer func() {
// Must cancel context and close listener before waiting
cancel()
l.Close()
_ = l.Close()
wg.Wait()
}()

// Close Accept() when context is cancelled
go func() {
<-ctx.Done()
l.Close()
_ = l.Close()
}()

for ctx.Err() == nil {
Expand All @@ -237,14 +247,14 @@ func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener
// Accept errors during shutdown are to be expected
return
}
logger.Error("error in grpc proxy; shutting down proxy", "error", err, "dest", dest)
logger.Error("error in socket proxy; shutting down proxy", "error", err, "dest", destAddr)
return
}

wg.Add(1)
go func() {
defer wg.Done()
proxyConn(ctx, logger, dest, conn)
proxyConn(ctx, logger, destAddr, conn)
}()
}
}
Expand Down Expand Up @@ -286,7 +296,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n
wg := sync.WaitGroup{}
defer wg.Wait()

// socket -> gRPC
// socket -> consul
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -305,7 +315,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n
)
}()

// gRPC -> socket
// consul -> socket
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -326,6 +336,6 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n

// When cancelled close connections to break out of copies goroutines.
<-ctx.Done()
conn.Close()
dest.Close()
_ = conn.Close()
_ = dest.Close()
}
Loading

0 comments on commit 7adb1c5

Please sign in to comment.