Skip to content

Commit

Permalink
Add Peer field to service-defaults upstream overrides.
Browse files Browse the repository at this point in the history
  • Loading branch information
hashi-derek committed Jan 11, 2023
1 parent f5231b9 commit d1b1d17
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 95 deletions.
17 changes: 11 additions & 6 deletions agent/configentry/merge_service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ func MergeNodeServiceWithCentralConfig(
// so we can learn about their configs.
for _, us := range ns.Proxy.Upstreams {
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService {
sid := us.DestinationID()
// Peer services do not have service-defaults config entries to fetch.
if us.DestinationPeer != "" {
continue
}
sid := us.DestinationID().ServiceName.ToServiceID()
sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta)
upstreams = append(upstreams, sid)
}
Expand Down Expand Up @@ -149,7 +153,7 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
}

// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
remoteUpstreams := make(map[structs.PeeredServiceName]structs.Upstream)

for _, us := range defaults.UpstreamIDConfigs {
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
Expand All @@ -158,9 +162,10 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
}

remoteUpstreams[us.Upstream] = structs.Upstream{
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
DestinationPartition: us.Upstream.PartitionOrDefault(),
DestinationName: us.Upstream.ID,
DestinationNamespace: us.Upstream.ServiceName.NamespaceOrDefault(),
DestinationPartition: us.Upstream.ServiceName.PartitionOrDefault(),
DestinationName: us.Upstream.ServiceName.Name,
DestinationPeer: us.Upstream.Peer,
Config: us.Config,
MeshGateway: parsed.MeshGateway,
CentrallyConfigured: true,
Expand All @@ -170,7 +175,7 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstreams with those values.
localUpstreams := make(map[structs.ServiceID]struct{})
localUpstreams := make(map[structs.PeeredServiceName]struct{})

// Merge upstream defaults into the local registration
for i := range ns.Proxy.Upstreams {
Expand Down
38 changes: 10 additions & 28 deletions agent/configentry/merge_service_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
zapUpstreamId := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("zap", structs.DefaultEnterpriseMetaInDefaultPartition()),
}
tests := []struct {
name string
args args
Expand All @@ -286,10 +289,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
Expand Down Expand Up @@ -357,10 +357,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "grpc",
},
Expand Down Expand Up @@ -432,10 +429,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "grpc",
},
Expand Down Expand Up @@ -490,10 +484,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
Expand Down Expand Up @@ -550,10 +541,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
Expand Down Expand Up @@ -613,10 +601,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down Expand Up @@ -661,10 +646,7 @@ func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: zapUpstreamId,
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down
29 changes: 17 additions & 12 deletions agent/configentry/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func ComputeResolvedServiceConfig(
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
seenUpstreams := map[structs.PeeredServiceName]struct{}{}

var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
Expand All @@ -132,17 +132,20 @@ func ComputeResolvedServiceConfig(

// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
psn := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(sid.ID, &sid.EnterpriseMeta),
}
if _, ok := seenUpstreams[psn]; !ok {
seenUpstreams[psn] = struct{}{}
}
}

// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamOverrides = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamOverrides = make(map[structs.PeeredServiceName]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// resolvedConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
resolvedConfigs = make(map[structs.ServiceID]map[string]interface{})
resolvedConfigs = make(map[structs.PeeredServiceName]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
Expand All @@ -156,8 +159,9 @@ func ComputeResolvedServiceConfig(
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamOverrides[override.ServiceID()] = override
psn := override.PeeredServiceName()
seenUpstreams[psn] = struct{}{}
upstreamOverrides[psn] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
Expand All @@ -175,7 +179,9 @@ func ComputeResolvedServiceConfig(
cfgMap["mesh_gateway"] = args.MeshGateway
}

wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
wildcard := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(structs.WildcardSpecifier, args.WithWildcardNamespace()),
}
resolvedConfigs[wildcard] = cfgMap
}
}
Expand All @@ -190,9 +196,7 @@ func ComputeResolvedServiceConfig(
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol

upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
upstreamSvcDefaults := entries.GetServiceDefaults(upstream.ServiceName.ToServiceID())
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
Expand Down Expand Up @@ -249,12 +253,13 @@ func ComputeResolvedServiceConfig(
return &thisReply, nil
}

// TODO can we remove these legacy upstreams?
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})

for us, conf := range resolvedConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
thisReply.UpstreamConfigs[us.ServiceName.Name] = conf
}

} else {
Expand Down
17 changes: 10 additions & 7 deletions agent/configentry/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
ID: "sid",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}
uid := structs.ServiceID{
ID: "upstream1",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
uid := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("upstream1", acl.DefaultEnterpriseMeta()),
}

uids := []structs.ServiceID{uid.ServiceName.ToServiceID()}

wildcard := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(structs.WildcardSpecifier, acl.WildcardEnterpriseMeta()),
}
uids := []structs.ServiceID{uid}
wildcard := structs.NewServiceID(structs.WildcardSpecifier, acl.WildcardEnterpriseMeta())

localMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}
remoteMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}
Expand Down Expand Up @@ -361,7 +364,7 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
},
Overrides: []*structs.UpstreamConfig{
{
Name: uid.ID,
Name: uid.ServiceName.Name,
MeshGateway: remoteMeshGW, // applied 3rd
},
},
Expand Down Expand Up @@ -473,7 +476,7 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
require.NoError(t, err)
// This is needed because map iteration is random and determines the order of some outputs.
sort.Slice(got.UpstreamIDConfigs, func(i, j int) bool {
return got.UpstreamIDConfigs[i].Upstream.ID < got.UpstreamIDConfigs[j].Upstream.ID
return got.UpstreamIDConfigs[i].Upstream.ServiceName.Name < got.UpstreamIDConfigs[j].Upstream.ServiceName.Name
})
require.Equal(t, tt.want, got)
})
Expand Down
27 changes: 16 additions & 11 deletions agent/consul/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,9 +1227,15 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
}
t.Parallel()

mysql := structs.NewServiceID("mysql", structs.DefaultEnterpriseMetaInDefaultPartition())
cache := structs.NewServiceID("cache", structs.DefaultEnterpriseMetaInDefaultPartition())
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition())
cache := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("cache", structs.DefaultEnterpriseMetaInDefaultPartition()),
}
mysql := structs.PeeredServiceName{
ServiceName: structs.NewServiceName("mysql", structs.DefaultEnterpriseMetaInDefaultPartition()),
}
wildcard := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()),
}

tt := []struct {
name string
Expand Down Expand Up @@ -1306,7 +1312,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Name: "api",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
cache,
cache.ServiceName.ToServiceID(),
},
},
expect: structs.ServiceConfigResponse{
Expand All @@ -1321,10 +1327,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
},
},
{
Upstream: structs.ServiceID{
ID: "mysql",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Upstream: mysql,
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down Expand Up @@ -1352,7 +1355,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
UpstreamIDs: []structs.ServiceID{
mysql,
mysql.ServiceName.ToServiceID(),
},
},
expect: structs.ServiceConfigResponse{
Expand Down Expand Up @@ -1421,7 +1424,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
UpstreamIDs: []structs.ServiceID{
mysql,
mysql.ServiceName.ToServiceID(),
},
},
expect: structs.ServiceConfigResponse{
Expand Down Expand Up @@ -1856,7 +1859,9 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
},
UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{
{
Upstream: structs.NewServiceID("bar", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("bar", nil),
},
Config: map[string]interface{}{
"protocol": "http",
},
Expand Down
6 changes: 5 additions & 1 deletion agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,11 @@ func (s *Store) ReadResolvedServiceConfigEntries(
if override.Name == "" {
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
if override.Peer != "" {
continue // Peer services do not have service-defaults config entries to fetch.
}
sid := override.PeeredServiceName().ServiceName.ToServiceID()
seenUpstreams[sid] = struct{}{}
}
}

Expand Down
2 changes: 1 addition & 1 deletion agent/service_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
// learn about their configs.
for _, us := range ns.Proxy.Upstreams {
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService {
sid := us.DestinationID()
sid := us.DestinationID().ServiceName.ToServiceID()
sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta)
upstreams = append(upstreams, sid)
}
Expand Down
12 changes: 9 additions & 3 deletions agent/service_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,9 @@ func TestServiceManager_PersistService_API(t *testing.T) {
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("redis", nil),
},
Config: map[string]interface{}{
"protocol": "tcp",
},
Expand Down Expand Up @@ -468,7 +470,9 @@ func TestServiceManager_PersistService_API(t *testing.T) {
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("redis", nil),
},
Config: map[string]interface{}{
"protocol": "tcp",
},
Expand Down Expand Up @@ -647,7 +651,9 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Upstream: structs.PeeredServiceName{
ServiceName: structs.NewServiceName("redis", nil),
},
Config: map[string]interface{}{
"protocol": "tcp",
},
Expand Down
Loading

0 comments on commit d1b1d17

Please sign in to comment.