Skip to content

Commit

Permalink
Support upgrades for connect refactor (#509)
Browse files Browse the repository at this point in the history
Before the connect refactor, service registration in Consul was managed
by the lifecycle sidecar, which would re-register the service with
Consul every 10s. Now, service registration is managed by Endpoints
controller.

In order to support upgrades to the refactored Endpoints controller, we
need Endpoints controller to NOT register or deregister any services
managed by lifecycle sidecar. To do this, the label consul.hashicorp.com/connect-inject-managed-by
is added to pods managed by endpoints controller, so endpoints
controller will ignore older services managed by lifecycle sidecar
(legacy services) for service registration/deregistration, and only
create and register new services that are supposed to be managed by
endpoints controller.

To support health checks for legacy services, the Endpoints controller
will always update the healthcheck for any Connect service, whether it's managed by
Endpoints controller or not.

The service registration no longer happens at the same time as its health check registration.
The health check is registered separately for all services, legacy or not.
  • Loading branch information
ndhanushkodi authored May 10, 2021
1 parent 99b448e commit 75b5d6a
Show file tree
Hide file tree
Showing 8 changed files with 727 additions and 138 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ IMPROVEMENTS:
* Setting the label `consul.hashicorp.com/transparent-proxy` to `true/false` on a namespace will define the default behavior for pods in that namespace, which do not also have the annotation set.
* The default tproxy behavior will be defined by the value of `-enable-transparent-proxy` flag to the `consul-k8s inject-connect` command. It can be overridden in a namespace by the label on the namespace or for a pod using the annotation on the pod.

* Connect: support upgrading services that were deployed before the endpoints controller
to a version of consul-k8s that uses the endpoints controller. [[GH-509](https://github.com/hashicorp/consul-k8s/pull/509)]

BUG FIXES:
* Connect: Use `runAsNonRoot: false` for connect-init's container when tproxy is enabled. [[GH-493](https://github.com/hashicorp/consul-k8s/pull/493)]
* CRDs: Fix a bug where the `config` field in `ProxyDefaults` CR was not synced to Consul because
Expand Down
9 changes: 9 additions & 0 deletions connect-inject/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ const (
// a pod after an injection is done.
keyInjectStatus = "consul.hashicorp.com/connect-inject-status"

// keyManagedBy is the key of the label that is added to pods managed
// by the Endpoints controller. This is to support upgrading from consul-k8s
// without Endpoints controller to consul-k8s with Endpoints controller
// without disrupting services managed the old way.
keyManagedBy = "consul.hashicorp.com/connect-inject-managed-by"

// annotationInject is the key of the annotation that controls whether
// injection is explicitly enabled or disabled for a pod. This should
// be set to a truthy or falsy value, as parseable by strconv.ParseBool
Expand Down Expand Up @@ -104,6 +110,9 @@ const (

// injected is used as the annotation value for annotationInjected.
injected = "injected"

// managedByValue is the value for keyManagedBy.
managedByValue = "consul-k8s-endpoints-controller"
)

// Annotations used by Prometheus.
Expand Down
198 changes: 155 additions & 43 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
MetaKeyPodName = "pod-name"
MetaKeyKubeServiceName = "k8s-service-name"
MetaKeyKubeNS = "k8s-namespace"
MetaKeyManagedBy = "managed-by"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
envoyPrometheusBindAddr = "envoy_prometheus_bind_addr"

Expand Down Expand Up @@ -150,39 +151,55 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
var managedByEndpointsController bool
if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue {
managedByEndpointsController = true
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}
// For pods managed by this controller, create and register the service instance.
if managedByEndpointsController {
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus)
if err != nil {
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
}
}

// Update the TTL health check for the service.
// This is required because ServiceRegister() does not update the TTL if the service already exists.
// Update the service TTL health check for both legacy services and services managed by endpoints
// controller. The proxy health checks are registered separately by endpoints controller and
// lifecycle sidecar for legacy services. Here, we always update the health check for legacy and
// newer services idempotently since the service health check is not added as part of the service
// registration.
reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace)
r.Log.Info("updating health check status for service", "name", serviceRegistration.Name, "reason", reason, "status", healthStatus)
err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, healthStatus)
serviceName := getServiceName(pod, serviceEndpoints)
r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus)
serviceID := getServiceID(pod, serviceEndpoints)
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
proxyServiceID := getProxyServiceID(pod, serviceEndpoints)
healthCheckID := getConsulHealthCheckID(pod, serviceID)
err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus)
if err != nil {
r.Log.Error(err, "failed to update health check status for service", "name", serviceRegistration.Name)
r.Log.Error(err, "failed to update health check status for service", "name", serviceName)
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -215,6 +232,111 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
).Complete(r)
}

// getServiceCheck asks the agent for the check whose CheckID equals healthCheckID.
// It returns the matching check, or a nil check with a nil error when the agent's
// filtered result has no entry for that ID. Any error from the agent query is
// returned unchanged.
func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) {
	matching, err := client.Agent().ChecksWithFilter(fmt.Sprintf("CheckID == `%s`", healthCheckID))
	if err != nil {
		return nil, err
	}
	// Indexing the result map yields nil when no check with this ID was returned.
	return matching[healthCheckID], nil
}

// registerConsulHealthCheck registers a TTL check named "Kubernetes Health Check"
// with the agent behind client, attached to serviceID and identified by
// consulHealthCheckID. The check starts in the given status, which is how the
// Pod's readiness is surfaced to Consul.
//
// The TTL is set to 100000h so that the check should never flip due to the TTL
// itself timing out.
//
// If the agent rejects the registration because serviceID is unknown, a dedicated
// "not found in Consul" error is returned; any other failure is wrapped with the
// service ID for context.
func registerConsulHealthCheck(client *api.Client, consulHealthCheckID, serviceID, status string) error {
	registration := &api.AgentCheckRegistration{
		ID:        consulHealthCheckID,
		Name:      "Kubernetes Health Check",
		ServiceID: serviceID,
		AgentServiceCheck: api.AgentServiceCheck{
			TTL:                    "100000h",
			Status:                 status,
			SuccessBeforePassing:   1,
			FailuresBeforeCritical: 1,
		},
	}

	err := client.Agent().CheckRegister(registration)
	if err == nil {
		return nil
	}

	// The full agent error looks like:
	// Unexpected response code: 500 (ServiceID "consulnamespace/svc-id" does not exist)
	missingServiceSuffix := fmt.Sprintf("%s\" does not exist", serviceID)
	if strings.Contains(err.Error(), missingServiceSuffix) {
		return fmt.Errorf("service %q not found in Consul: unable to register health check", serviceID)
	}
	return fmt.Errorf("registering health check for service %q: %w", serviceID, err)
}

// updateConsulHealthCheckStatus logs the update and then sets the TTL check
// identified by consulHealthCheckID to the given status, passing reason as the
// check output. A failure from the agent is wrapped and returned.
func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client, consulHealthCheckID, status, reason string) error {
	r.Log.Info("updating health check", "id", consulHealthCheckID)
	if err := client.Agent().UpdateTTL(consulHealthCheckID, reason, status); err != nil {
		return fmt.Errorf("error updating health check: %w", err)
	}
	return nil
}

// upsertHealthCheck makes sure the service's health check exists on the agent and
// reflects status:
//
//   - if no check with healthCheckID exists, it is registered against serviceID and
//     then immediately updated so that its output carries the status reason;
//   - if the check exists with a different status, it is updated;
//   - if the check exists with the same status, nothing is done.
//
// proxyServiceID and proxyServiceName are accepted for the caller's convenience but
// are not used by this function.
//
// Errors from the lookup are wrapped (and can be inspected with errors.Is/As);
// errors from registration or update are returned as produced by the helpers.
func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error {
	reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace)

	// Retrieve the health check that would exist if the service had one registered for this pod.
	serviceCheck, err := getServiceCheck(client, healthCheckID)
	if err != nil {
		return fmt.Errorf("unable to get agent health checks: serviceID=%s, checkID=%s, %w", serviceID, healthCheckID, err)
	}

	if serviceCheck == nil {
		// Create a new health check.
		if err := registerConsulHealthCheck(client, healthCheckID, serviceID, status); err != nil {
			return err
		}
		// Also update it: the Output field of the health check cannot be set at
		// creation time, and Output (not Notes) is what is displayed on the UI.
		return r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason)
	}

	if serviceCheck.Status != status {
		return r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason)
	}
	return nil
}

// getServiceName returns the service name to use for the pod: the value of the
// annotationService pod annotation when it is present and non-empty, otherwise
// the name of the Endpoints object.
func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
	// A missing annotation reads as "", so one emptiness check covers both cases.
	if override := pod.Annotations[annotationService]; override != "" {
		return override
	}
	return serviceEndpoints.Name
}

// getServiceID returns the service instance ID for the pod, formed as
// "<pod name>-<service name>".
func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
	name := getServiceName(pod, serviceEndpoints)
	return fmt.Sprintf("%s-%s", pod.Name, name)
}

// getProxyServiceName returns the name of the sidecar proxy service for the pod:
// the service name with "-sidecar-proxy" appended.
func getProxyServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
	return fmt.Sprintf("%s-sidecar-proxy", getServiceName(pod, serviceEndpoints))
}

// getProxyServiceID returns the sidecar proxy service instance ID for the pod,
// formed as "<pod name>-<proxy service name>".
func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string {
	return fmt.Sprintf("%s-%s", pod.Name, getProxyServiceName(pod, serviceEndpoints))
}

// createServiceRegistrations creates the service and proxy service instance registrations with the information from the
// Pod.
func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) {
Expand All @@ -235,17 +357,15 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
// Otherwise, the Consul service name should equal the Kubernetes Service name.
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod
// annotation consul.hashicorp.com/connect-service.
serviceName := serviceEndpoints.Name
if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" {
serviceName = serviceNameFromAnnotation
}
serviceName := getServiceName(pod, serviceEndpoints)

serviceID := fmt.Sprintf("%s-%s", pod.Name, serviceName)
serviceID := getServiceID(pod, serviceEndpoints)

meta := map[string]string{
MetaKeyPodName: pod.Name,
MetaKeyKubeServiceName: serviceEndpoints.Name,
MetaKeyKubeNS: serviceEndpoints.Namespace,
MetaKeyManagedBy: managedByValue,
}
for k, v := range pod.Annotations {
if strings.HasPrefix(k, annotationMeta) && strings.TrimPrefix(k, annotationMeta) != "" {
Expand All @@ -269,21 +389,13 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Address: pod.Status.PodIP,
Meta: meta,
Namespace: r.consulNamespace(pod.Namespace),
Check: &api.AgentServiceCheck{
CheckID: getConsulHealthCheckID(pod, serviceID),
Name: "Kubernetes Health Check",
TTL: "100000h",
Status: healthStatus,
SuccessBeforePassing: 1,
FailuresBeforeCritical: 1,
},
}
if len(tags) > 0 {
service.Tags = tags
}

proxyServiceName := fmt.Sprintf("%s-sidecar-proxy", serviceName)
proxyServiceID := fmt.Sprintf("%s-%s", pod.Name, proxyServiceName)
proxyServiceName := getProxyServiceName(pod, serviceEndpoints)
proxyServiceID := getProxyServiceID(pod, serviceEndpoints)
proxyConfig := &api.AgentServiceConnectProxyConfig{
DestinationServiceName: serviceName,
DestinationServiceID: serviceID,
Expand Down Expand Up @@ -506,8 +618,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNamespace string, client *api.Client) (map[string]*api.AgentService, error) {
return client.Agent().ServicesWithFilter(
fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q`,
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace))
fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`,
MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue))
}

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
Expand Down
Loading

0 comments on commit 75b5d6a

Please sign in to comment.