Skip to content

Commit

Permalink
Warn if ACL is enabled but no token is provided to Envoy (#15967)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris S. Kim authored Jan 16, 2023
1 parent 87ff8c1 commit e4a268e
Show file tree
Hide file tree
Showing 11 changed files with 589 additions and 51 deletions.
9 changes: 9 additions & 0 deletions .changelog/15967.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
```release-note:improvement
connect: Warn if ACLs are enabled but a token is not provided to envoy
```

```release-note:improvement
telemetry: Added a `consul.xds.server.streamsUnauthenticated` metric to track
the number of active xDS streams handled by the server that are unauthenticated
because ACLs are not enabled or ACL tokens were missing.
````
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"github.com/hashicorp/consul/agent/structs"
)

func TestQueryOptionsFromContextRoundTrip(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggrega

// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
defer s.activeStreams.Increment("v3")()
defer s.activeStreams.Increment(stream.Context())()

// a channel for receiving incoming requests
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
Expand Down
26 changes: 23 additions & 3 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/stretchr/testify/require"
rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -1075,6 +1074,17 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
// includes the token in the ext rbac filter so lets us test more stuff.
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

// If there is no token, check that we increment the gauge
if tt.token == "" {
data := scenario.sink.Data()
require.Len(t, data, 1)

item := data[0]
val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"]
require.True(t, ok)
require.Equal(t, float32(1), val.Value)
}

if !tt.wantDenied {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Expand Down Expand Up @@ -1106,6 +1116,17 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}

// If there is no token, check that we decrement the gauge
if tt.token == "" {
data := scenario.sink.Data()
require.Len(t, data, 1)

item := data[0]
val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"]
require.True(t, ok)
require.Equal(t, float32(0), val.Value)
}
})
}
}
Expand Down Expand Up @@ -1459,13 +1480,12 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
require.Len(t, data, 1)

item := data[0]
require.Len(t, item.Counters, 1)
require.Len(t, item.Samples, 1)

val, ok := item.Samples["consul.xds.test.xds.server.streamStart"]
require.True(t, ok)
require.Equal(t, 1, val.Count)
})

}

type testLimiter struct {
Expand Down
84 changes: 45 additions & 39 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,30 @@ import (
"github.com/hashicorp/consul/agent/xds/xdscommon"
)

var StatsGauges = []prometheus.GaugeDefinition{
{
Name: []string{"xds", "server", "streams"},
Help: "Measures the number of active xDS streams handled by the server split by protocol version.",
},
}

var StatsCounters = []prometheus.CounterDefinition{
{
Name: []string{"xds", "server", "streamDrained"},
Help: "Counts the number of xDS streams that are drained when rebalancing the load between servers.",
},
}

var StatsSummaries = []prometheus.SummaryDefinition{
{
Name: []string{"xds", "server", "streamStart"},
Help: "Measures the time in milliseconds after an xDS stream is opened until xDS resources are first generated for the stream.",
},
}
var (
StatsGauges = []prometheus.GaugeDefinition{
{
Name: []string{"xds", "server", "streams"},
Help: "Measures the number of active xDS streams handled by the server split by protocol version.",
},
{
Name: []string{"xds", "server", "streamsUnauthenticated"},
Help: "Counts the number of active xDS streams handled by the server that are unauthenticated because ACLs are not enabled or ACL tokens were missing.",
},
}
StatsCounters = []prometheus.CounterDefinition{
{
Name: []string{"xds", "server", "streamDrained"},
Help: "Counts the number of xDS streams that are drained when rebalancing the load between servers.",
},
}
StatsSummaries = []prometheus.SummaryDefinition{
{
Name: []string{"xds", "server", "streamStart"},
Help: "Measures the time in milliseconds after an xDS stream is opened until xDS resources are first generated for the stream.",
},
}
)

// ADSStream is a shorter way of referring to this thing...
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
Expand Down Expand Up @@ -126,32 +130,34 @@ type Server struct {
activeStreams *activeStreamCounters
}

// activeStreamCounters simply encapsulates two counters accessed atomically to
// ensure alignment is correct. This further requires that activeStreamCounters
// be a pointer field.
// TODO(eculver): refactor to remove xDSv2 refs
// activeStreamCounters tracks various stream-related metrics.
// Requires that activeStreamCounters be a pointer field.
type activeStreamCounters struct {
xDSv3 uint64
xDSv2 uint64
xDSv3 atomic.Uint64
unauthenticated atomic.Uint64
}

func (c *activeStreamCounters) Increment(xdsVersion string) func() {
var counter *uint64
switch xdsVersion {
case "v3":
counter = &c.xDSv3
case "v2":
counter = &c.xDSv2
default:
return func() {}
func (c *activeStreamCounters) Increment(ctx context.Context) func() {
// If no ACL token is found, increase the gauge.
o, _ := external.QueryOptionsFromContext(ctx)
if o.Token == "" {
unauthn := c.unauthenticated.Add(1)
metrics.SetGauge([]string{"xds", "server", "streamsUnauthenticated"}, float32(unauthn))
}

labels := []metrics.Label{{Name: "version", Value: xdsVersion}}

count := atomic.AddUint64(counter, 1)
// Historically there had been a "v2" version.
labels := []metrics.Label{{Name: "version", Value: "v3"}}
count := c.xDSv3.Add(1)
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels)

// This closure should be called in a defer to decrement the gauges after the stream is closed.
return func() {
count := atomic.AddUint64(counter, ^uint64(0))
if o.Token == "" {
unauthn := c.unauthenticated.Add(^uint64(0))
metrics.SetGauge([]string{"xds", "server", "streamsUnauthenticated"}, float32(unauthn))
}

count := c.xDSv3.Add(^uint64(0))
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels)
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/xds/xds_protocol_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func requireProtocolVersionGauge(
require.Len(t, data, 1)

item := data[0]
require.Len(t, item.Gauges, 1)
require.Len(t, item.Gauges, 2)

val, ok := item.Gauges["consul.xds.test.xds.server.streams;version="+xdsVersion]
require.True(t, ok)
Expand Down
12 changes: 11 additions & 1 deletion command/connect/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/mitchellh/mapstructure"
"google.golang.org/protobuf/encoding/protojson"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/agent/xds/accesslogs"
Expand Down Expand Up @@ -612,6 +613,16 @@ func (c *cmd) generateConfig() ([]byte, error) {

var bsCfg BootstrapConfig

// Make a call to an arbitrary ACL endpoint. If we get back an ErrNotFound
// (meaning ACLs are enabled) check that the token is not empty.
if _, _, err := c.client.ACL().TokenReadSelf(
&api.QueryOptions{Token: args.Token},
); acl.IsErrNotFound(err) {
if args.Token == "" {
c.UI.Warn("No ACL token was provided to Envoy. Because the ACL system is enabled, pass a suitable ACL token for this service to Envoy to avoid potential communication failure.")
}
}

// Fetch any customization from the registration
var svcProxyConfig *api.AgentServiceConnectProxyConfig
var serviceName, ns, partition, datacenter string
Expand Down Expand Up @@ -754,7 +765,6 @@ func generateAccessLogs(c *cmd, args *BootstrapTplArgs) error {
return nil
}

// TODO: make method a function
func (c *cmd) xdsAddress() (GRPC, error) {
g := GRPC{}

Expand Down
81 changes: 76 additions & 5 deletions command/connect/envoy/envoy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/agent/xds/proxysupport"
Expand Down Expand Up @@ -113,6 +114,7 @@ func testSetAndResetEnv(t *testing.T, env []string) func() {
type generateConfigTestCase struct {
Name string
TLSServer bool
ACLEnabled bool
Flags []string
Env []string
Files map[string]string
Expand All @@ -123,6 +125,7 @@ type generateConfigTestCase struct {
AgentSelf110 bool // fake the agent API from versions v1.10 and earlier
WantArgs BootstrapTplArgs
WantErr string
WantWarn string
}

// This tests the args we use to generate the template directly because they
Expand Down Expand Up @@ -553,8 +556,9 @@ func TestGenerateConfig(t *testing.T) {
},
},
{
Name: "access-log-path",
Flags: []string{"-proxy-id", "test-proxy", "-admin-access-log-path", "/some/path/access.log"},
Name: "access-log-path",
Flags: []string{"-proxy-id", "test-proxy", "-admin-access-log-path", "/some/path/access.log"},
WantWarn: "-admin-access-log-path is deprecated",
WantArgs: BootstrapTplArgs{
ProxyCluster: "test-proxy",
ProxyID: "test-proxy",
Expand Down Expand Up @@ -1116,6 +1120,52 @@ func TestGenerateConfig(t *testing.T) {
},
},
},
{
Name: "acl-enabled-but-no-token",
Flags: []string{"-proxy-id", "test-proxy"},
ACLEnabled: true,
WantWarn: "No ACL token was provided to Envoy.",
WantArgs: BootstrapTplArgs{
ProxyCluster: "test-proxy",
ProxyID: "test-proxy",
// We don't know this til after the lookup so it will be empty in the
// initial args call we are testing here.
ProxySourceService: "",
GRPC: GRPC{
AgentAddress: "127.0.0.1",
AgentPort: "8502", // Note this is the gRPC port
},
AdminAccessLogPath: "/dev/null",
AdminBindAddress: "127.0.0.1",
AdminBindPort: "19000",
LocalAgentClusterName: xds.LocalAgentClusterName,
PrometheusBackendPort: "",
PrometheusScrapePath: "/metrics",
},
},
{
Name: "acl-enabled-and-token",
Flags: []string{"-proxy-id", "test-proxy", "-token", "foo"},
ACLEnabled: true,
WantArgs: BootstrapTplArgs{
ProxyCluster: "test-proxy",
ProxyID: "test-proxy",
// We don't know this til after the lookup so it will be empty in the
// initial args call we are testing here.
ProxySourceService: "",
GRPC: GRPC{
AgentAddress: "127.0.0.1",
AgentPort: "8502", // Note this is the gRPC port
},
AdminAccessLogPath: "/dev/null",
AdminBindAddress: "127.0.0.1",
AdminBindPort: "19000",
Token: "foo",
LocalAgentClusterName: xds.LocalAgentClusterName,
PrometheusBackendPort: "",
PrometheusScrapePath: "/metrics",
},
},
}

cases = append(cases, enterpriseGenerateConfigTestCases()...)
Expand Down Expand Up @@ -1177,12 +1227,16 @@ func TestGenerateConfig(t *testing.T) {

require.NoError(t, c.flags.Parse(args))
code := c.run(c.flags.Args())
if tc.WantErr == "" {
require.Equal(t, 0, code, ui.ErrorWriter.String())
} else {
if tc.WantErr != "" {
require.Equal(t, 1, code, ui.ErrorWriter.String())
require.Contains(t, ui.ErrorWriter.String(), tc.WantErr)
return
} else if tc.WantWarn != "" {
require.Equal(t, 0, code, ui.ErrorWriter.String())
require.Contains(t, ui.ErrorWriter.String(), tc.WantWarn)
} else {
require.Equal(t, 0, code, ui.ErrorWriter.String())
require.Empty(t, ui.ErrorWriter.String())
}

// Verify we handled the env and flags right first to get correct template
Expand Down Expand Up @@ -1316,6 +1370,8 @@ func testMockAgent(tc generateConfigTestCase) http.HandlerFunc {
testMockCatalogNodeServiceList()(w, r)
case strings.Contains(r.URL.Path, "/config/proxy-defaults/global"):
testMockConfigProxyDefaults(tc.ProxyDefaults)(w, r)
case strings.Contains(r.URL.Path, "/acl/token/self"):
testMockTokenReadSelf(tc.ACLEnabled, tc.Flags)(w, r)
default:
http.NotFound(w, r)
}
Expand Down Expand Up @@ -1467,6 +1523,21 @@ func testMockConfigProxyDefaults(entry api.ProxyConfigEntry) http.HandlerFunc {
}
}

func testMockTokenReadSelf(aclEnabled bool, flags []string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if aclEnabled {
for _, f := range flags {
if f == "-token" {
w.WriteHeader(200)
return
}
}
w.WriteHeader(403)
w.Write([]byte(acl.ErrNotFound.Error()))
return
}
}
}
func TestEnvoyCommand_canBindInternal(t *testing.T) {
t.Parallel()
type testCheck struct {
Expand Down
Loading

0 comments on commit e4a268e

Please sign in to comment.