Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual Backport of NET-6294 - v1 Agentless proxycfg datasource errors after v2 changes #19415

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ MOCKERY_VERSION='v2.20.0'
BUF_VERSION='v1.26.0'

PROTOC_GEN_GO_GRPC_VERSION="v1.2.0"
MOG_VERSION='v0.4.0'
MOG_VERSION='v0.4.1'
PROTOC_GO_INJECT_TAG_VERSION='v1.3.0'
PROTOC_GEN_GO_BINARY_VERSION="v0.1.0"
DEEP_COPY_VERSION='bc3f5aa5735d8a54961580a3a24422c308c831c2'
118 changes: 54 additions & 64 deletions agent/agent.go
Original file line number Diff line number Diff line change
@@ -656,43 +656,12 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}

// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()
// proxyTracker will be used in the creation of the XDS server and also
// in the registration of the v2 xds controller
var proxyTracker *proxytracker.ProxyTracker

// Setup either the client or the server.
var consulServer *consul.Server
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

@@ -729,16 +698,18 @@ func (a *Agent) Start(ctx context.Context) error {
},
)

var pt *proxytracker.ProxyTracker
if a.baseDeps.UseV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
proxyTracker = proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
consulServer, err = consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, proxyTracker)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
incomingRPCLimiter.Register(server)
a.delegate = server
incomingRPCLimiter.Register(consulServer)
a.delegate = consulServer

if a.config.PeeringEnabled && a.config.ConnectEnabled {
d := servercert.Deps{
@@ -748,7 +719,7 @@ func (a *Agent) Start(ctx context.Context) error {
ACLsEnabled: a.config.ACLsEnabled,
},
LeafCertManager: a.leafCertManager,
GetStore: func() servercert.Store { return server.FSM().State() },
GetStore: func() servercert.Store { return consulServer.FSM().State() },
TLSConfigurator: a.tlsConfigurator,
}
a.certManager = servercert.NewCertManager(d)
@@ -804,6 +775,35 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(consulServer),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
@@ -856,7 +856,7 @@ func (a *Agent) Start(ctx context.Context) error {
}

// Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
if err := a.listenAndServeGRPC(proxyTracker, consulServer); err != nil {
return err
}

@@ -921,29 +921,13 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}

// getProxyWatcher returns the proper implementation of the ProxyWatcher interface.
// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise,
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.baseDeps.UseV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher, server *consul.Server) {
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
switch proxyWatcher.(type) {
case *proxytracker.ProxyTracker:
go func() {
@@ -979,12 +963,18 @@ func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
func (a *Agent) listenAndServeGRPC(proxyTracker *proxytracker.ProxyTracker, server *consul.Server) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}
var proxyWatcher xds.ProxyWatcher
if a.baseDeps.UseV2Resources() {
proxyWatcher = proxyTracker
} else {
proxyWatcher = localproxycfg.NewConfigSource(a.proxyConfig)
}

a.configureXDSServer(proxyWatcher)
a.configureXDSServer(proxyWatcher, server)

// Attempt to spawn listeners
var listeners []net.Listener
@@ -4579,7 +4569,7 @@ func (a *Agent) listenerPortLocked(svcID structs.ServiceID, checkID structs.Chec
return port, nil
}

func (a *Agent) proxyDataSources() proxycfg.DataSources {
func (a *Agent) proxyDataSources(server *consul.Server) proxycfg.DataSources {
sources := proxycfg.DataSources{
CARoots: proxycfgglue.CacheCARoots(a.cache),
CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache),
@@ -4606,7 +4596,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache),
}

if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
deps := proxycfgglue.ServerDataSourceDeps{
Datacenter: a.config.Datacenter,
EventPublisher: a.baseDeps.EventPublisher,
74 changes: 0 additions & 74 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -23,19 +23,12 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/tcpproxy"
@@ -6442,73 +6435,6 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
})
}

func TestAgent_getProxyWatcher(t *testing.T) {
type testcase struct {
description string
getExperiments func() []string
expectedType xds.ProxyWatcher
}
testscases := []testcase{
{
description: "config source is returned when api-resources experiment is not configured",
expectedType: &local.ConfigSource{},
getExperiments: func() []string {
return []string{}
},
},
{
description: "proxy tracker is returned when api-resources experiment is configured",
expectedType: &proxytracker.ProxyTracker{},
getExperiments: func() []string {
return []string{consul.CatalogResourceExperimentName}
},
},
}
for _, tc := range testscases {
caConfig := tlsutil.Config{}
tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil))
require.NoError(t, err)

bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)},
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}

bd.XDSStreamLimiter = limiter.NewSessionLimiter()
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})

cfg := config.RuntimeConfig{
BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC),
}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)

bd.Experiments = tc.getExperiments()

agent, err := New(bd)
require.NoError(t, err)
agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}})
require.NoError(t, err)
require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType)))
}

}
func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
9 changes: 3 additions & 6 deletions agent/consul/discoverychain/gateway_httproute.go
Original file line number Diff line number Diff line change
@@ -177,12 +177,9 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser
}

if rule.Filters.RetryFilter != nil {
if rule.Filters.RetryFilter.NumRetries != nil {
destination.NumRetries = *rule.Filters.RetryFilter.NumRetries
}
if rule.Filters.RetryFilter.RetryOnConnectFailure != nil {
destination.RetryOnConnectFailure = *rule.Filters.RetryFilter.RetryOnConnectFailure
}

destination.NumRetries = rule.Filters.RetryFilter.NumRetries
destination.RetryOnConnectFailure = rule.Filters.RetryFilter.RetryOnConnectFailure

if len(rule.Filters.RetryFilter.RetryOn) > 0 {
destination.RetryOn = rule.Filters.RetryFilter.RetryOn
6 changes: 4 additions & 2 deletions agent/consul/gateways/controller_gateways.go
Original file line number Diff line number Diff line change
@@ -686,14 +686,16 @@ func (g *gatewayMeta) updateRouteBinding(route structs.BoundRoute) (bool, []stru
errors[ref] = err
}

isValidJWT := true
if httpRoute, ok := route.(*structs.HTTPRouteConfigEntry); ok {
var jwtErrors map[structs.ResourceReference]error
didBind, jwtErrors = g.validateJWTForRoute(httpRoute)
isValidJWT, jwtErrors = g.validateJWTForRoute(httpRoute)
for ref, err := range jwtErrors {
errors[ref] = err
}
}
if didBind {

if didBind && isValidJWT {
refDidBind = true
listenerBound[listener.Name] = true
}
4 changes: 2 additions & 2 deletions agent/structs/config_entry_routes.go
Original file line number Diff line number Diff line change
@@ -475,10 +475,10 @@ type URLRewrite struct {
}

type RetryFilter struct {
NumRetries *uint32
NumRetries uint32
RetryOn []string
RetryOnStatusCodes []uint32
RetryOnConnectFailure *bool
RetryOnConnectFailure bool
}

type TimeoutFilter struct {
16 changes: 0 additions & 16 deletions agent/structs/structs.deepcopy.go
Original file line number Diff line number Diff line change
@@ -400,10 +400,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
if o.Rules[i2].Filters.RetryFilter != nil {
cp.Rules[i2].Filters.RetryFilter = new(RetryFilter)
*cp.Rules[i2].Filters.RetryFilter = *o.Rules[i2].Filters.RetryFilter
if o.Rules[i2].Filters.RetryFilter.NumRetries != nil {
cp.Rules[i2].Filters.RetryFilter.NumRetries = new(uint32)
*cp.Rules[i2].Filters.RetryFilter.NumRetries = *o.Rules[i2].Filters.RetryFilter.NumRetries
}
if o.Rules[i2].Filters.RetryFilter.RetryOn != nil {
cp.Rules[i2].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Filters.RetryFilter.RetryOn))
copy(cp.Rules[i2].Filters.RetryFilter.RetryOn, o.Rules[i2].Filters.RetryFilter.RetryOn)
@@ -412,10 +408,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes))
copy(cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes)
}
if o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure != nil {
cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = new(bool)
*cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure
}
}
if o.Rules[i2].Filters.TimeoutFilter != nil {
cp.Rules[i2].Filters.TimeoutFilter = new(TimeoutFilter)
@@ -493,10 +485,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
if o.Rules[i2].Services[i4].Filters.RetryFilter != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter = new(RetryFilter)
*cp.Rules[i2].Services[i4].Filters.RetryFilter = *o.Rules[i2].Services[i4].Filters.RetryFilter
if o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = new(uint32)
*cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = *o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries
}
if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn))
copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn)
@@ -505,10 +493,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes))
copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes)
}
if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = new(bool)
*cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure
}
}
if o.Rules[i2].Services[i4].Filters.TimeoutFilter != nil {
cp.Rules[i2].Services[i4].Filters.TimeoutFilter = new(TimeoutFilter)
Loading