diff --git a/control-plane/connect-inject/endpoints_controller.go b/control-plane/connect-inject/endpoints_controller.go index b32b401922..2fdd912b16 100644 --- a/control-plane/connect-inject/endpoints_controller.go +++ b/control-plane/connect-inject/endpoints_controller.go @@ -834,6 +834,176 @@ func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNam MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue)) } +// processPreparedQueryUpstream processes an upstream in the format: +// prepared_query:[query name]:[port]. +func processPreparedQueryUpstream(pod corev1.Pod, rawUpstream string) api.Upstream { + var preparedQuery string + var port int32 + parts := strings.SplitN(rawUpstream, ":", 3) + + port, _ = portValue(pod, strings.TrimSpace(parts[2])) + preparedQuery = strings.TrimSpace(parts[1]) + var upstream api.Upstream + if port > 0 { + + upstream = api.Upstream{ + DestinationType: api.UpstreamDestTypePreparedQuery, + DestinationName: preparedQuery, + LocalBindPort: int(port), + } + } + return upstream +} + +// processNonAnnotatedUpstream processes an upstream in the format: +// [service-name].[service-namespace].[service-partition]:[port]:[optional datacenter]. +func (r *EndpointsController) processNonAnnotatedUpstream(pod corev1.Pod, rawUpstream string) (api.Upstream, error) { + var datacenter, serviceName, namespace, partition, peer string + var port int32 + var upstream api.Upstream + + parts := strings.SplitN(rawUpstream, ":", 3) + + port, _ = portValue(pod, strings.TrimSpace(parts[1])) + + // If Consul Namespaces or Admin Partitions are enabled, attempt to parse the + // upstream for a namespace. + if r.EnableConsulNamespaces || r.EnableConsulPartitions { + pieces := strings.SplitN(parts[0], ".", 3) + switch len(pieces) { + case 3: + partition = strings.TrimSpace(pieces[2]) + fallthrough + case 2: + namespace = strings.TrimSpace(pieces[1]) + fallthrough + default: + serviceName = strings.TrimSpace(pieces[0]) + } + } else { + serviceName = strings.TrimSpace(parts[0]) + } + + // parse the optional datacenter + if len(parts) > 2 { + datacenter = strings.TrimSpace(parts[2]) + + // Check if there's a proxy defaults config with mesh gateway + // mode set to local or remote. This helps users from + // accidentally forgetting to set a mesh gateway mode + // and then being confused as to why their traffic isn't + // routing. + entry, _, err := r.ConsulClient.ConfigEntries().Get(api.ProxyDefaults, api.ProxyConfigGlobal, nil) + if err != nil && strings.Contains(err.Error(), "Unexpected response code: 404") { + return api.Upstream{}, fmt.Errorf("upstream %q is invalid: there is no ProxyDefaults config to set mesh gateway mode", rawUpstream) + } else if err == nil { + mode := entry.(*api.ProxyConfigEntry).MeshGateway.Mode + if mode != api.MeshGatewayModeLocal && mode != api.MeshGatewayModeRemote { + return api.Upstream{}, fmt.Errorf("upstream %q is invalid: ProxyDefaults mesh gateway mode is neither %q nor %q", rawUpstream, api.MeshGatewayModeLocal, api.MeshGatewayModeRemote) + } + } + // NOTE: If we can't reach Consul we don't error out because + // that would fail the pod scheduling and this is a nice-to-have + // check, not something that should block during a Consul hiccup. + } + if port > 0 { + upstream = api.Upstream{ + DestinationType: api.UpstreamDestTypeService, + DestinationPartition: partition, + DestinationPeer: peer, + DestinationNamespace: namespace, + DestinationName: serviceName, + Datacenter: datacenter, + LocalBindPort: int(port), + } + } + return upstream, nil + +} + +// processAnnotatedUpstream processes an upstream in the format: +// [service-name].svc.[service-namespace].ns.[service-peer].peer:[port] +// [service-name].svc.[service-namespace].ns.[service-partition].ap:[port] +// [service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port]. +func (r *EndpointsController) processAnnotatedUpstream(pod corev1.Pod, rawUpstream string) (api.Upstream, error) { + var datacenter, serviceName, namespace, partition, peer string + var port int32 + var upstream api.Upstream + + parts := strings.SplitN(rawUpstream, ":", 3) + + port, _ = portValue(pod, strings.TrimSpace(parts[1])) + + service := parts[0] + + pieces := strings.Split(service, ".") + + if r.EnableConsulNamespaces || r.EnableConsulPartitions { + switch len(pieces) { + case 6: + end := strings.TrimSpace(pieces[5]) + switch end { + case "peer": + peer = strings.TrimSpace(pieces[4]) + case "ap": + partition = strings.TrimSpace(pieces[4]) + case "dc": + datacenter = strings.TrimSpace(pieces[4]) + default: + return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + } + fallthrough + case 4: + if strings.TrimSpace(pieces[3]) == "ns" { + namespace = strings.TrimSpace(pieces[2]) + } else { + return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + } + fallthrough + case 2: + if strings.TrimSpace(pieces[1]) == "svc" { + serviceName = strings.TrimSpace(pieces[0]) + } + default: + return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + } + + } else { + switch len(pieces) { + case 4: + end := strings.TrimSpace(pieces[3]) + switch end { + case "peer": + peer = strings.TrimSpace(pieces[2]) + case "dc": + datacenter = strings.TrimSpace(pieces[2]) + default: + return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + } + fallthrough + case 2: + serviceName = strings.TrimSpace(pieces[0]) + default: + return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream) + } + + } + + if port > 0 { + upstream = api.Upstream{ + DestinationType: api.UpstreamDestTypeService, + DestinationPartition: partition, + DestinationPeer: peer, + DestinationNamespace: namespace, + DestinationName: serviceName, + Datacenter: datacenter, + LocalBindPort: int(port), + } + } + return upstream, nil + +} + // processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream // objects. func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1.Endpoints) ([]api.Upstream, error) { @@ -847,75 +1017,45 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1. var upstreams []api.Upstream if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" { for _, raw := range strings.Split(raw, ",") { - parts := strings.SplitN(raw, ":", 3) + //var datacenter, serviceName, namespace, partition, peer string + //var port int32 + var upstream api.Upstream - var datacenter, serviceName, preparedQuery, namespace, partition string - var port int32 - if strings.TrimSpace(parts[0]) == "prepared_query" { - port, _ = portValue(pod, strings.TrimSpace(parts[2])) - preparedQuery = strings.TrimSpace(parts[1]) - } else { - port, _ = portValue(pod, strings.TrimSpace(parts[1])) - - // If Consul Namespaces or Admin Partitions are enabled, attempt to parse the - // upstream for a namespace. - if r.EnableConsulNamespaces || r.EnableConsulPartitions { - pieces := strings.SplitN(parts[0], ".", 3) - switch len(pieces) { - case 3: - partition = strings.TrimSpace(pieces[2]) - fallthrough - case 2: - namespace = strings.TrimSpace(pieces[1]) - fallthrough - default: - serviceName = strings.TrimSpace(pieces[0]) - } - } else { - serviceName = strings.TrimSpace(parts[0]) - } + // parts separates out the port, and determines whether it's a prepared query or not, since parts[0] would + // be "prepared_query" if it is. + parts := strings.SplitN(raw, ":", 3) - // parse the optional datacenter - if len(parts) > 2 { - datacenter = strings.TrimSpace(parts[2]) - - // Check if there's a proxy defaults config with mesh gateway - // mode set to local or remote. This helps users from - // accidentally forgetting to set a mesh gateway mode - // and then being confused as to why their traffic isn't - // routing. - entry, _, err := r.ConsulClient.ConfigEntries().Get(api.ProxyDefaults, api.ProxyConfigGlobal, nil) - if err != nil && strings.Contains(err.Error(), "Unexpected response code: 404") { - return []api.Upstream{}, fmt.Errorf("upstream %q is invalid: there is no ProxyDefaults config to set mesh gateway mode", raw) - } else if err == nil { - mode := entry.(*api.ProxyConfigEntry).MeshGateway.Mode - if mode != api.MeshGatewayModeLocal && mode != api.MeshGatewayModeRemote { - return []api.Upstream{}, fmt.Errorf("upstream %q is invalid: ProxyDefaults mesh gateway mode is neither %q nor %q", raw, api.MeshGatewayModeLocal, api.MeshGatewayModeRemote) - } - } - // NOTE: If we can't reach Consul we don't error out because - // that would fail the pod scheduling and this is a nice-to-have - // check, not something that should block during a Consul hiccup. + // serviceParts helps determine which format of upstream we're processing, + // [service-name].[service-namespace].[service-partition]:[port]:[optional datacenter] + // or + // [service-name].svc.[service-namespace].ns.[service-peer].peer:[port] + // [service-name].svc.[service-namespace].ns.[service-partition].ap:[port] + // [service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port] + annotatedFormat := false + serviceParts := strings.Split(parts[0], ".") + if len(serviceParts) >= 2 { + if serviceParts[1] == "svc" { + annotatedFormat = true } } - if port > 0 { - upstream := api.Upstream{ - DestinationType: api.UpstreamDestTypeService, - DestinationPartition: partition, - DestinationNamespace: namespace, - DestinationName: serviceName, - Datacenter: datacenter, - LocalBindPort: int(port), + if strings.TrimSpace(parts[0]) == "prepared_query" { + upstream = processPreparedQueryUpstream(pod, raw) + } else if annotatedFormat { + var err error + upstream, err = r.processAnnotatedUpstream(pod, raw) + if err != nil { + return []api.Upstream{}, err } - - if preparedQuery != "" { - upstream.DestinationType = api.UpstreamDestTypePreparedQuery - upstream.DestinationName = preparedQuery + } else { + var err error + upstream, err = r.processNonAnnotatedUpstream(pod, raw) + if err != nil { + return []api.Upstream{}, err } - - upstreams = append(upstreams, upstream) } + + upstreams = append(upstreams, upstream) } } diff --git a/control-plane/connect-inject/endpoints_controller_test.go b/control-plane/connect-inject/endpoints_controller_test.go index 4fa4e7453b..aaac91547c 100644 --- a/control-plane/connect-inject/endpoints_controller_test.go +++ b/control-plane/connect-inject/endpoints_controller_test.go @@ -202,6 +202,219 @@ func TestProcessUpstreams(t *testing.T) { consulNamespacesEnabled bool consulPartitionsEnabled bool }{ + { + name: "annotated upstream with svc only", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + LocalBindPort: 1234, + }, + }, + consulNamespacesEnabled: false, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream with svc and dc", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.dc1.dc:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + Datacenter: "dc1", + LocalBindPort: 1234, + }, + }, + consulNamespacesEnabled: false, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream with svc and peer", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.peer1.peer:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + DestinationPeer: "peer1", + LocalBindPort: 1234, + }, + }, + consulNamespacesEnabled: false, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream with svc and peer, needs ns before peer if namespaces enabled", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.peer1.peer:1234" + return pod1 + }, + expErr: "upstream structured incorrectly: upstream1.svc.peer1.peer:1234", + consulNamespacesEnabled: true, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream with svc, ns, and peer", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.ns1.ns.peer1.peer:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + DestinationPeer: "peer1", + DestinationNamespace: "ns1", + LocalBindPort: 1234, + }, + }, + consulNamespacesEnabled: true, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream with svc, ns, and partition", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.ns1.ns.part1.ap:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + DestinationPartition: "part1", + DestinationNamespace: "ns1", + LocalBindPort: 1234, + }, + }, + consulNamespacesEnabled: true, + consulPartitionsEnabled: true, + }, + { + name: "annotated upstream with svc, ns, and dc", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.ns1.ns.dc1.dc:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + Datacenter: "dc1", + DestinationNamespace: "ns1", + LocalBindPort: 1234, + }, + }, + consulNamespacesEnabled: true, + consulPartitionsEnabled: false, + }, + { + name: "multiple annotated upstreams", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.ns1.ns.dc1.dc:1234, upstream2.svc:2234, upstream3.svc.ns1.ns:3234, upstream4.svc.ns1.ns.peer1.peer:4234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + Datacenter: "dc1", + DestinationNamespace: "ns1", + LocalBindPort: 1234, + }, + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream2", + LocalBindPort: 2234, + }, + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream3", + DestinationNamespace: "ns1", + LocalBindPort: 3234, + }, + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream4", + DestinationNamespace: "ns1", + DestinationPeer: "peer1", + LocalBindPort: 4234, + }, + }, + consulNamespacesEnabled: true, + consulPartitionsEnabled: true, + }, + { + name: "annotated upstream error: invalid partition/dc/peer", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.ns1.ns.part1.err:1234" + return pod1 + }, + expErr: "upstream structured incorrectly: upstream1.svc.ns1.ns.part1.err:1234", + consulNamespacesEnabled: true, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream error: invalid namespace", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.ns1.err:1234" + return pod1 + }, + expErr: "upstream structured incorrectly: upstream1.svc.ns1.err:1234", + consulNamespacesEnabled: true, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream error: invalid number of pieces in the address", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.err:1234" + return pod1 + }, + expErr: "upstream structured incorrectly: upstream1.svc.err:1234", + consulNamespacesEnabled: true, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream error: invalid peer", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.peer1.err:1234" + return pod1 + }, + expErr: "upstream structured incorrectly: upstream1.svc.peer1.err:1234", + consulNamespacesEnabled: false, + consulPartitionsEnabled: false, + }, + { + name: "annotated upstream error: invalid number of pieces in the address without namespaces and partitions", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationUpstreams] = "upstream1.svc.err:1234" + return pod1 + }, + expErr: "upstream structured incorrectly: upstream1.svc.err:1234", + consulNamespacesEnabled: false, + consulPartitionsEnabled: false, + }, { name: "upstream with datacenter without ProxyDefaults", pod: func() *corev1.Pod { @@ -468,10 +681,10 @@ func TestProcessUpstreams(t *testing.T) { consulPartitionsEnabled: false, }, { - name: "prepared query and non-query upstreams", + name: "prepared query and non-query upstreams and annotated non-query upstreams", pod: func() *corev1.Pod { pod1 := createPod("pod1", "1.2.3.4", true, true) - pod1.Annotations[annotationUpstreams] = "prepared_query:queryname:1234, upstream1:2234, prepared_query:6687bd19-5654-76be-d764:8202" + pod1.Annotations[annotationUpstreams] = "prepared_query:queryname:1234, upstream1:2234, prepared_query:6687bd19-5654-76be-d764:8202, upstream2.svc:3234" return pod1 }, expected: []api.Upstream{ @@ -490,6 +703,11 @@ func TestProcessUpstreams(t *testing.T) { DestinationName: "6687bd19-5654-76be-d764", LocalBindPort: 8202, }, + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream2", + LocalBindPort: 3234, + }, }, consulNamespacesEnabled: false, consulPartitionsEnabled: false,