Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Consul ENT Namespaces in the endpoints controller #475

Merged
merged 2 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions connect-inject/container_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,15 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
{{- if .NamespaceMirroringEnabled }}
{{- /* If namespace mirroring is enabled, the auth method is
defined in the default namespace */}}
-namespace="default"
-auth-method-namespace="default" \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice change!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, the flag names are great and helped me follow that we only need to pass that flag when acls are enabled and the -service-namespace flag when acls are not enabled but namespaces are!

{{- else }}
-namespace="{{ .ConsulNamespace }}"
-auth-method-namespace="{{ .ConsulNamespace }}" \
{{- end }}
{{- end }}
{{- end }}
{{- if .ConsulNamespace }}
-consul-service-namespace="{{ .ConsulNamespace }}" \
{{- end }}

# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down
8 changes: 6 additions & 2 deletions connect-inject/container_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestHandlerContainerInit_namespacesEnabled(t *testing.T) {
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-consul-service-namespace="default" \

# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand All @@ -192,6 +193,7 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-consul-service-namespace="non-default" \

# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand All @@ -218,7 +220,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="non-default"
-auth-method-namespace="non-default" \
-consul-service-namespace="non-default" \

# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down Expand Up @@ -247,7 +250,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="default"
-auth-method-namespace="default" \
-consul-service-namespace="k8snamespace" \

# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down
105 changes: 69 additions & 36 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/deckarep/golang-set"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/consul"
"github.com/hashicorp/consul-k8s/namespaces"
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -48,6 +49,25 @@ type EndpointsController struct {
AllowK8sNamespacesSet mapset.Set
// Endpoints in the DenyK8sNamespacesSet are ignored.
DenyK8sNamespacesSet mapset.Set
// EnableConsulNamespaces indicates that a user is running Consul Enterprise
// with version 1.7+ which supports namespaces.
EnableConsulNamespaces bool
// ConsulDestinationNamespace is the name of the Consul namespace to create
// all config entries in. If EnableNSMirroring is true this is ignored.
ConsulDestinationNamespace string
// EnableNSMirroring causes Consul namespaces to be created to match the
// k8s namespace of any config entry custom resource. Config entries will
// be created in the matching Consul namespace.
EnableNSMirroring bool
// NSMirroringPrefix is an optional prefix that can be added to the Consul
// namespaces created while mirroring. For example, if it is set to "k8s-",
// then the k8s `default` namespace will be mirrored in Consul's
// `k8s-default` namespace.
NSMirroringPrefix string
// CrossNSACLPolicy is the name of the ACL policy to attach to
// any created Consul namespaces to allow cross namespace service discovery.
// Only necessary if ACLs are enabled.
CrossNSACLPolicy string
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is installed.
Expand Down Expand Up @@ -77,11 +97,11 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
}
return ctrl.Result{}, nil
} else if err != nil {
r.Log.Error(err, "failed to get Endpoints from Kubernetes", "name", req.Name, "namespace", req.Namespace)
r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
}

r.Log.Info("retrieved Kubernetes Endpoints", "endpoints", serviceEndpoints.Name, "endpoints-namespace", serviceEndpoints.Namespace)
r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
// against service instances in Consul to deregister them if they are not in the map.
Expand All @@ -100,13 +120,13 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod from Kubernetes", "pod-name", address.TargetRef.Name)
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
return ctrl.Result{}, err
}

if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(pod.Status.HostIP)
client, err := r.remoteConsulClient(pod.Status.HostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP)
return ctrl.Result{}, err
Expand All @@ -115,38 +135,39 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints)
if err != nil {
r.Log.Error(err, "failed to create service registrations", "endpoints", serviceEndpoints.Name)
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service", "service", serviceRegistration.Name)
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service with Consul", "consul-service-name", serviceRegistration.Name)
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service", "service", proxyServiceRegistration.Name)
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service with Consul", "consul-proxy-service-name", proxyServiceRegistration.Name)
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
}

// Update the TTL health check for the service.
// This is required because ServiceRegister() does not update the TTL if the service already exists.
r.Log.Info("updating ttl health check", "service", serviceRegistration.Name)
r.Log.Info("updating TTL health check for service", "name", serviceRegistration.Name)
status, reason, err := getReadyStatusAndReason(pod)
if err != nil {
return ctrl.Result{}, err
}
err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, status)
if err != nil {
r.Log.Error(err, "failed to update TTL health check", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}
}
Expand All @@ -158,7 +179,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to deregister service instances on all agents", "k8s-service-name", serviceEndpoints.Name, "k8s-namespace", serviceEndpoints.Namespace)
r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -240,7 +261,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Port: servicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: "", // TODO: namespace support
Namespace: r.consulNamespace(pod.Namespace),
Check: &api.AgentServiceCheck{
CheckID: getConsulHealthCheckID(pod, serviceID),
Name: "Kubernetes Health Check",
Expand Down Expand Up @@ -299,7 +320,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Address: pod.Status.PodIP,
TaggedAddresses: nil, // TODO: set cluster IP here (will be done later)
Meta: meta,
Namespace: "", // TODO: same as service namespace
Namespace: r.consulNamespace(pod.Namespace),
Proxy: proxyConfig,
Checks: api.AgentServiceChecks{
{
Expand Down Expand Up @@ -359,9 +380,8 @@ func getReadyStatusAndReason(pod corev1.Pod) (string, string, error) {
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {

// Get all agents by getting pods with label component=client, app=consul and release=<ReleaseName>
list := corev1.PodList{}
agents := corev1.PodList{}
listOptions := client.ListOptions{
Namespace: r.ReleaseNamespace,
LabelSelector: labels.SelectorFromSet(map[string]string{
Expand All @@ -370,24 +390,23 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
"release": r.ReleaseName,
}),
}
if err := r.Client.List(ctx, &list, &listOptions); err != nil {
r.Log.Error(err, "failed to get agent pods from Kubernetes")
if err := r.Client.List(ctx, &agents, &listOptions); err != nil {
r.Log.Error(err, "failed to get Consul client agent pods")
return err
}

// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
// Create client for this agent.
client, err := r.remoteConsulClient(pod.Status.PodIP)
for _, agent := range agents.Items {
client, err := r.remoteConsulClient(agent.Status.PodIP, r.consulNamespace(k8sSvcNamespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
r.Log.Error(err, "failed to create a new Consul client", "address", agent.Status.PodIP)
return err
}

// Get services matching metadata.
svcs, err := serviceInstancesForK8SServiceNameAndNamespace(k8sSvcName, k8sSvcNamespace, client)
if err != nil {
r.Log.Error(err, "failed to get service instances", MetaKeyKubeServiceName, k8sSvcName)
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
}

Expand All @@ -399,13 +418,13 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
r.Log.Error(err, "failed to deregister service instance", "id", svcID)
return err
}
}
} else {
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
r.Log.Error(err, "failed to deregister service instance", "id", svcID)
return err
}
}
Expand All @@ -430,17 +449,25 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
for _, raw := range strings.Split(raw, ",") {
parts := strings.SplitN(raw, ":", 3)

var datacenter, serviceName, preparedQuery string
var datacenter, serviceName, preparedQuery, namespace string
var port int32
if strings.TrimSpace(parts[0]) == "prepared_query" {
port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
} else {
port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// TODO: Parse the namespace if provided

serviceName = strings.TrimSpace(parts[0])
// If Consul Namespaces are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces {
pieces := strings.SplitN(parts[0], ".", 2)
serviceName = strings.TrimSpace(pieces[0])
if len(pieces) > 1 {
namespace = strings.TrimSpace(pieces[1])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}

// parse the optional datacenter
if len(parts) > 2 {
Expand Down Expand Up @@ -469,7 +496,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
if port > 0 {
upstream := api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationNamespace: "", // todo
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
Expand All @@ -488,12 +515,12 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
return upstreams, nil
}

// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) {
// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod for a provided namespace.
func (r *EndpointsController) remoteConsulClient(ip string, namespace string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := r.ConsulClientCfg
localConfig.Address = newAddr

localConfig.Namespace = namespace
return consul.NewClient(localConfig)
}

Expand Down Expand Up @@ -553,28 +580,28 @@ func (r EndpointsController) filterAgentPods(object client.Object) bool {
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object client.Object) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.GetName())
r.Log.Info("received update for Consul client pod", "name", object.GetName())
err := r.Client.Get(r.Context, types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
}
if err != nil {
r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name)
r.Log.Error(err, "failed to get Consul client pod", "name", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not running, since
// we can't reconcile and register/deregister services against that agent.
if consulClientPod.Status.Phase != corev1.PodRunning {
r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name)
r.Log.Info("ignoring Consul client pod because it's not running", "name", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not yet ready, since
// we can't reconcile and register/deregister services against that agent.
for _, cond := range consulClientPod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
// Ignore if consulClientPod is not ready.
r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name)
r.Log.Info("ignoring Consul client pod because it's not ready", "name", consulClientPod.Name)
return []ctrl.Request{}
}
}
Expand Down Expand Up @@ -605,6 +632,12 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [
return requests
}

// consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace
// depending on Consul Namespaces being enabled and the value of namespace mirroring.
func (r *EndpointsController) consulNamespace(namespace string) string {
return namespaces.ConsulNamespace(namespace, r.EnableConsulNamespaces, r.ConsulDestinationNamespace, r.EnableNSMirroring, r.NSMirroringPrefix)
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[keyInjectStatus]; ok {
Expand Down
Loading