Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't support syncing multiple addresses when k8s service has multiple ports #511

Merged
merged 3 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ IMPROVEMENTS:
using this CRD but via annotations. [[GH-502](https://github.com/hashicorp/consul-k8s/pull/502)], [[GH-485](https://github.com/hashicorp/consul-k8s/pull/485)]
* CRDs: Update ProxyDefaults with Mode and TransparentProxy fields. Note: Mode and TransparentProxy should not be set
using the CRD but via annotations. [[GH-505](https://github.com/hashicorp/consul-k8s/pull/505)], [[GH-485](https://github.com/hashicorp/consul-k8s/pull/485)]
* Connect: No longer set multiple tagged addresses in Consul when k8s service has multiple ports and Transparent Proxy is enabled.
[[GH-511](https://github.com/hashicorp/consul-k8s/pull/511)]
* Connect: Allow exclusion of inbound ports, outbound ports and CIDRs, and additional user IDs when
Transparent Proxy is enabled. [[GH-506](https://github.com/hashicorp/consul-k8s/pull/506)]

Expand Down
59 changes: 46 additions & 13 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -32,6 +33,10 @@ const (
MetaKeyKubeNS = "k8s-namespace"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr"

// clusterIPTaggedAddressName is the key for the tagged address to store the service's cluster IP and service port
// in Consul. Note: This value should not be changed without a corresponding change in Consul.
// TODO: change this to a constant shared with Consul to avoid accidentally changing this.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will update it at a future time.

clusterIPTaggedAddressName = "virtual"
)

Expand Down Expand Up @@ -215,13 +220,14 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
// If a port is specified, then we determine the value of that port
// and register that port for the host service.
var servicePort int
// The handler will always set the port annotation if one is not provided on the pod.
var consulServicePort int
if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" {
if port, err := portValue(pod, raw); port > 0 {
if err != nil {
return nil, nil, err
}
servicePort = int(port)
consulServicePort = int(port)
}
}

Expand Down Expand Up @@ -259,7 +265,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
Port: servicePort,
Port: consulServicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: r.consulNamespace(pod.Namespace),
Expand Down Expand Up @@ -302,9 +308,9 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
proxyConfig.Config[envoyPrometheusBindAddr] = prometheusScrapeListener
}

if servicePort > 0 {
if consulServicePort > 0 {
proxyConfig.LocalServiceAddress = "127.0.0.1"
proxyConfig.LocalServicePort = servicePort
proxyConfig.LocalServicePort = consulServicePort
}

upstreams, err := r.processUpstreams(pod)
Expand Down Expand Up @@ -356,18 +362,45 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
parsedIP := net.ParseIP(k8sService.Spec.ClusterIP)
if parsedIP != nil {
taggedAddresses := make(map[string]api.ServiceAddress)
for _, servicePort := range k8sService.Spec.Ports {
taggedAddressKey := clusterIPTaggedAddressName
if servicePort.Name != "" {
taggedAddressKey = fmt.Sprintf("%s-%s", clusterIPTaggedAddressName, servicePort.Name)
}

taggedAddresses[taggedAddressKey] = api.ServiceAddress{
Address: k8sService.Spec.ClusterIP,
Port: int(servicePort.Port),
// When a service has multiple ports, we need to choose the port that is registered with Consul
// and only set that port as the tagged address because Consul currently does not support multiple port
// on a single service.
var k8sServicePort int32
for _, sp := range k8sService.Spec.Ports {
// If target port is a name, then we need to find the port value from the pod.
if sp.TargetPort.Type == intstr.String {
targetPortValue, err := portValue(pod, sp.TargetPort.StrVal)
if err != nil {
return nil, nil, err
}

// If the targetPortValue is the consulServicePort, then this is the service port we'll use as the tagged address.
if targetPortValue == int32(consulServicePort) {
k8sServicePort = sp.Port
break
}
} else if sp.TargetPort.Type == intstr.Int && sp.TargetPort.IntVal != 0 {
// If the target port is a non-zero int, we can compare that port directly with the Consul service port.
if sp.TargetPort.IntVal == int32(consulServicePort) {
k8sServicePort = sp.Port
break
}
} else {
// If targetPort is not specified, then the service port is used as the target port,
// and we can compare the service port with the Consul service port.
if sp.Port == int32(consulServicePort) {
k8sServicePort = sp.Port
break
}
}
}

taggedAddresses[clusterIPTaggedAddressName] = api.ServiceAddress{
Address: k8sService.Spec.ClusterIP,
Port: int(k8sServicePort),
}

service.TaggedAddresses = taggedAddresses
proxyService.TaggedAddresses = taggedAddresses

Expand Down
114 changes: 92 additions & 22 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -2590,7 +2591,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2599,7 +2600,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand Down Expand Up @@ -2637,7 +2638,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2646,7 +2647,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand Down Expand Up @@ -2705,7 +2706,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2714,7 +2715,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand All @@ -2729,7 +2730,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
proxyMode: api.ProxyModeDefault,
expErr: "services \"test-service\" not found",
},
"service with a single port without a name": {
"service with a single port without a target port": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -2740,7 +2741,33 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 8081,
},
},
expErr: "",
},
"service with a single port and a target port that is a port name": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.Parse("tcp"),
},
},
},
Expand All @@ -2754,7 +2781,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
},
expErr: "",
},
"service with a single port with a name": {
"service with a single port and a target port that is a int": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -2765,15 +2792,15 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 80,
Port: 80,
TargetPort: intstr.FromInt(8081),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual-tcp": {
"virtual": {
Address: "10.0.0.1",
Port: 80,
},
Expand All @@ -2791,25 +2818,52 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Name: "tcp",
Port: 80,
Name: "tcp",
Port: 80,
TargetPort: intstr.FromString("tcp"),
},
{
Name: "http",
Port: 8080,
Name: "http",
Port: 81,
TargetPort: intstr.FromString("http"),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual-tcp": {
"virtual": {
Address: "10.0.0.1",
Port: 80,
},
"virtual-http": {
},
expErr: "",
},
// When target port is not equal to the port we're registering with Consul,
// then we want to register the zero-value for the port. This could happen
// for client services that don't have a container port that they're listening on.
"target port is not found": {
globalEnabled: true,
service: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.Parse("http"),
},
},
},
},
proxyMode: api.ProxyModeTransparent,
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "10.0.0.1",
Port: 8080,
Port: 0,
},
},
expErr: "",
Expand Down Expand Up @@ -2885,7 +2939,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
ClusterIP: "2001:db8::68",
Ports: []corev1.ServicePort{
{
Port: 80,
Port: 8081,
},
},
},
Expand All @@ -2894,7 +2948,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *
expTaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "2001:db8::68",
Port: 80,
Port: 8081,
},
},
expErr: "",
Expand All @@ -2903,10 +2957,26 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *

for name, c := range cases {
t.Run(name, func(t *testing.T) {
pod := createPod("test-pod-1", "1.2.3.4", false)
pod := createPod("test-pod-1", "1.2.3.4", true)
if c.annotationEnabled != nil {
pod.Annotations[annotationTransparentProxy] = strconv.FormatBool(*c.annotationEnabled)
}
pod.Spec.Containers = []corev1.Container{
{
Name: "test",
Ports: []corev1.ContainerPort{
{
Name: "tcp",
ContainerPort: 8081,
},
{
Name: "http",
ContainerPort: 8080,
},
},
},
}
pod.Annotations[annotationPort] = "tcp"
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Expand Down