Skip to content

Commit

Permalink
Merge pull request #9976 from hashicorp/centralized-upstream-fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
freddygv authored Apr 8, 2021
2 parents e385e59 + d6db67e commit a02245b
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 38 deletions.
36 changes: 30 additions & 6 deletions agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
upstreamIDs := args.UpstreamIDs
legacyUpstreams := false

// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
if len(upstreamIDs) == 0 {
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0

// Check the args and the resolved value. If it was exclusively set via a config entry, then args.TransparentProxy
// will never be true because the service config request does not use the resolved value.
tproxy = args.TransparentProxy || reply.TransparentProxy
)

// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only returned the resolved config if the proxy has TransparentProxy mode enabled.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return nil
}

// The request is considered legacy if the deprecated args.Upstream was used
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
legacyUpstreams = true

upstreamIDs = make([]structs.ServiceID, 0)
for _, upstream := range args.Upstreams {
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
upstreamIDs = append(upstreamIDs, sid)
}
Expand All @@ -436,22 +452,30 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}
}

// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{})

var (
upstreamDefaults *structs.UpstreamConfig
upstreamConfigs map[string]*structs.UpstreamConfig
)
if serviceConf != nil && serviceConf.Connect != nil {
if serviceConf.Connect.UpstreamDefaults != nil {
upstreamDefaults = serviceConf.Connect.UpstreamDefaults

// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)

wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
usConfigs[wildcard] = cfgMap
}
if serviceConf.Connect.UpstreamConfigs != nil {
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
}
}

// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{})

for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})

Expand Down
146 changes: 146 additions & 0 deletions agent/consul/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {

mysql := structs.NewServiceID("mysql", structs.DefaultEnterpriseMeta())
cache := structs.NewServiceID("cache", structs.DefaultEnterpriseMeta())
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())

tt := []struct {
name string
Expand Down Expand Up @@ -1126,6 +1127,14 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
},
expect: structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
Expand Down Expand Up @@ -1188,6 +1197,19 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
"protocol": "udp",
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
"protocol": "http",
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
Expand All @@ -1204,6 +1226,130 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
},
},
},
{
name: "without upstream args we should return centralized config with tproxy arg",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "api",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
mysql.String(): {
Protocol: "grpc",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "api",
Datacenter: "dc1",
TransparentProxy: true,

// Empty Upstreams/UpstreamIDs
},
expect: structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
"protocol": "grpc",
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
},
},
},
{
name: "without upstream args we should return centralized config with tproxy default",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "api",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
mysql.String(): {
Protocol: "grpc",
},
},
},

// TransparentProxy on the config entry but not the config request
TransparentProxy: true,
},
},
request: structs.ServiceConfigRequest{
Name: "api",
Datacenter: "dc1",

// Empty Upstreams/UpstreamIDs
},
expect: structs.ServiceConfigResponse{
TransparentProxy: true,
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
"protocol": "grpc",
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
},
},
},
{
name: "without upstream args we should NOT return centralized config outside tproxy mode",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "api",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
mysql.String(): {
Protocol: "grpc",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "api",
Datacenter: "dc1",
TransparentProxy: false,

// Empty Upstreams/UpstreamIDs
},
expect: structs.ServiceConfigResponse{},
},
}

for _, tc := range tt {
Expand Down
64 changes: 53 additions & 11 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
for i := range s.proxyCfg.Upstreams {
u := s.proxyCfg.Upstreams[i]

// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
if u.DestinationName == structs.WildcardSpecifier {
snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
continue
}

// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
// Watches should not be created for them.
if u.CentrallyConfigured {
Expand Down Expand Up @@ -795,6 +801,17 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
if ok {
cfgMap = u.Config
} else {
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
// by the ResolveServiceConfig endpoint.
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
if ok {
u = defaults
cfgMap = defaults.Config
snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
}
}

cfg, err := parseReducedUpstreamConfig(cfgMap)
Expand All @@ -808,7 +825,18 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
)
}

err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault())
meshGateway := s.proxyCfg.MeshGateway
if u != nil {
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
}
watchOpts := discoveryChainWatchOpts{
id: svc.String(),
name: svc.Name,
namespace: svc.NamespaceOrDefault(),
cfg: cfg,
meshGateway: meshGateway,
}
err = s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
}
Expand Down Expand Up @@ -1592,7 +1620,12 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
for _, service := range services.Services {
u := makeUpstream(service)

err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace)
watchOpts := discoveryChainWatchOpts{
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
}
err := s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
Expand Down Expand Up @@ -1642,31 +1675,40 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream {
return upstream
}

func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok {
type discoveryChainWatchOpts struct {
id string
name string
namespace string
cfg reducedUpstreamConfig
meshGateway structs.MeshGatewayConfig
}

func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
return nil
}

ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: name,
Name: opts.name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: namespace,
OverrideProtocol: cfg.Protocol,
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+id, s.ch)
EvaluateInNamespace: opts.namespace,
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
OverrideMeshGateway: opts.meshGateway,
}, "discovery-chain:"+opts.id, s.ch)
if err != nil {
cancel()
return err
}

switch s.kind {
case structs.ServiceKindIngressGateway:
snap.IngressGateway.WatchedDiscoveryChains[id] = cancel
snap.IngressGateway.WatchedDiscoveryChains[opts.id] = cancel
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel
snap.ConnectProxy.WatchedDiscoveryChains[opts.id] = cancel
default:
cancel()
return fmt.Errorf("unsupported kind %s", s.kind)
Expand Down
Loading

0 comments on commit a02245b

Please sign in to comment.