Skip to content


Finished endpoints controller
Browse files Browse the repository at this point in the history
Todos left:
* remove service name default annotation
* make inject-connect unit tests pass without a kubeconfig
* end to end testing
  • Loading branch information
ndhanushkodi committed Mar 18, 2021
1 parent b93dcb7 commit d88d866
Show file tree
Hide file tree
Showing 3 changed files with 467 additions and 175 deletions.
168 changes: 72 additions & 96 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

// todo: add docs
type EndpointsController struct {
// ConsulClient points at the agent local to the connect-inject deployment pod
Expand All @@ -27,39 +26,30 @@ type EndpointsController struct {
// i.e. "http" or "https".
ConsulScheme string
// ConsulPort is the port to make HTTP API calls to Consul agents on.
ConsulPort string
ConsulPort string
// Only endpoints in the AllowK8sNamespacesSet are reconciled.
AllowK8sNamespacesSet mapset.Set
DenyK8sNamespacesSet mapset.Set
Log logr.Logger
Scheme *runtime.Scheme
Context context.Context
// Endpoints in the DenyK8sNamespacesSet are ignored.
DenyK8sNamespacesSet mapset.Set
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is Helm installed.
ReleaseNamespace string
Log logr.Logger
Scheme *runtime.Scheme
Ctx context.Context

// TODO: get consul installation namespace and release name passed in for querying agents (for more efficient lookup of agent pods)

const MetaKeyKubeServiceName = "k8s-service-name"

func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
var serviceEndpoints corev1.Endpoints

// Ignores system namespaces.
if req.Namespace == "kube-system" || req.Namespace == "local-path-storage" {
return ctrl.Result{}, nil

// Ignore namespaces where we don't connect-inject.
// Ignores deny list.
if r.DenyK8sNamespacesSet.Contains(req.Namespace) {
fmt.Printf("%+v\n", r.DenyK8sNamespacesSet.ToSlice()...)
return ctrl.Result{}, nil
// Ignores if not in allow list or allow list is not *.
if !r.AllowK8sNamespacesSet.Contains("*") && !r.AllowK8sNamespacesSet.Contains(req.Namespace) {
fmt.Printf("%+v\n", r.AllowK8sNamespacesSet.ToSlice()...)
if shouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) {
return ctrl.Result{}, nil

err := r.Client.Get(context.Background(), req.NamespacedName, &serviceEndpoints)
err := r.Client.Get(r.Ctx, req.NamespacedName, &serviceEndpoints)

// If the endpoints object has been deleted (and we get an IsNotFound
// error), we need to deregister all instances in Consul for that service.
Expand Down Expand Up @@ -104,7 +94,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Get pod associated with this address.
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
err = r.Client.Get(context.Background(), objectKey, &pod)
err = r.Client.Get(r.Ctx, objectKey, &pod)
if err != nil {
r.Log.Error(err, "failed to get pod from Kubernetes", "pod name", address.TargetRef.Name)
return ctrl.Result{}, err
Expand All @@ -127,7 +117,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {

// TODO remove logic in handler to always set the service name annotation
// TODO: remove logic in handler to always set the service name annotation
// We only want that annotation to be present when explicitly overriding the consul svc name
// Otherwise, the Consul service name should equal the K8s Service name.
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod
Expand Down Expand Up @@ -163,16 +153,13 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
tags = append(tags, strings.Split(raw, ",")...)

// TODO tags, meta, upstreams

//fmt.Printf("&&& Pod name: %+v, service port: %+v, service name: %+v, service id: %+v\n", pod, servicePort, serviceName, serviceID)
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
Port: servicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: "", // todo: namespace support
Namespace: "", // TODO: namespace support
if len(tags) > 0 {
service.Tags = tags
Expand All @@ -189,7 +176,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
proxyConfig := &api.AgentServiceConnectProxyConfig{
DestinationServiceName: serviceName,
DestinationServiceID: serviceID,
Config: nil, // todo: add config for metrics
Config: nil, // TODO: add config for metrics (upcoming PR)

if servicePort > 0 {
Expand All @@ -208,9 +195,9 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
Name: proxyServiceName,
Port: 20000,
Address: pod.Status.PodIP,
TaggedAddresses: nil, // todo: set cluster IP here (will be done later)
TaggedAddresses: nil, // TODO: set cluster IP here (will be done later)
Meta: meta,
Namespace: "", // todo: same as service namespace
Namespace: "", // TODO: same as service namespace
Proxy: proxyConfig,
Check: nil,
Checks: api.AgentServiceChecks{
Expand Down Expand Up @@ -248,7 +235,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// since we don't need to deregister extraneous service instances if there aren't any.
if consulServiceName != "" {
// Get service and proxy instances from Consul.
fmt.Printf("*** consulServiceName %s\n", consulServiceName)
r.Log.Info("endpoints has at least 1 address, checking for Consul service instances to deregister", "consul service name", consulServiceName)
serviceInstances, _, err := r.ConsulClient.Catalog().Service(consulServiceName, "", nil)
if err != nil {
r.Log.Error(err, "failed to get service instances", "service", consulServiceName)
Expand All @@ -260,16 +247,11 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
r.Log.Error(err, "failed to get service instances", "service", proxyServiceName)
return ctrl.Result{}, err
fmt.Printf("*** svcinstances %+v\n", serviceInstances)
fmt.Printf("*** endpointAddressMap %+v\n", endpointAddressMap)

serviceAndProxyInstances := append(serviceInstances, proxyInstances...)

// Check if each instance is in the endpointAddressMap. If it is not, then deregister that service instance.
for _, instance := range serviceAndProxyInstances {
fmt.Printf("*** instance %+v\n", instance)
if _, ok := endpointAddressMap[instance.ServiceAddress]; !ok {

agentClient, err := r.getConsulClient(instance.Address)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", instance.Address)
Expand All @@ -288,7 +270,6 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil

// TODO pass in a context for entire reconcile, not context.Background
// deregisterServiceOnAllAgents queries all agents for service instances that have the metadata
// "k8s-service-name"=k8sSvcName and "k8s-namespace"=k8sSvcNamespace. The k8s service name may or may not match the
// consul service name, but the k8s service name will always match the metadata on the Consul service
Expand All @@ -298,14 +279,22 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// associated proxy service instances.
func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNamespace string) error {

// Get all agents by getting pods with label component=client
// Get all agents by getting pods with label component=client, app=consul and release=<ReleaseName>
// TODO more strict: app=consul, maybe release name (need to pass in), also namespace
list := corev1.PodList{}
listOptions := client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{"component": "client"}),
Namespace: r.ReleaseNamespace,
LabelSelector: labels.SelectorFromSet(map[string]string{
"component": "client",
"app": "consul",
"release": r.ReleaseName,
err := r.Client.List(r.Ctx, &list, &listOptions)
if err != nil {
r.Log.Error(err, "failed to get agent pods from Kubernetes")
return err
// TODO error check
r.Client.List(context.Background(), &list, &listOptions)

// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
Expand All @@ -323,7 +312,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam
return err

// Deregister each service instance that matches the metadata
// Deregister each service instance that matches the metadata.
for svcID, _ := range svcs {
r.Log.Info("deregistering service", "service id", svcID)
err = client.Agent().ServiceDeregister(svcID)
Expand All @@ -337,6 +326,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam
return nil

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
// objects.
func (r *EndpointsController) processUpstreams(pod *corev1.Pod) ([]api.Upstream, error) {
var upstreams []api.Upstream
if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" {
Expand All @@ -351,17 +342,7 @@ func (r *EndpointsController) processUpstreams(pod *corev1.Pod) ([]api.Upstream,
} else {
port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// todo: Parse the namespace if provided
//if data.ConsulNamespace != "" {
// pieces := strings.SplitN(parts[0], ".", 2)
// serviceName = pieces[0]
// if len(pieces) > 1 {
// namespace = pieces[1]
// }
//} else {
// serviceName = strings.TrimSpace(parts[0])
// TODO: Parse the namespace if provided

serviceName = strings.TrimSpace(parts[0])

Expand Down Expand Up @@ -411,13 +392,14 @@ func (r *EndpointsController) processUpstreams(pod *corev1.Pod) ([]api.Upstream,
return upstreams, nil

func hasBeenInjected(pod *corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
if anno == injected {
return true
return false
func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
return r.Log.WithValues("request", name)

func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
Expand All @@ -435,40 +417,34 @@ func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) {
return localClient, err

func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
return r.Log.WithValues("request", name)
// shouldIgnore ignores namespaces where we don't connect-inject.
func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
// Ignores system namespaces.
if namespace == "kube-system" || namespace == "local-path-storage" {
return true

func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Ignores deny list.
if denySet.Contains(namespace) {
fmt.Printf("%+v\n", denySet.ToSlice()...)
return true

// Ignores if not in allow list or allow list is not *.
if !allowSet.Contains("*") && !allowSet.Contains(namespace) {
fmt.Printf("%+v\n", allowSet.ToSlice()...)
return true

return false

// The following can work for when k8s svc == consul svc. We can add this as an optimization to above.
// ---------------------------------------------------------------------------------------
// then deregister each service instance [is there a way to deregister the whole service]
// below it's done by each svc instance
// r.ConsulClient.Catalog().Services(q *api.QueryOptions) --> name of svcs

// Use this path for if k8s service == consul service
// serviceInstances, _, err := r.ConsulClient.Catalog().Service(name, "", nil)
// if err != nil {
// r.Log.Error(err, "failed to get service instances from Consul", "name", name)
// return ctrl.Result{}, err
// }
// for _, instance := range serviceInstances {
// agentClient, err := r.getConsulClient(instance.Address) // this is the pod IP of the consul client agent rather than service address
// if err != nil {
// r.Log.Error(err, "failed to create a new Consul client", "address", instance.Address)
// return ctrl.Result{}, err
// }
// r.Log.Info("deregistering service", "service", instance.ServiceName)
// err = agentClient.Agent().ServiceDeregister(instance.ServiceID)
// if err != nil {
// r.Log.Error(err, "failed to deregister service", "name", name)
// return ctrl.Result{}, err
// }
// }
// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod *corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
if anno == injected {
return true
return false

0 comments on commit d88d866

Please sign in to comment.