Skip to content

Commit

Permalink
Add Peer field to service-defaults upstream overrides.
Browse files Browse the repository at this point in the history
  • Loading branch information
hashi-derek committed Jan 11, 2023
1 parent f5231b9 commit 1870dd9
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 95 deletions.
3 changes: 3 additions & 0 deletions .changelog/15956.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:breaking-change
connect: Add `peer` field to service-defaults upstream overrides. The addition of this field makes it possible to apply upstream overrides only to peer services. Prior to this change, overrides would be applied based on matching the `namespace` and `name` fields only, which means users could not have different configuration for local versus peer services. With this change, peer upstreams are only affected if the `peer` field matches the destination peer name.
```
17 changes: 11 additions & 6 deletions agent/configentry/merge_service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ func MergeNodeServiceWithCentralConfig(
// so we can learn about their configs.
for _, us := range ns.Proxy.Upstreams {
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService {
sid := us.DestinationID()
// Peer services do not have service-defaults config entries to fetch.
if us.DestinationPeer != "" {
continue
}
sid := us.DestinationID().ServiceName.ToServiceID()
sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta)
upstreams = append(upstreams, sid)
}
Expand Down Expand Up @@ -149,7 +153,7 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
}

// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
remoteUpstreams := make(map[structs.PeeredServiceName]structs.Upstream)

for _, us := range defaults.UpstreamIDConfigs {
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
Expand All @@ -158,9 +162,10 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
}

remoteUpstreams[us.Upstream] = structs.Upstream{
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
DestinationPartition: us.Upstream.PartitionOrDefault(),
DestinationName: us.Upstream.ID,
DestinationNamespace: us.Upstream.ServiceName.NamespaceOrDefault(),
DestinationPartition: us.Upstream.ServiceName.PartitionOrDefault(),
DestinationName: us.Upstream.ServiceName.Name,
DestinationPeer: us.Upstream.Peer,
Config: us.Config,
MeshGateway: parsed.MeshGateway,
CentrallyConfigured: true,
Expand All @@ -170,7 +175,7 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstreams with those values.
localUpstreams := make(map[structs.ServiceID]struct{})
localUpstreams := make(map[structs.PeeredServiceName]struct{})

// Merge upstream defaults into the local registration
for i := range ns.Proxy.Upstreams {
Expand Down
38 changes: 10 additions & 28 deletions agent/configentry/merge_service_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
zapUpstreamId := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("zap", structs.DefaultEnterpriseMetaInDefaultPartition()),
}
tests := []struct {
name string
args args
Expand All @@ -286,10 +289,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
Expand Down Expand Up @@ -357,10 +357,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "grpc",
},
Expand Down Expand Up @@ -432,10 +429,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "grpc",
},
Expand Down Expand Up @@ -490,10 +484,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
Expand Down Expand Up @@ -550,10 +541,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
Expand Down Expand Up @@ -613,10 +601,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down Expand Up @@ -661,10 +646,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down
29 changes: 17 additions & 12 deletions agent/configentry/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func ComputeResolvedServiceConfig(
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
seenUpstreams := map[structs.PeeredServiceName]struct{}{}

var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
Expand All @@ -132,17 +132,20 @@ func ComputeResolvedServiceConfig(

// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
psn := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(sid.ID, &sid.EnterpriseMeta),
}
if _, ok := seenUpstreams[psn]; !ok {
seenUpstreams[psn] = struct{}{}
}
}

// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamOverrides = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamOverrides = make(map[structs.PeeredServiceName]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// resolvedConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
resolvedConfigs = make(map[structs.ServiceID]map[string]interface{})
resolvedConfigs = make(map[structs.PeeredServiceName]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
Expand All @@ -156,8 +159,9 @@ func ComputeResolvedServiceConfig(
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamOverrides[override.ServiceID()] = override
psn := override.PeeredServiceName()
seenUpstreams[psn] = struct{}{}
upstreamOverrides[psn] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
Expand All @@ -175,7 +179,9 @@ func ComputeResolvedServiceConfig(
cfgMap["mesh_gateway"] = args.MeshGateway
}

wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
wildcard := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(structs.WildcardSpecifier, args.WithWildcardNamespace()),
}
resolvedConfigs[wildcard] = cfgMap
}
}
Expand All @@ -190,9 +196,7 @@ func ComputeResolvedServiceConfig(
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol

upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
upstreamSvcDefaults := entries.GetServiceDefaults(upstream.ServiceName.ToServiceID())
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
Expand Down Expand Up @@ -249,12 +253,13 @@ func ComputeResolvedServiceConfig(
return &thisReply, nil
}

// TODO can we remove these legacy upstreams?
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})

for us, conf := range resolvedConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
thisReply.UpstreamConfigs[us.ServiceName.Name] = conf
}

} else {
Expand Down
17 changes: 10 additions & 7 deletions agent/configentry/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
ID: "sid",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}
uid := structs.ServiceID{
ID: "upstream1",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
uid := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("upstream1", acl.DefaultEnterpriseMeta()),
}

uids := []structs.ServiceID{uid.ServiceName.ToServiceID()}

wildcard := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(structs.WildcardSpecifier, acl.WildcardEnterpriseMeta()),
}
uids := []structs.ServiceID{uid}
wildcard := structs.NewServiceID(structs.WildcardSpecifier, acl.WildcardEnterpriseMeta())

localMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}
remoteMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}
Expand Down Expand Up @@ -361,7 +364,7 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
},
Overrides: []*structs.UpstreamConfig{
{
Name: uid.ID,
Name: uid.ServiceName.Name,
MeshGateway: remoteMeshGW, // applied 3rd
},
},
Expand Down Expand Up @@ -473,7 +476,7 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
require.NoError(t, err)
// This is needed because map iteration is random and determines the order of some outputs.
sort.Slice(got.UpstreamIDConfigs, func(i, j int) bool {
return got.UpstreamIDConfigs[i].Upstream.ID < got.UpstreamIDConfigs[j].Upstream.ID
return got.UpstreamIDConfigs[i].Upstream.ServiceName.Name < got.UpstreamIDConfigs[j].Upstream.ServiceName.Name
})
require.Equal(t, tt.want, got)
})
Expand Down
27 changes: 16 additions & 11 deletions agent/consul/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,9 +1227,15 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
}
t.Parallel()

mysql := structs.NewServiceID("mysql", structs.DefaultEnterpriseMetaInDefaultPartition())
cache := structs.NewServiceID("cache", structs.DefaultEnterpriseMetaInDefaultPartition())
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition())
cache := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("cache", structs.DefaultEnterpriseMetaInDefaultPartition()),
}
mysql := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("mysql", structs.DefaultEnterpriseMetaInDefaultPartition()),
}
wildcard := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()),
}

tt := []struct {
name string
Expand Down Expand Up @@ -1306,7 +1312,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Name: "api",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
cache,
cache.ServiceName.ToServiceID(),
},
},
expect: structs.ServiceConfigResponse{
Expand All @@ -1321,10 +1327,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
},
},
{
Upstream: structs.ServiceID{
ID: "mysql",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: mysql,
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down Expand Up @@ -1352,7 +1355,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
UpstreamIDs: []structs.ServiceID{
mysql,
mysql.ServiceName.ToServiceID(),
},
},
expect: structs.ServiceConfigResponse{
Expand Down Expand Up @@ -1421,7 +1424,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
UpstreamIDs: []structs.ServiceID{
mysql,
mysql.ServiceName.ToServiceID(),
},
},
expect: structs.ServiceConfigResponse{
Expand Down Expand Up @@ -1856,7 +1859,9 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
},
UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{
{
Upstream: structs.NewServiceID("bar", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("bar", nil),
},
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down
6 changes: 5 additions & 1 deletion agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,11 @@ func (s *Store) ReadResolvedServiceConfigEntries(
if override.Name == "" {
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
if override.Peer != "" {
continue // Peer services do not have service-defaults config entries to fetch.
}
sid := override.PeeredServiceName().ServiceName.ToServiceID()
seenUpstreams[sid] = struct{}{}
}
}

Expand Down
6 changes: 5 additions & 1 deletion agent/service_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
// learn about their configs.
for _, us := range ns.Proxy.Upstreams {
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService {
sid := us.DestinationID()
// Peer services do not have service-defaults config entries to fetch.
if us.DestinationPeer != "" {
continue
}
sid := us.DestinationID().ServiceName.ToServiceID()
sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta)
upstreams = append(upstreams, sid)
}
Expand Down
12 changes: 9 additions & 3 deletions agent/service_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,9 @@ func TestServiceManager_PersistService_API(t *testing.T) {
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("redis", nil),
},
Config: map[string]interface{}{
"protocol": "tcp",
},
Expand Down Expand Up @@ -468,7 +470,9 @@ func TestServiceManager_PersistService_API(t *testing.T) {
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("redis", nil),
},
Config: map[string]interface{}{
"protocol": "tcp",
},
Expand Down Expand Up @@ -647,7 +651,9 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("redis", nil),
},
Config: map[string]interface{}{
"protocol": "tcp",
},
Expand Down
Loading

0 comments on commit 1870dd9

Please sign in to comment.