Skip to content

Commit

Permalink
NET-6294 - v1 Agentless proxycfg datasource errors after v2 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jmurret committed Oct 25, 2023
1 parent 6d5c01e commit 425fb0f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 138 deletions.
118 changes: 54 additions & 64 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,43 +656,12 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}

// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()
// proxyTracker will be used in the creation of the XDS server and also
// in the registration of the v2 xds controller
var proxyTracker *proxytracker.ProxyTracker

// Setup either the client or the server.
var consulServer *consul.Server
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

Expand Down Expand Up @@ -729,16 +698,18 @@ func (a *Agent) Start(ctx context.Context) error {
},
)

var pt *proxytracker.ProxyTracker
if a.baseDeps.UseV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
proxyTracker = proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
consulServer, err = consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, proxyTracker)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
incomingRPCLimiter.Register(server)
a.delegate = server
incomingRPCLimiter.Register(consulServer)
a.delegate = consulServer

if a.config.PeeringEnabled && a.config.ConnectEnabled {
d := servercert.Deps{
Expand All @@ -748,7 +719,7 @@ func (a *Agent) Start(ctx context.Context) error {
ACLsEnabled: a.config.ACLsEnabled,
},
LeafCertManager: a.leafCertManager,
GetStore: func() servercert.Store { return server.FSM().State() },
GetStore: func() servercert.Store { return consulServer.FSM().State() },
TLSConfigurator: a.tlsConfigurator,
}
a.certManager = servercert.NewCertManager(d)
Expand Down Expand Up @@ -804,6 +775,35 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(consulServer),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
Expand Down Expand Up @@ -856,7 +856,7 @@ func (a *Agent) Start(ctx context.Context) error {
}

// Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
if err := a.listenAndServeGRPC(proxyTracker, consulServer); err != nil {
return err
}

Expand Down Expand Up @@ -921,29 +921,13 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}

// getProxyWatcher returns the proper implementation of the ProxyWatcher interface.
// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise,
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.baseDeps.UseV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher, server *consul.Server) {
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
switch proxyWatcher.(type) {
case *proxytracker.ProxyTracker:
go func() {
Expand Down Expand Up @@ -979,12 +963,18 @@ func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
func (a *Agent) listenAndServeGRPC(proxyTracker *proxytracker.ProxyTracker, server *consul.Server) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}
var proxyWatcher xds.ProxyWatcher
if a.baseDeps.UseV2Resources() {
proxyWatcher = proxyTracker
} else {
proxyWatcher = localproxycfg.NewConfigSource(a.proxyConfig)
}

a.configureXDSServer(proxyWatcher)
a.configureXDSServer(proxyWatcher, server)

// Attempt to spawn listeners
var listeners []net.Listener
Expand Down Expand Up @@ -4579,7 +4569,7 @@ func (a *Agent) listenerPortLocked(svcID structs.ServiceID, checkID structs.Chec
return port, nil
}

func (a *Agent) proxyDataSources() proxycfg.DataSources {
func (a *Agent) proxyDataSources(server *consul.Server) proxycfg.DataSources {
sources := proxycfg.DataSources{
CARoots: proxycfgglue.CacheCARoots(a.cache),
CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache),
Expand All @@ -4606,7 +4596,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache),
}

if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
deps := proxycfgglue.ServerDataSourceDeps{
Datacenter: a.config.Datacenter,
EventPublisher: a.baseDeps.EventPublisher,
Expand Down
74 changes: 0 additions & 74 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,12 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/tcpproxy"
Expand Down Expand Up @@ -6442,73 +6435,6 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
})
}

func TestAgent_getProxyWatcher(t *testing.T) {
type testcase struct {
description string
getExperiments func() []string
expectedType xds.ProxyWatcher
}
testscases := []testcase{
{
description: "config source is returned when api-resources experiment is not configured",
expectedType: &local.ConfigSource{},
getExperiments: func() []string {
return []string{}
},
},
{
description: "proxy tracker is returned when api-resources experiment is configured",
expectedType: &proxytracker.ProxyTracker{},
getExperiments: func() []string {
return []string{consul.CatalogResourceExperimentName}
},
},
}
for _, tc := range testscases {
caConfig := tlsutil.Config{}
tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil))
require.NoError(t, err)

bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)},
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}

bd.XDSStreamLimiter = limiter.NewSessionLimiter()
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})

cfg := config.RuntimeConfig{
BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC),
}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)

bd.Experiments = tc.getExperiments()

agent, err := New(bd)
require.NoError(t, err)
agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}})
require.NoError(t, err)
require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType)))
}

}
func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down

0 comments on commit 425fb0f

Please sign in to comment.