Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new annotated format for upstreams #1237

Merged
merged 3 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 201 additions & 61 deletions control-plane/connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,176 @@ func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNam
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue))
}

// processPreparedQueryUpstream processes an upstream in the format:
// prepared_query:[query name]:[port].
func processPreparedQueryUpstream(pod corev1.Pod, rawUpstream string) api.Upstream {
var preparedQuery string
var port int32
parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
var upstream api.Upstream
if port > 0 {

upstream = api.Upstream{
DestinationType: api.UpstreamDestTypePreparedQuery,
DestinationName: preparedQuery,
LocalBindPort: int(port),
}
}
return upstream
}

// processNonAnnotatedUpstream processes an upstream in the format:
// [service-name].[service-namespace].[service-partition]:[port]:[optional datacenter].
func (r *EndpointsController) processNonAnnotatedUpstream(pod corev1.Pod, rawUpstream string) (api.Upstream, error) {
var datacenter, serviceName, namespace, partition, peer string
var port int32
var upstream api.Upstream

parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// If Consul Namespaces or Admin Partitions are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces || r.EnableConsulPartitions {
pieces := strings.SplitN(parts[0], ".", 3)
switch len(pieces) {
case 3:
partition = strings.TrimSpace(pieces[2])
fallthrough
case 2:
namespace = strings.TrimSpace(pieces[1])
fallthrough
default:
serviceName = strings.TrimSpace(pieces[0])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}

// parse the optional datacenter
if len(parts) > 2 {
datacenter = strings.TrimSpace(parts[2])

// Check if there's a proxy defaults config with mesh gateway
// mode set to local or remote. This helps users from
// accidentally forgetting to set a mesh gateway mode
// and then being confused as to why their traffic isn't
// routing.
entry, _, err := r.ConsulClient.ConfigEntries().Get(api.ProxyDefaults, api.ProxyConfigGlobal, nil)
if err != nil && strings.Contains(err.Error(), "Unexpected response code: 404") {
return api.Upstream{}, fmt.Errorf("upstream %q is invalid: there is no ProxyDefaults config to set mesh gateway mode", rawUpstream)
} else if err == nil {
mode := entry.(*api.ProxyConfigEntry).MeshGateway.Mode
if mode != api.MeshGatewayModeLocal && mode != api.MeshGatewayModeRemote {
return api.Upstream{}, fmt.Errorf("upstream %q is invalid: ProxyDefaults mesh gateway mode is neither %q nor %q", rawUpstream, api.MeshGatewayModeLocal, api.MeshGatewayModeRemote)
}
}
// NOTE: If we can't reach Consul we don't error out because
// that would fail the pod scheduling and this is a nice-to-have
// check, not something that should block during a Consul hiccup.
}
if port > 0 {
upstream = api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationPartition: partition,
DestinationPeer: peer,
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
}
}
return upstream, nil

}

// processAnnotatedUpstream processes an upstream in the format:
// [service-name].svc.[service-namespace].ns.[service-peer].peer:[port]
// [service-name].svc.[service-namespace].ns.[service-partition].ap:[port]
// [service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port].
func (r *EndpointsController) processAnnotatedUpstream(pod corev1.Pod, rawUpstream string) (api.Upstream, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we copy the comment where you have described the formatting of the annotated upstream and add that above each of these method to describe the "shape" of the upstream that they will each process

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup added!

var datacenter, serviceName, namespace, partition, peer string
var port int32
var upstream api.Upstream

parts := strings.SplitN(rawUpstream, ":", 3)

port, _ = portValue(pod, strings.TrimSpace(parts[1]))

service := parts[0]

pieces := strings.Split(service, ".")

if r.EnableConsulNamespaces || r.EnableConsulPartitions {
switch len(pieces) {
case 6:
end := strings.TrimSpace(pieces[5])
switch end {
case "peer":
peer = strings.TrimSpace(pieces[4])
case "ap":
partition = strings.TrimSpace(pieces[4])
case "dc":
datacenter = strings.TrimSpace(pieces[4])
default:
return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
}
fallthrough
case 4:
if strings.TrimSpace(pieces[3]) == "ns" {
namespace = strings.TrimSpace(pieces[2])
} else {
return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
}
fallthrough
case 2:
if strings.TrimSpace(pieces[1]) == "svc" {
serviceName = strings.TrimSpace(pieces[0])
}
default:
return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
}

} else {
switch len(pieces) {
case 4:
end := strings.TrimSpace(pieces[3])
switch end {
case "peer":
peer = strings.TrimSpace(pieces[2])
case "dc":
datacenter = strings.TrimSpace(pieces[2])
default:
return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
}
fallthrough
case 2:
serviceName = strings.TrimSpace(pieces[0])
default:
return api.Upstream{}, fmt.Errorf("upstream structured incorrectly: %s", rawUpstream)
}

}

if port > 0 {
upstream = api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationPartition: partition,
DestinationPeer: peer,
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
}
}
return upstream, nil

}

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
// objects.
func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1.Endpoints) ([]api.Upstream, error) {
Expand All @@ -847,75 +1017,45 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1.
var upstreams []api.Upstream
if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" {
for _, raw := range strings.Split(raw, ",") {
parts := strings.SplitN(raw, ":", 3)
//var datacenter, serviceName, namespace, partition, peer string
//var port int32
var upstream api.Upstream

var datacenter, serviceName, preparedQuery, namespace, partition string
var port int32
if strings.TrimSpace(parts[0]) == "prepared_query" {
port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
} else {
port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// If Consul Namespaces or Admin Partitions are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces || r.EnableConsulPartitions {
pieces := strings.SplitN(parts[0], ".", 3)
switch len(pieces) {
case 3:
partition = strings.TrimSpace(pieces[2])
fallthrough
case 2:
namespace = strings.TrimSpace(pieces[1])
fallthrough
default:
serviceName = strings.TrimSpace(pieces[0])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}
// parts separates out the port, and determines whether it's a prepared query or not, since parts[0] would
// be "prepared_query" if it is.
parts := strings.SplitN(raw, ":", 3)

// parse the optional datacenter
if len(parts) > 2 {
datacenter = strings.TrimSpace(parts[2])

// Check if there's a proxy defaults config with mesh gateway
// mode set to local or remote. This helps users from
// accidentally forgetting to set a mesh gateway mode
// and then being confused as to why their traffic isn't
// routing.
entry, _, err := r.ConsulClient.ConfigEntries().Get(api.ProxyDefaults, api.ProxyConfigGlobal, nil)
if err != nil && strings.Contains(err.Error(), "Unexpected response code: 404") {
return []api.Upstream{}, fmt.Errorf("upstream %q is invalid: there is no ProxyDefaults config to set mesh gateway mode", raw)
} else if err == nil {
mode := entry.(*api.ProxyConfigEntry).MeshGateway.Mode
if mode != api.MeshGatewayModeLocal && mode != api.MeshGatewayModeRemote {
return []api.Upstream{}, fmt.Errorf("upstream %q is invalid: ProxyDefaults mesh gateway mode is neither %q nor %q", raw, api.MeshGatewayModeLocal, api.MeshGatewayModeRemote)
}
}
// NOTE: If we can't reach Consul we don't error out because
// that would fail the pod scheduling and this is a nice-to-have
// check, not something that should block during a Consul hiccup.
// serviceParts helps determine which format of upstream we're processing,
// [service-name].[service-namespace].[service-partition]:[port]:[optional datacenter]
// or
// [service-name].svc.[service-namespace].ns.[service-peer].peer:[port]
// [service-name].svc.[service-namespace].ns.[service-partition].ap:[port]
// [service-name].svc.[service-namespace].ns.[service-datacenter].dc:[port]
annotatedFormat := false
serviceParts := strings.Split(parts[0], ".")
if len(serviceParts) >= 2 {
if serviceParts[1] == "svc" {
annotatedFormat = true
}
}

if port > 0 {
upstream := api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationPartition: partition,
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
if strings.TrimSpace(parts[0]) == "prepared_query" {
upstream = processPreparedQueryUpstream(pod, raw)
} else if annotatedFormat {
var err error
upstream, err = r.processAnnotatedUpstream(pod, raw)
if err != nil {
return []api.Upstream{}, err
}

if preparedQuery != "" {
upstream.DestinationType = api.UpstreamDestTypePreparedQuery
upstream.DestinationName = preparedQuery
} else {
var err error
upstream, err = r.processNonAnnotatedUpstream(pod, raw)
if err != nil {
return []api.Upstream{}, err
}

upstreams = append(upstreams, upstream)
}

upstreams = append(upstreams, upstream)
}
}

Expand Down
Loading