Skip to content

Commit

Permalink
support new annotated format for upstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
ndhanushkodi committed May 25, 2022
1 parent 428e78c commit 2248101
Show file tree
Hide file tree
Showing 2 changed files with 359 additions and 63 deletions.
255 changes: 194 additions & 61 deletions control-plane/connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectinject
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"regexp"
Expand Down Expand Up @@ -834,6 +835,168 @@ func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNam
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue))
}

func processPreparedQueryUpstream(pod corev1.Pod, rawUpstream string) api.Upstream {
var preparedQuery string
var port int32
parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
var upstream api.Upstream
if port > 0 {

upstream = api.Upstream{
DestinationType: api.UpstreamDestTypePreparedQuery,
DestinationName: preparedQuery,
LocalBindPort: int(port),
}
}
return upstream
}

func (r *EndpointsController) processNonAnnotatedUpstream(pod corev1.Pod, rawUpstream string) (api.Upstream, error) {
var datacenter, serviceName, namespace, partition, peer string
var port int32
var upstream api.Upstream

parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// If Consul Namespaces or Admin Partitions are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces || r.EnableConsulPartitions {
pieces := strings.SplitN(parts[0], ".", 3)
switch len(pieces) {
case 3:
partition = strings.TrimSpace(pieces[2])
fallthrough
case 2:
namespace = strings.TrimSpace(pieces[1])
fallthrough
default:
serviceName = strings.TrimSpace(pieces[0])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}

// parse the optional datacenter
if len(parts) > 2 {
datacenter = strings.TrimSpace(parts[2])

// Check if there's a proxy defaults config with mesh gateway
// mode set to local or remote. This helps users from
// accidentally forgetting to set a mesh gateway mode
// and then being confused as to why their traffic isn't
// routing.
entry, _, err := r.ConsulClient.ConfigEntries().Get(api.ProxyDefaults, api.ProxyConfigGlobal, nil)
if err != nil && strings.Contains(err.Error(), "Unexpected response code: 404") {
return api.Upstream{}, fmt.Errorf("upstream %q is invalid: there is no ProxyDefaults config to set mesh gateway mode", rawUpstream)
} else if err == nil {
mode := entry.(*api.ProxyConfigEntry).MeshGateway.Mode
if mode != api.MeshGatewayModeLocal && mode != api.MeshGatewayModeRemote {
return api.Upstream{}, fmt.Errorf("upstream %q is invalid: ProxyDefaults mesh gateway mode is neither %q nor %q", rawUpstream, api.MeshGatewayModeLocal, api.MeshGatewayModeRemote)
}
}
// NOTE: If we can't reach Consul we don't error out because
// that would fail the pod scheduling and this is a nice-to-have
// check, not something that should block during a Consul hiccup.
}
if port > 0 {
upstream = api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationPartition: partition,
DestinationPeer: peer,
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
}
}
return upstream, nil

}

func (r *EndpointsController) processAnnotatedUpstream(pod corev1.Pod, rawUpstream string) (api.Upstream, error) {
var datacenter, serviceName, namespace, partition, peer string
var port int32
var upstream api.Upstream

parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = portValue(pod, strings.TrimSpace(parts[1]))

service := parts[0]

pieces := strings.Split(service, ".")

if r.EnableConsulNamespaces || r.EnableConsulPartitions {
switch len(pieces) {
case 6:
end := strings.TrimSpace(pieces[5])
switch end {
case "peer":
peer = strings.TrimSpace(pieces[4])
case "ap":
partition = strings.TrimSpace(pieces[4])
case "dc":
datacenter = strings.TrimSpace(pieces[4])
default:
return api.Upstream{}, errors.New(fmt.Sprintf("upstream structured incorrectly: %s", rawUpstream))
}
fallthrough
case 4:
if strings.TrimSpace(pieces[3]) == "ns" {
namespace = strings.TrimSpace(pieces[2])
} else {
return api.Upstream{}, errors.New(fmt.Sprintf("upstream structured incorrectly: %s", rawUpstream))
}
fallthrough
case 2:
if strings.TrimSpace(pieces[1]) == "svc" {
serviceName = strings.TrimSpace(pieces[0])
}
default:
return api.Upstream{}, errors.New(fmt.Sprintf("upstream structured incorrectly: %s", rawUpstream))
}

} else {
switch len(pieces) {
case 4:
end := strings.TrimSpace(pieces[3])
switch end {
case "peer":
peer = strings.TrimSpace(pieces[2])
case "dc":
datacenter = strings.TrimSpace(pieces[2])
default:
return api.Upstream{}, errors.New(fmt.Sprintf("upstream structured incorrectly: %s", rawUpstream))
}
fallthrough
case 2:
serviceName = strings.TrimSpace(pieces[0])
default:
return api.Upstream{}, errors.New(fmt.Sprintf("upstream structured incorrectly: %s", rawUpstream))
}

}

if port > 0 {
upstream = api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationPartition: partition,
DestinationPeer: peer,
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
}
}
return upstream, nil

}

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
// objects.
func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1.Endpoints) ([]api.Upstream, error) {
Expand All @@ -847,75 +1010,45 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1.
var upstreams []api.Upstream
if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" {
for _, raw := range strings.Split(raw, ",") {
parts := strings.SplitN(raw, ":", 3)
//var datacenter, serviceName, namespace, partition, peer string
//var port int32
var upstream api.Upstream

var datacenter, serviceName, preparedQuery, namespace, partition string
var port int32
if strings.TrimSpace(parts[0]) == "prepared_query" {
port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
} else {
port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// If Consul Namespaces or Admin Partitions are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces || r.EnableConsulPartitions {
pieces := strings.SplitN(parts[0], ".", 3)
switch len(pieces) {
case 3:
partition = strings.TrimSpace(pieces[2])
fallthrough
case 2:
namespace = strings.TrimSpace(pieces[1])
fallthrough
default:
serviceName = strings.TrimSpace(pieces[0])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}
// parts separates out the port, and determines whether it's a prepared query or not, since parts[0] would
// be "prepared_query" if it is.
parts := strings.SplitN(raw, ":", 3)

// parse the optional datacenter
if len(parts) > 2 {
datacenter = strings.TrimSpace(parts[2])

// Check if there's a proxy defaults config with mesh gateway
// mode set to local or remote. This helps users from
// accidentally forgetting to set a mesh gateway mode
// and then being confused as to why their traffic isn't
// routing.
entry, _, err := r.ConsulClient.ConfigEntries().Get(api.ProxyDefaults, api.ProxyConfigGlobal, nil)
if err != nil && strings.Contains(err.Error(), "Unexpected response code: 404") {
return []api.Upstream{}, fmt.Errorf("upstream %q is invalid: there is no ProxyDefaults config to set mesh gateway mode", raw)
} else if err == nil {
mode := entry.(*api.ProxyConfigEntry).MeshGateway.Mode
if mode != api.MeshGatewayModeLocal && mode != api.MeshGatewayModeRemote {
return []api.Upstream{}, fmt.Errorf("upstream %q is invalid: ProxyDefaults mesh gateway mode is neither %q nor %q", raw, api.MeshGatewayModeLocal, api.MeshGatewayModeRemote)
}
}
// NOTE: If we can't reach Consul we don't error out because
// that would fail the pod scheduling and this is a nice-to-have
// check, not something that should block during a Consul hiccup.
// serviceParts helps determine which format of upstream we're processing,
// [service-name].[service-namespace].[service-partition]:[port]:[optional datacenter]
// or
// [service-name].svc.[service-namespace].ns.[service-peer].peer:[port]
// [service-name].svc.[service-namespace].ns.[service-partition].ap:[port]
// [service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port]
annotatedFormat := false
serviceParts := strings.Split(parts[0], ".")
if len(serviceParts) >= 2 {
if serviceParts[1] == "svc" {
annotatedFormat = true
}
}

if port > 0 {
upstream := api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationPartition: partition,
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
if strings.TrimSpace(parts[0]) == "prepared_query" {
upstream = processPreparedQueryUpstream(pod, raw)
} else if annotatedFormat {
var err error
upstream, err = r.processAnnotatedUpstream(pod, raw)
if err != nil {
return []api.Upstream{}, err
}

if preparedQuery != "" {
upstream.DestinationType = api.UpstreamDestTypePreparedQuery
upstream.DestinationName = preparedQuery
} else {
var err error
upstream, err = r.processNonAnnotatedUpstream(pod, raw)
if err != nil {
return []api.Upstream{}, err
}

upstreams = append(upstreams, upstream)
}

upstreams = append(upstreams, upstream)
}
}

Expand Down
Loading

0 comments on commit 2248101

Please sign in to comment.