Skip to content

Commit

Permalink
Don't support syncing multiple addresses when k8s service has multipl…
Browse files Browse the repository at this point in the history
…e ports (#511)

Previously, we set a tagged address for each Kubernetes service port,
using the "virtual-" prefix for the tagged address key. For example,
if a service port is named "tcp", then we synced it to Consul with
the tagged address key "virtual-tcp".

However, since Consul doesn't really support multiple ports on a service yet,
this is unnecessary. So instead, we will find the service's target port
that is equal to the service port we are registering with Consul
and use that as the Port for the tagged address. Otherwise,
if we can't find the target port, we will set the tagged address's port to 0
(it will still have the cluster IP set).
  • Loading branch information
ishustava authored May 4, 2021
1 parent caa9f60 commit b6c0373
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ IMPROVEMENTS:
using this CRD but via annotations. [[GH-502](https://github.com/hashicorp/consul-k8s/pull/502)], [[GH-485](https://github.com/hashicorp/consul-k8s/pull/485)]
* CRDs: Update ProxyDefaults with Mode and TransparentProxy fields. Note: Mode and TransparentProxy should not be set
using the CRD but via annotations. [[GH-505](https://github.com/hashicorp/consul-k8s/pull/505)], [[GH-485](https://github.com/hashicorp/consul-k8s/pull/485)]
* Connect: No longer set multiple tagged addresses in Consul when k8s service has multiple ports and Transparent Proxy is enabled.
[[GH-511](https://github.com/hashicorp/consul-k8s/pull/511)]
* Connect: Allow exclusion of inbound ports, outbound ports and CIDRs, and additional user IDs when
Transparent Proxy is enabled. [[GH-506](https://github.com/hashicorp/consul-k8s/pull/506)]

Expand Down
59 changes: 46 additions & 13 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -32,6 +33,10 @@ const (
MetaKeyKubeNS = "k8s-namespace"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr"

// clusterIPTaggedAddressName is the key for the tagged address to store the service's cluster IP and service port
// in Consul. Note: This value should not be changed without a corresponding change in Consul.
// TODO: change this to a constant shared with Consul to avoid accidentally changing this.
clusterIPTaggedAddressName = "virtual"
)

Expand Down Expand Up @@ -215,13 +220,14 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
// If a port is specified, then we determine the value of that port
// and register that port for the host service.
var servicePort int
// The handler will always set the port annotation if one is not provided on the pod.
var consulServicePort int
if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" {
if port, err := portValue(pod, raw); port > 0 {
if err != nil {
return nil, nil, err
}
servicePort = int(port)
consulServicePort = int(port)
}
}

Expand Down Expand Up @@ -259,7 +265,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
Port: servicePort,
Port: consulServicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: r.consulNamespace(pod.Namespace),
Expand Down Expand Up @@ -302,9 +308,9 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
proxyConfig.Config[envoyPrometheusBindAddr] = prometheusScrapeListener
}

if servicePort > 0 {
if consulServicePort > 0 {
proxyConfig.LocalServiceAddress = "127.0.0.1"
proxyConfig.LocalServicePort = servicePort
proxyConfig.LocalServicePort = consulServicePort
}

upstreams, err := r.processUpstreams(pod)
Expand Down Expand Up @@ -356,18 +362,45 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
parsedIP := net.ParseIP(k8sService.Spec.ClusterIP)
if parsedIP != nil {
taggedAddresses := make(map[string]api.ServiceAddress)
for _, servicePort := range k8sService.Spec.Ports {
taggedAddressKey := clusterIPTaggedAddressName
if servicePort.Name != "" {
taggedAddressKey = fmt.Sprintf("%s-%s", clusterIPTaggedAddressName, servicePort.Name)
}

taggedAddresses[taggedAddressKey] = api.ServiceAddress{
Address: k8sService.Spec.ClusterIP,
Port: int(servicePort.Port),
// When a service has multiple ports, we need to choose the port that is registered with Consul
// and only set that port as the tagged address because Consul currently does not support multiple port
// on a single service.
var k8sServicePort int32
for _, sp := range k8sService.Spec.Ports {
// If target port is a name, then we need to find the port value from the pod.
if sp.TargetPort.Type == intstr.String {
targetPortValue, err := portValue(pod, sp.TargetPort.StrVal)
if err != nil {
return nil, nil, err
}

// If the targetPortValue is the consulServicePort, then this is the service port we'll use as the tagged address.
if targetPortValue == int32(consulServicePort) {
k8sServicePort = sp.Port
break
}
} else if sp.TargetPort.Type == intstr.Int && sp.TargetPort.IntVal != 0 {
// If the target port is a non-zero int, we can compare that port directly with the Consul service port.
if sp.TargetPort.IntVal == int32(consulServicePort) {
k8sServicePort = sp.Port
break
}
} else {
// If targetPort is not specified, then the service port is used as the target port,
// and we can compare the service port with the Consul service port.
if sp.Port == int32(consulServicePort) {
k8sServicePort = sp.Port
break
}
}
}

taggedAddresses[clusterIPTaggedAddressName] = api.ServiceAddress{
Address: k8sService.Spec.ClusterIP,
Port: int(k8sServicePort),
}

service.TaggedAddresses = taggedAddresses
proxyService.TaggedAddresses = taggedAddresses

Expand Down
114 changes: 92 additions & 22 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -2590,7 +2591,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2599,7 +2600,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand Down Expand Up @@ -2637,7 +2638,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2646,7 +2647,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand Down Expand Up @@ -2705,7 +2706,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2714,7 +2715,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand All @@ -2729,7 +2730,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
proxyMode: api.ProxyModeDefault,
expErr: "services \"test-service\" not found",
},
"service with a single port without a name": {
"service with a single port without a target port": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -2740,7 +2741,33 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 8081,
},
},
expErr: "",
},
"service with a single port and a target port that is a port name": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.Parse("tcp"),
},
},
},
Expand All @@ -2754,7 +2781,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
},
expErr: "",
},
"service with a single port with a name": {
"service with a single port and a target port that is a int": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -2765,15 +2792,15 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 80,
Port: 80,
TargetPort: intstr.FromInt(8081),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual-tcp": {
"virtual": {
Address: "10.0.0.1",
Port: 80,
},
Expand All @@ -2791,25 +2818,52 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 80,
Name: "tcp",
Port: 80,
TargetPort: intstr.FromString("tcp"),
},
{
Name: "http",
Port: 8080,
Name: "http",
Port: 81,
TargetPort: intstr.FromString("http"),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual-tcp": {
"virtual": {
Address: "10.0.0.1",
Port: 80,
},
"virtual-http": {
},
expErr: "",
},
// When target port is not equal to the port we're registering with Consul,
// then we want to register the zero-value for the port. This could happen
// for client services that don't have a container port that they're listening on.
"target port is not found": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.Parse("http"),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 8080,
Port: 0,
},
},
expErr: "",
Expand Down Expand Up @@ -2885,7 +2939,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "2001:db8::68",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2894,7 +2948,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "2001:db8::68",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand All @@ -2903,10 +2957,26 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *

for name, c := range cases {
t.Run(name, func(t *testing.T) {
pod := createPod("test-pod-1", "1.2.3.4", false)
pod := createPod("test-pod-1", "1.2.3.4", true)
if c.annotationEnabled != nil {
pod.Annotations[annotationTransparentProxy] = strconv.FormatBool(*c.annotationEnabled)
}
pod.Spec.Containers = []corev1.Container{
{
Name: "test",
Ports: []corev1.ContainerPort{
{
Name: "tcp",
ContainerPort: 8081,
},
{
Name: "http",
ContainerPort: 8080,
},
},
},
}
pod.Annotations[annotationPort] = "tcp"
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Expand Down

0 comments on commit b6c0373

Please sign in to comment.