Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync LB endpoints #257

Merged
merged 11 commits into from
Jun 5, 2020
93 changes: 77 additions & 16 deletions catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"sync"

"github.com/deckarep/golang-set"
mapset "github.com/deckarep/golang-set"
ltagliamonte-dd marked this conversation as resolved.
Show resolved Hide resolved
"github.com/hashicorp/consul-k8s/helper/controller"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -78,6 +78,9 @@ type ServiceResource struct {
// Setting this to false will ignore ClusterIP services during the sync.
ClusterIPSync bool

// LoadBalancerEndpointsSync set to true (default false) will sync ServiceTypeLoadBalancer endpoints.
LoadBalancerEndpointsSync bool

// NodeExternalIPSync set to true (the default) syncs NodePort services
// using the node's external ip address. When false, the node's internal
// ip address will be used instead.
Expand Down Expand Up @@ -295,6 +298,9 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool {
return false
}

if t.LoadBalancerEndpointsSync {
return svc.Spec.Type == apiv1.ServiceTypeNodePort || svc.Spec.Type == apiv1.ServiceTypeClusterIP || svc.Spec.Type == apiv1.ServiceTypeLoadBalancer
ishustava marked this conversation as resolved.
Show resolved Hide resolved
}
return svc.Spec.Type == apiv1.ServiceTypeNodePort || svc.Spec.Type == apiv1.ServiceTypeClusterIP
}

Expand Down Expand Up @@ -468,28 +474,83 @@ func (t *ServiceResource) generateRegistrations(key string) {
// For LoadBalancer type services, we create a service instance for
// each LoadBalancer entry. We only support entries that have an IP
// address assigned (not hostnames).
// If LoadBalancerEndpointsSync is true sync LB endpoints instead of loadbalancer ingress.
case apiv1.ServiceTypeLoadBalancer:
seen := map[string]struct{}{}
for _, ingress := range svc.Status.LoadBalancer.Ingress {
addr := ingress.IP
if addr == "" {
addr = ingress.Hostname
if t.LoadBalancerEndpointsSync {
if t.endpointsMap == nil {
return
}
if addr == "" {
continue

endpoints := t.endpointsMap[key]
if endpoints == nil {
return
}

if _, ok := seen[addr]; ok {
continue
for _, subset := range endpoints.Subsets {
// if LoadBalancerEndpointsSync is true use the endpoint port instead
// of the service port because we're registering each endpoint
// as a separate service instance.
epPort := baseService.Port
if overridePortName != "" {
// If we're supposed to use a specific named port, find it.
for _, p := range subset.Ports {
if overridePortName == p.Name {
epPort = int(p.Port)
break
}
}
} else if overridePortNumber == 0 {
// Otherwise we'll just use the first port in the list
// (unless the port number was overridden by an annotation).
for _, p := range subset.Ports {
epPort = int(p.Port)
break
}
}
for _, subsetAddr := range subset.Addresses {
addr := subsetAddr.IP
if addr == "" {
continue
}
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
r.Service.Port = epPort

t.consulMap[key] = append(t.consulMap[key], &r)
ishustava marked this conversation as resolved.
Show resolved Hide resolved
}
}
seen[addr] = struct{}{}
} else {
for _, ingress := range svc.Status.LoadBalancer.Ingress {
addr := ingress.IP
if addr == "" {
addr = ingress.Hostname
}
if addr == "" {
continue
}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
t.consulMap[key] = append(t.consulMap[key], &r)
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

t.consulMap[key] = append(t.consulMap[key], &r)
}
}

// For NodePort services, we create a service instance for each
Expand Down
5 changes: 5 additions & 0 deletions subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Command struct {
flagK8SWriteNamespace string
flagConsulWritePeriod flags.DurationValue
flagSyncClusterIPServices bool
flagSyncLBEndpoints bool
flagNodePortSyncType string
flagAddK8SNamespaceSuffix bool
flagLogLevel string
Expand Down Expand Up @@ -102,6 +103,9 @@ func (c *Command) init() {
c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true,
"If true, all valid ClusterIP services in K8S are synced by default. If false, "+
"ClusterIP services are not synced to Consul.")
c.flags.BoolVar(&c.flagSyncLBEndpoints, "sync-lb-services-endpoints", false,
"If true, will sync in Consul ServiceTypeLoadBalancer endpoints. If false, "+
ltagliamonte-dd marked this conversation as resolved.
Show resolved Hide resolved
"LoadBalancer endpoints are not synced to Consul.")
c.flags.StringVar(&c.flagNodePortSyncType, "node-port-sync-type", "ExternalOnly",
"Defines the type of sync for NodePort services. Valid options are ExternalOnly, "+
"InternalOnly and ExternalFirst.")
Expand Down Expand Up @@ -260,6 +264,7 @@ func (c *Command) Run(args []string) int {
DenyK8sNamespacesSet: denySet,
ExplicitEnable: !c.flagK8SDefault,
ClusterIPSync: c.flagSyncClusterIPServices,
LoadBalancerEndpointsSync: c.flagSyncLBEndpoints,
NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType),
ConsulK8STag: c.flagConsulK8STag,
ConsulServicePrefix: c.flagConsulServicePrefix,
Expand Down