Skip to content

Commit

Permalink
Check ACLs more often for xDS endpoints.
Browse files Browse the repository at this point in the history
For established xDS gRPC streams, recheck ACLs for each DiscoveryRequest
or DiscoveryResponse. If more than 5 minutes have elapsed since the last
ACL check, recheck even without an incoming DiscoveryRequest or
DiscoveryResponse. ACL failures will terminate the stream.
  • Loading branch information
rboyer committed Jan 22, 2019
1 parent 2dea3e2 commit d3eb781
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 16 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error {
Authz: a,
ResolveToken: a.resolveToken,
}
a.xdsServer.Initialize()

var err error
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
if err != nil {
Expand Down
79 changes: 66 additions & 13 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,6 +57,10 @@ const (
// LocalAgentClusterName is the name we give the local agent "cluster" in
// Envoy config.
LocalAgentClusterName = "local_agent"

// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
DefaultAuthCheckFrequency = 5 * time.Minute
)

// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
Expand Down Expand Up @@ -90,6 +95,18 @@ type Server struct {
CfgMgr ConfigManager
Authz ConnectAuthz
ResolveToken ACLResolverFunc
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
// This is only used during idle periods of stream interactions (i.e. when
// there has been no recent DiscoveryRequest).
AuthCheckFrequency time.Duration
}

// Initialize finishes configuring the Server for first use. Call it after
// populating the Server's fields and before serving any streams.
//
// A non-positive AuthCheckFrequency is replaced with
// DefaultAuthCheckFrequency. The value is later handed to the time package to
// schedule idle ACL re-checks, and a zero or negative duration there expires
// immediately, which would turn the idle re-check into a busy loop.
func (s *Server) Initialize() {
	if s.AuthCheckFrequency <= 0 {
		s.AuthCheckFrequency = DefaultAuthCheckFrequency
	}
}

// StreamAggregatedResources implements
Expand Down Expand Up @@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {

const (
stateInit int = iota
statePendingAuth
statePendingInitialConfig
stateRunning
)

Expand Down Expand Up @@ -176,8 +193,44 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
},
}

var authTimer <-chan time.Time
extendAuthTimer := func() {
authTimer = time.After(s.AuthCheckFrequency)
}

// checkStreamACLs re-resolves the token attached to this stream and verifies
// that it still grants service:write on the destination service named in the
// given config snapshot. It returns nil when the stream may continue and an
// error (a gRPC status error for the ACL-specific cases) when it must be
// terminated.
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
	// Without a snapshot there is no service name to authorize against.
	if cfgSnap == nil {
		return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
	}

	authz, err := s.ResolveToken(tokenFromStream(stream))
	switch {
	case acl.IsErrNotFound(err):
		return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
	case acl.IsErrPermissionDenied(err):
		return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
	case err != nil:
		// Any other resolution failure is passed through unchanged.
		return err
	}

	// A nil authorizer is treated as "allowed" (presumably ACLs are
	// disabled in that case; confirm against the ResolveToken implementation).
	if authz == nil {
		return nil
	}
	if authz.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
		return nil
	}
	return status.Errorf(codes.PermissionDenied, "permission denied")
}

for {
select {
case <-authTimer:
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
extendAuthTimer()

case req, ok = <-reqCh:
if !ok {
// reqCh is closed when stream.Recv errors which is how we detect client
Expand Down Expand Up @@ -218,27 +271,27 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
defer watchCancel()

// Now wait for the config so we can check ACL
state = statePendingAuth
case statePendingAuth:
state = statePendingInitialConfig
case statePendingInitialConfig:
if cfgSnap == nil {
// Nothing we can do until we get the initial config
continue
}
// Got config, try to authenticate
token := tokenFromStream(stream)
rule, err := s.ResolveToken(token)
if err != nil {
return err
}
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied")
}
// Authed OK!

// Got config, try to authenticate next.
state = stateRunning

// Let's actually process the config we just got or we'll miss responding
fallthrough
case stateRunning:
// Check ACLs on every Discovery{Request,Response}.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
// For the first time through the state machine, this is when the
// timer is first started.
extendAuthTimer()

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:
Expand Down
215 changes: 212 additions & 3 deletions agent/xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -115,7 +116,13 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
envoy := NewTestEnvoy(t, "web-sidecar-proxy", "")
defer envoy.Close()

s := Server{logger, mgr, mgr, aclResolve}
s := Server{
Logger: logger,
CfgMgr: mgr,
Authz: mgr,
ResolveToken: aclResolve,
}
s.Initialize()

go func() {
err := s.StreamAggregatedResources(envoy.stream)
Expand Down Expand Up @@ -589,7 +596,13 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
envoy := NewTestEnvoy(t, "web-sidecar-proxy", tt.token)
defer envoy.Close()

s := Server{logger, mgr, mgr, aclResolve}
s := Server{
Logger: logger,
CfgMgr: mgr,
Authz: mgr,
ResolveToken: aclResolve,
}
s.Initialize()

errCh := make(chan error, 1)
go func() {
Expand Down Expand Up @@ -632,6 +645,196 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
}
}

// TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest
// checks that an established xDS stream is terminated with an Unauthenticated
// gRPC status when its ACL token stops resolving and the proxy then sends
// another DiscoveryRequest. The periodic re-check is configured with a one
// hour interval so that it cannot be what ends the stream.
func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest(t *testing.T) {
	aclRules := `service "web" { policy = "write" }`
	token := "service-write-on-web"

	policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil)
	require.NoError(t, err)

	// validToken holds the one token the fake resolver accepts. It is an
	// atomic.Value because the test goroutine overwrites it while the stream
	// handler goroutine reads it through aclResolve.
	var validToken atomic.Value
	validToken.Store(token)

	logger := log.New(os.Stderr, "", log.LstdFlags)
	mgr := newTestManager(t)
	// aclResolve is the fake token resolver: any id other than the currently
	// stored valid token yields acl.ErrNotFound.
	aclResolve := func(id string) (acl.Authorizer, error) {
		if token := validToken.Load(); token == nil || id != token.(string) {
			return nil, acl.ErrNotFound
		}

		return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
	}
	envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
	defer envoy.Close()

	s := Server{
		Logger:             logger,
		CfgMgr:             mgr,
		Authz:              mgr,
		ResolveToken:       aclResolve,
		AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in
	}
	s.Initialize()

	// errCh receives the handler's return value. It is buffered so the
	// handler goroutine can exit even if the test never reads from it.
	errCh := make(chan error, 1)
	go func() {
		errCh <- s.StreamAggregatedResources(envoy.stream)
	}()

	// getError polls errCh without blocking. ok reports whether the handler
	// has already returned; gotErr is whatever it returned.
	getError := func() (gotErr error, ok bool) {
		select {
		case err := <-errCh:
			return err, true
		default:
			return nil, false
		}
	}

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, "web-sidecar-proxy")

	// Send initial cluster discover (OK)
	envoy.SendReq(t, ClusterType, 0, 0)
	{
		// The handler must still be running at this point.
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Check no response sent yet
	assertChanBlocked(t, envoy.stream.sendCh)
	{
		// The handler must still be running at this point.
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Deliver a new snapshot
	snap := proxycfg.TestConfigSnapshot(t)
	mgr.DeliverConfig(t, "web-sidecar-proxy", snap)

	// The token is still valid here, so the clusters response is sent.
	assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1))

	// Now nuke the ACL token.
	validToken.Store("")

	// It also (in parallel) issues the next cluster request (which acts as an ACK
	// of the version we sent)
	envoy.SendReq(t, ClusterType, 1, 1)

	// Handling that request must fail the ACL check and end the stream.
	select {
	case err := <-errCh:
		require.Error(t, err)
		gerr, ok := status.FromError(err)
		require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
		require.Equal(t, codes.Unauthenticated, gerr.Code())
		require.Equal(t, "unauthenticated: ACL not found", gerr.Message())

		mgr.AssertWatchCancelled(t, "web-sidecar-proxy")
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

// TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBackground
// checks that an established but idle xDS stream is terminated with an
// Unauthenticated gRPC status when its ACL token stops resolving, even though
// no further DiscoveryRequest arrives. The stream can then only be ended by
// the periodic re-check, which is configured with a short interval.
func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBackground(t *testing.T) {
	aclRules := `service "web" { policy = "write" }`
	token := "service-write-on-web"

	policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil)
	require.NoError(t, err)

	// validToken holds the one token the fake resolver accepts. It is an
	// atomic.Value because the test goroutine overwrites it while the stream
	// handler goroutine reads it through aclResolve.
	var validToken atomic.Value
	validToken.Store(token)

	logger := log.New(os.Stderr, "", log.LstdFlags)
	mgr := newTestManager(t)
	// aclResolve is the fake token resolver: any id other than the currently
	// stored valid token yields acl.ErrNotFound.
	aclResolve := func(id string) (acl.Authorizer, error) {
		if token := validToken.Load(); token == nil || id != token.(string) {
			return nil, acl.ErrNotFound
		}

		return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
	}
	envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
	defer envoy.Close()

	s := Server{
		Logger:             logger,
		CfgMgr:             mgr,
		Authz:              mgr,
		ResolveToken:       aclResolve,
		AuthCheckFrequency: 100 * time.Millisecond, // Make this short.
	}
	s.Initialize()

	// errCh receives the handler's return value. It is buffered so the
	// handler goroutine can exit even if the test never reads from it.
	errCh := make(chan error, 1)
	go func() {
		errCh <- s.StreamAggregatedResources(envoy.stream)
	}()

	// getError polls errCh without blocking. ok reports whether the handler
	// has already returned; gotErr is whatever it returned.
	getError := func() (gotErr error, ok bool) {
		select {
		case err := <-errCh:
			return err, true
		default:
			return nil, false
		}
	}

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, "web-sidecar-proxy")

	// Send initial cluster discover (OK)
	envoy.SendReq(t, ClusterType, 0, 0)
	{
		// The handler must still be running at this point.
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Check no response sent yet
	assertChanBlocked(t, envoy.stream.sendCh)
	{
		// The handler must still be running at this point.
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Deliver a new snapshot
	snap := proxycfg.TestConfigSnapshot(t)
	mgr.DeliverConfig(t, "web-sidecar-proxy", snap)

	// The token is still valid here, so the clusters response is sent.
	assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1))

	// It also (in parallel) issues the next cluster request (which acts as an ACK
	// of the version we sent)
	envoy.SendReq(t, ClusterType, 1, 1)

	// Check no response sent yet
	assertChanBlocked(t, envoy.stream.sendCh)
	{
		// The token is still valid, so the ACK must not have ended the stream.
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Now nuke the ACL token while there's no activity.
	validToken.Store("")

	// No further request is sent, so the stream can only end through the
	// periodic ACL re-check. The wait below is twice AuthCheckFrequency.
	select {
	case err := <-errCh:
		require.Error(t, err)
		gerr, ok := status.FromError(err)
		require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
		require.Equal(t, codes.Unauthenticated, gerr.Code())
		require.Equal(t, "unauthenticated: ACL not found", gerr.Message())

		mgr.AssertWatchCancelled(t, "web-sidecar-proxy")
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

// This tests the ext_authz service method that implements connect authz.
func TestServer_Check(t *testing.T) {

Expand Down Expand Up @@ -729,7 +932,13 @@ func TestServer_Check(t *testing.T) {
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
defer envoy.Close()

s := Server{logger, mgr, mgr, aclResolve}
s := Server{
Logger: logger,
CfgMgr: mgr,
Authz: mgr,
ResolveToken: aclResolve,
}
s.Initialize()

// Create a context with the correct token
ctx := metadata.NewIncomingContext(context.Background(),
Expand Down

0 comments on commit d3eb781

Please sign in to comment.