Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-6294 - v1 Agentless proxycfg datasource errors after v2 changes #19365

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ MOCKERY_VERSION='v2.20.0'
BUF_VERSION='v1.26.0'

PROTOC_GEN_GO_GRPC_VERSION="v1.2.0"
MOG_VERSION='v0.4.0'
MOG_VERSION='v0.4.1'
PROTOC_GO_INJECT_TAG_VERSION='v1.3.0'
PROTOC_GEN_GO_BINARY_VERSION="v0.1.0"
DEEP_COPY_VERSION='bc3f5aa5735d8a54961580a3a24422c308c831c2'
Expand Down
118 changes: 54 additions & 64 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,43 +656,12 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}

// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()
// proxyTracker will be used in the creation of the XDS server and also
// in the registration of the v2 xds controller
var proxyTracker *proxytracker.ProxyTracker

// Setup either the client or the server.
var consulServer *consul.Server
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

Expand Down Expand Up @@ -729,16 +698,18 @@ func (a *Agent) Start(ctx context.Context) error {
},
)

var pt *proxytracker.ProxyTracker
if a.baseDeps.UseV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
proxyTracker = proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
consulServer, err = consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, proxyTracker)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
incomingRPCLimiter.Register(server)
a.delegate = server
incomingRPCLimiter.Register(consulServer)
a.delegate = consulServer

if a.config.PeeringEnabled && a.config.ConnectEnabled {
d := servercert.Deps{
Expand All @@ -748,7 +719,7 @@ func (a *Agent) Start(ctx context.Context) error {
ACLsEnabled: a.config.ACLsEnabled,
},
LeafCertManager: a.leafCertManager,
GetStore: func() servercert.Store { return server.FSM().State() },
GetStore: func() servercert.Store { return consulServer.FSM().State() },
TLSConfigurator: a.tlsConfigurator,
}
a.certManager = servercert.NewCertManager(d)
Expand Down Expand Up @@ -804,6 +775,35 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(consulServer),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
Expand Down Expand Up @@ -856,7 +856,7 @@ func (a *Agent) Start(ctx context.Context) error {
}

// Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
if err := a.listenAndServeGRPC(proxyTracker, consulServer); err != nil {
return err
}

Expand Down Expand Up @@ -921,29 +921,13 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}

// getProxyWatcher returns the proper implementation of the ProxyWatcher interface.
// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise,
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.baseDeps.UseV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
jmurret marked this conversation as resolved.
Show resolved Hide resolved
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher, server *consul.Server) {
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about reducing server pointers being passed

switch proxyWatcher.(type) {
case *proxytracker.ProxyTracker:
go func() {
Expand Down Expand Up @@ -979,12 +963,18 @@ func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
func (a *Agent) listenAndServeGRPC(proxyTracker *proxytracker.ProxyTracker, server *consul.Server) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}
var proxyWatcher xds.ProxyWatcher
if a.baseDeps.UseV2Resources() {
proxyWatcher = proxyTracker
} else {
proxyWatcher = localproxycfg.NewConfigSource(a.proxyConfig)
}

a.configureXDSServer(proxyWatcher)
a.configureXDSServer(proxyWatcher, server)

// Attempt to spawn listeners
var listeners []net.Listener
Expand Down Expand Up @@ -4579,7 +4569,7 @@ func (a *Agent) listenerPortLocked(svcID structs.ServiceID, checkID structs.Chec
return port, nil
}

func (a *Agent) proxyDataSources() proxycfg.DataSources {
func (a *Agent) proxyDataSources(server *consul.Server) proxycfg.DataSources {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Should we minimize the pointers to the server object and just have a isServer bool passed in?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be nice, but not like a mandatory thing, as we could chase that immediately after for x.x.1

sources := proxycfg.DataSources{
CARoots: proxycfgglue.CacheCARoots(a.cache),
CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache),
Expand All @@ -4606,7 +4596,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache),
}

if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
deps := proxycfgglue.ServerDataSourceDeps{
Datacenter: a.config.Datacenter,
EventPublisher: a.baseDeps.EventPublisher,
Expand Down
74 changes: 0 additions & 74 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,12 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/tcpproxy"
Expand Down Expand Up @@ -6442,73 +6435,6 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
})
}

func TestAgent_getProxyWatcher(t *testing.T) {
type testcase struct {
description string
getExperiments func() []string
expectedType xds.ProxyWatcher
}
testscases := []testcase{
{
description: "config source is returned when api-resources experiment is not configured",
expectedType: &local.ConfigSource{},
getExperiments: func() []string {
return []string{}
},
},
{
description: "proxy tracker is returned when api-resources experiment is configured",
expectedType: &proxytracker.ProxyTracker{},
getExperiments: func() []string {
return []string{consul.CatalogResourceExperimentName}
},
},
}
for _, tc := range testscases {
caConfig := tlsutil.Config{}
tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil))
require.NoError(t, err)

bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)},
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}

bd.XDSStreamLimiter = limiter.NewSessionLimiter()
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})

cfg := config.RuntimeConfig{
BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC),
}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)

bd.Experiments = tc.getExperiments()

agent, err := New(bd)
require.NoError(t, err)
agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}})
require.NoError(t, err)
require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType)))
}

}
func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down
9 changes: 3 additions & 6 deletions agent/consul/discoverychain/gateway_httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,9 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser
}

if rule.Filters.RetryFilter != nil {
if rule.Filters.RetryFilter.NumRetries != nil {
destination.NumRetries = *rule.Filters.RetryFilter.NumRetries
}
if rule.Filters.RetryFilter.RetryOnConnectFailure != nil {
destination.RetryOnConnectFailure = *rule.Filters.RetryFilter.RetryOnConnectFailure
}

destination.NumRetries = rule.Filters.RetryFilter.NumRetries
destination.RetryOnConnectFailure = rule.Filters.RetryFilter.RetryOnConnectFailure

if len(rule.Filters.RetryFilter.RetryOn) > 0 {
destination.RetryOn = rule.Filters.RetryFilter.RetryOn
Expand Down
6 changes: 4 additions & 2 deletions agent/consul/gateways/controller_gateways.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,14 +686,16 @@ func (g *gatewayMeta) updateRouteBinding(route structs.BoundRoute) (bool, []stru
errors[ref] = err
}

isValidJWT := true
if httpRoute, ok := route.(*structs.HTTPRouteConfigEntry); ok {
var jwtErrors map[structs.ResourceReference]error
didBind, jwtErrors = g.validateJWTForRoute(httpRoute)
isValidJWT, jwtErrors = g.validateJWTForRoute(httpRoute)
for ref, err := range jwtErrors {
errors[ref] = err
}
}
if didBind {

if didBind && isValidJWT {
refDidBind = true
listenerBound[listener.Name] = true
}
Expand Down
4 changes: 2 additions & 2 deletions agent/structs/config_entry_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,10 @@ type URLRewrite struct {
}

type RetryFilter struct {
NumRetries *uint32
NumRetries uint32
RetryOn []string
RetryOnStatusCodes []uint32
RetryOnConnectFailure *bool
RetryOnConnectFailure bool
}

type TimeoutFilter struct {
Expand Down
16 changes: 0 additions & 16 deletions agent/structs/structs.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
if o.Rules[i2].Filters.RetryFilter != nil {
cp.Rules[i2].Filters.RetryFilter = new(RetryFilter)
*cp.Rules[i2].Filters.RetryFilter = *o.Rules[i2].Filters.RetryFilter
if o.Rules[i2].Filters.RetryFilter.NumRetries != nil {
cp.Rules[i2].Filters.RetryFilter.NumRetries = new(uint32)
*cp.Rules[i2].Filters.RetryFilter.NumRetries = *o.Rules[i2].Filters.RetryFilter.NumRetries
}
if o.Rules[i2].Filters.RetryFilter.RetryOn != nil {
cp.Rules[i2].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Filters.RetryFilter.RetryOn))
copy(cp.Rules[i2].Filters.RetryFilter.RetryOn, o.Rules[i2].Filters.RetryFilter.RetryOn)
Expand All @@ -412,10 +408,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes))
copy(cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes)
}
if o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure != nil {
cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = new(bool)
*cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure
}
}
if o.Rules[i2].Filters.TimeoutFilter != nil {
cp.Rules[i2].Filters.TimeoutFilter = new(TimeoutFilter)
Expand Down Expand Up @@ -493,10 +485,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
if o.Rules[i2].Services[i4].Filters.RetryFilter != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter = new(RetryFilter)
*cp.Rules[i2].Services[i4].Filters.RetryFilter = *o.Rules[i2].Services[i4].Filters.RetryFilter
if o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = new(uint32)
*cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = *o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries
}
if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn))
copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn)
Expand All @@ -505,10 +493,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes))
copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes)
}
if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = new(bool)
*cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure
}
}
if o.Rules[i2].Services[i4].Filters.TimeoutFilter != nil {
cp.Rules[i2].Services[i4].Filters.TimeoutFilter = new(TimeoutFilter)
Expand Down
Loading