Skip to content

Commit

Permalink
Expose HTTP-based paths through Connect proxy (#6446)
Browse files Browse the repository at this point in the history
Fixes: #5396

This PR adds a proxy configuration stanza called expose. These flags register
listeners in Connect sidecar proxies to allow requests to specific HTTP paths from outside of the node. This allows services to protect themselves by only
listening on the loopback interface, while still accepting traffic from non
Connect-enabled services.

Under expose there is a boolean checks flag that would automatically expose all
registered HTTP and gRPC check paths.

This stanza also accepts a paths list to expose individual paths. The primary
use case for this functionality would be to expose paths for third parties like
Prometheus or the kubelet.

Listeners for requests to exposed paths are be configured dynamically at run
time. Any time a proxy, or check can be registered, a listener can also be
created.

In this initial implementation requests to these paths are not
authenticated/encrypted.
  • Loading branch information
freddygv committed Sep 27, 2019
1 parent 916c59f commit dcf32ed
Show file tree
Hide file tree
Showing 56 changed files with 3,278 additions and 213 deletions.
348 changes: 337 additions & 11 deletions agent/agent.go

Large diffs are not rendered by default.

82 changes: 13 additions & 69 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package agent
import (
"encoding/json"
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"log"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"

"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
Expand All @@ -24,7 +22,7 @@ import (
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
Expand Down Expand Up @@ -262,7 +260,7 @@ func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (
// in QueryOptions but I didn't want to make very general changes right away.
hash := req.URL.Query().Get("hash")

return s.agentLocalBlockingQuery(resp, hash, &queryOpts,
resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime,
func(ws memdb.WatchSet) (string, interface{}, error) {

svcState := s.agent.State.ServiceState(id)
Expand Down Expand Up @@ -299,7 +297,12 @@ func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (
reply.ContentHash = fmt.Sprintf("%x", rawHash)

return reply.ContentHash, reply, nil
})
},
)
if resultHash != "" {
resp.Header().Set("X-Consul-ContentHash", resultHash)
}
return service, err
}

func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -769,6 +772,9 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
"local_service_address": "LocalServiceAddress",
// SidecarService
"sidecar_service": "SidecarService",
// Expose Config
"local_path_port": "LocalPathPort",
"listener_port": "ListenerPort",

// DON'T Recurse into these opaque config maps or we might mangle user's
// keys. Note empty canonical is a special sentinel to prevent recursion.
Expand Down Expand Up @@ -1297,68 +1303,6 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
return reply, nil
}

type agentLocalBlockingFunc func(ws memdb.WatchSet) (string, interface{}, error)

// agentLocalBlockingQuery performs a blocking query in a generic way against
// local agent state that has no RPC or raft to back it. It uses `hash` parameter
// instead of an `index`. The resp is needed to write the `X-Consul-ContentHash`
// header back on return no Status nor body content is ever written to it.
func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash string,
queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) {

// If we are not blocking we can skip tracking and allocating - nil WatchSet
// is still valid to call Add on and will just be a no op.
var ws memdb.WatchSet
var timeout *time.Timer

if hash != "" {
// TODO(banks) at least define these defaults somewhere in a const. Would be
// nice not to duplicate the ones in consul/rpc.go too...
wait := queryOpts.MaxQueryTime
if wait == 0 {
wait = 5 * time.Minute
}
if wait > 10*time.Minute {
wait = 10 * time.Minute
}
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timeout = time.NewTimer(wait)
}

for {
// Must reset this every loop in case the Watch set is already closed but
// hash remains same. In that case we'll need to re-block on ws.Watch()
// again.
ws = memdb.NewWatchSet()
curHash, curResp, err := fn(ws)
if err != nil {
return curResp, err
}
// Return immediately if there is no timeout, the hash is different or the
// Watch returns true (indicating timeout fired). Note that Watch on a nil
// WatchSet immediately returns false which would incorrectly cause this to
// loop and repeat again, however we rely on the invariant that ws == nil
// IFF timeout == nil in which case the Watch call is never invoked.
if timeout == nil || hash != curHash || ws.Watch(timeout.C) {
resp.Header().Set("X-Consul-ContentHash", curHash)
return curResp, err
}
// Watch returned false indicating a change was detected, loop and repeat
// the callback to load the new value. If agent sync is paused it means
// local state is currently being bulk-edited e.g. config reload. In this
// case it's likely that local state just got unloaded and may or may not be
// reloaded yet. Wait a short amount of time for Sync to resume to ride out
// typical config reloads.
if syncPauseCh := s.agent.syncPausedCh(); syncPauseCh != nil {
select {
case <-syncPauseCh:
case <-timeout.C:
}
}
}
}

// AgentConnectAuthorize
//
// POST /v1/agent/connect/authorize
Expand Down
92 changes: 72 additions & 20 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestAgent_Service(t *testing.T) {
Service: "web-sidecar-proxy",
Port: 8000,
Proxy: expectProxy.ToAPI(),
ContentHash: "f5826efc5ffc207a",
ContentHash: "4c7d5f8d3748be6d",
Weights: api.AgentWeights{
Passing: 1,
Warning: 1,
Expand All @@ -338,7 +338,7 @@ func TestAgent_Service(t *testing.T) {
// Copy and modify
updatedResponse := *expectedResponse
updatedResponse.Port = 9999
updatedResponse.ContentHash = "c8cb04cb77ef33d8"
updatedResponse.ContentHash = "713435ba1f5badcf"

// Simple response for non-proxy service registered in TestAgent config
expectWebResponse := &api.AgentService{
Expand Down Expand Up @@ -1981,39 +1981,51 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) {
require.NotNil(t, nodeToken)

t.Run("no token - node check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})

t.Run("svc token - node check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})

t.Run("node token - node check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(r, err)
})
})

t.Run("no token - svc check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})

t.Run("node token - svc check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})

t.Run("svc token - svc check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(r, err)
})
})

}
Expand Down Expand Up @@ -5581,3 +5593,43 @@ func TestAgent_HostBadACL(t *testing.T) {
assert.Equal(http.StatusOK, resp.Code)
assert.Nil(respRaw)
}

// Thie tests that a proxy with an ExposeConfig is returned as expected.
func TestAgent_Services_ExposeConfig(t *testing.T) {
t.Parallel()

a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()

testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv1 := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "proxy-id",
Service: "proxy-name",
Port: 8443,
Proxy: structs.ConnectProxyConfig{
Expose: structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
ListenerPort: 8080,
LocalPathPort: 21500,
Protocol: "http2",
Path: "/metrics",
},
},
},
},
}
a.State.AddService(srv1, "")

req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
obj, err := a.srv.AgentServices(nil, req)
require.NoError(t, err)
val := obj.(map[string]*api.AgentService)
require.Len(t, val, 1)
actual := val["proxy-id"]
require.NotNil(t, actual)
require.Equal(t, api.ServiceKindConnectProxy, actual.Kind)
require.Equal(t, srv1.Proxy.ToAPI(), actual.Proxy)
}
Loading

0 comments on commit dcf32ed

Please sign in to comment.