Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advertise LoadBalancer IPs #422

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions pkg/backends/calico/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ func NewCalicoClient(confdConfig *config.Config) (*client, error) {
// racing with syncer-derived updates.
c.onExternalIPsUpdate(externalCIDRs)

// Get LoadBalancer CIDRs.
lbCIDRs := []string{}
if cfg != nil && cfg.Spec.ServiceLoadBalancerIPs != nil {
for _, c := range cfg.Spec.ServiceLoadBalancerIPs {
lbCIDRs = append(lbCIDRs, c.CIDR)
}
}
// Note: do this initial update before starting the syncer, so there's no chance of this
// racing with syncer-derived updates.
c.onLoadBalancerIPsUpdate(lbCIDRs)

// Start the main syncer loop. If the node-to-node mesh is enabled then we need to
// monitor all nodes. If this setting changes (which we will monitor in the OnUpdates
// callback) then we terminate confd - the calico/node init process will restart the
Expand Down Expand Up @@ -287,11 +298,13 @@ type client struct {
// This node's log level key.
nodeLogKey string

// Current values of <bgpconfig>.spec.serviceExternalIPs and
// <bgpconfig>.spec.serviceClusterIPs.
externalIPs []string
externalIPNets []*net.IPNet // same as externalIPs but parsed
clusterCIDRs []string
// Current values of <bgpconfig>.spec.serviceExternalIPs,
// <bgpconfig>.spec.serviceLoadBalancerIPs, and <bgpconfig>.spec.serviceClusterIPs.
externalIPs []string
externalIPNets []*net.IPNet // same as externalIPs but parsed
clusterCIDRs []string
loadBalancerIPs []string
loadBalancerIPNets []*net.IPNet // same as externalIPs but parsed

// Subcomponent for accessing and watching secrets (that hold BGP passwords).
secretWatcher *secretWatcher
Expand Down Expand Up @@ -913,6 +926,13 @@ func (c *client) onUpdates(updates []api.Update, needUpdatePeersV1 bool) {
}
c.onClusterIPsUpdate(clusterIPs)

// Same for loadbalancer CIDRs.
var loadBalancerIPs []string
if len(c.cache["/calico/bgp/v1/global/svc_loadbalancer_ips"]) > 0 {
loadBalancerIPs = strings.Split(c.cache["/calico/bgp/v1/global/svc_loadbalancer_ips"], ",")
}
c.onLoadBalancerIPsUpdate(loadBalancerIPs)

if c.rg != nil {
// Trigger the route generator to recheck and advertise or withdraw
// node-specific routes.
Expand All @@ -933,6 +953,7 @@ func (c *client) updateBGPConfigCache(resName string, v3res *apiv3.BGPConfigurat
c.getASNumberKVPair(v3res, model.GlobalBGPConfigKey{}, updatePeersV1, updateReasons)
c.getServiceExternalIPsKVPair(v3res, model.GlobalBGPConfigKey{}, svcAdvertisement)
c.getServiceClusterIPsKVPair(v3res, model.GlobalBGPConfigKey{}, svcAdvertisement)
c.getServiceLoadBalancerIPsKVPair(v3res, model.GlobalBGPConfigKey{}, svcAdvertisement)
c.getNodeToNodeMeshKVPair(v3res, model.GlobalBGPConfigKey{})
c.getLogSeverityKVPair(v3res, model.GlobalBGPConfigKey{})
} else if strings.HasPrefix(resName, perNodeConfigNamePrefix) {
Expand Down Expand Up @@ -1069,7 +1090,7 @@ func (c *client) getASNumberKVPair(v3res *apiv3.BGPConfiguration, key interface{
}

func (c *client) getServiceExternalIPsKVPair(v3res *apiv3.BGPConfiguration, key interface{}, svcAdvertisement *bool) {
scvExternalIPKey := getBGPConfigKey("svc_external_ips", key)
svcExternalIPKey := getBGPConfigKey("svc_external_ips", key)

if v3res != nil && v3res.Spec.ServiceExternalIPs != nil && len(v3res.Spec.ServiceExternalIPs) != 0 {
// We wrap each Service external IP in a ServiceExternalIPBlock struct to
Expand All @@ -1078,9 +1099,24 @@ func (c *client) getServiceExternalIPsKVPair(v3res *apiv3.BGPConfiguration, key
for i, ipBlock := range v3res.Spec.ServiceExternalIPs {
ipCidrs[i] = ipBlock.CIDR
}
c.updateCache(api.UpdateTypeKVUpdated, getKVPair(scvExternalIPKey, strings.Join(ipCidrs, ",")))
c.updateCache(api.UpdateTypeKVUpdated, getKVPair(svcExternalIPKey, strings.Join(ipCidrs, ",")))
} else {
c.updateCache(api.UpdateTypeKVDeleted, getKVPair(svcExternalIPKey))
}
*svcAdvertisement = true
}

func (c *client) getServiceLoadBalancerIPsKVPair(v3res *apiv3.BGPConfiguration, key interface{}, svcAdvertisement *bool) {
svcLoadBalancerIPKey := getBGPConfigKey("svc_loadbalancer_ips", key)

if v3res != nil && v3res.Spec.ServiceLoadBalancerIPs != nil && len(v3res.Spec.ServiceLoadBalancerIPs) != 0 {
ipCidrs := make([]string, len(v3res.Spec.ServiceLoadBalancerIPs))
for i, ipBlock := range v3res.Spec.ServiceLoadBalancerIPs {
ipCidrs[i] = ipBlock.CIDR
}
c.updateCache(api.UpdateTypeKVUpdated, getKVPair(svcLoadBalancerIPKey, strings.Join(ipCidrs, ",")))
} else {
c.updateCache(api.UpdateTypeKVDeleted, getKVPair(scvExternalIPKey))
c.updateCache(api.UpdateTypeKVDeleted, getKVPair(svcLoadBalancerIPKey))
}
*svcAdvertisement = true
}
Expand Down Expand Up @@ -1175,6 +1211,16 @@ func (c *client) onClusterIPsUpdate(clusterCIDRs []string) {
}
}

func (c *client) onLoadBalancerIPsUpdate(lbIPs []string) {
if err := c.updateGlobalRoutes(c.loadBalancerIPs, lbIPs); err == nil {
c.loadBalancerIPs = lbIPs
c.loadBalancerIPNets = parseIPNets(c.loadBalancerIPs)
log.Infof("Updated with new Loadbalancer IP CIDRs: %s", lbIPs)
} else {
log.WithError(err).Error("Failed to update external IP routes")
}
}

func (c *client) AdvertiseClusterIPs() bool {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
Expand All @@ -1187,6 +1233,12 @@ func (c *client) GetExternalIPs() []*net.IPNet {
return c.externalIPNets
}

func (c *client) GetLoadBalancerIPs() []*net.IPNet {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
return c.loadBalancerIPNets
}

// "Global" here means the routes for cluster IP and external IP CIDRs that are advertised from
// every node in the cluster.
func (c *client) updateGlobalRoutes(current, new []string) error {
Expand Down
37 changes: 35 additions & 2 deletions pkg/backends/calico/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,19 +236,33 @@ func (rg *routeGenerator) getAllRoutesForService(svc *v1.Service) []string {
// Only advertise cluster IPs if we've been told to.
routes = append(routes, svc.Spec.ClusterIP)
}
svcID := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)

if svc.Spec.ExternalIPs != nil {
for _, externalIP := range svc.Spec.ExternalIPs {
// Only advertise whitelisted external IPs
if !rg.isAllowedExternalIP(externalIP) {
svc := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
log.WithFields(log.Fields{"ip": externalIP, "svc": svc}).Info("Cannot advertise External IP - not whitelisted")
log.WithFields(log.Fields{"ip": externalIP, "svc": svcID}).Info("Cannot advertise External IP - not whitelisted")
continue
}
routes = append(routes, externalIP)
}
}

if svc.Status.LoadBalancer.Ingress != nil {
for _, lbIngress := range svc.Status.LoadBalancer.Ingress {
if len(lbIngress.IP) > 0 {
// Only advertise whitelisted LB IPs
if !rg.isAllowedLoadBalancerIP(lbIngress.IP) {
log.WithFields(log.Fields{"ip": lbIngress.IP, "svc": svcID}).Info("Cannot advertise LoadBalancer IP - not whitelisted")
continue
}
routes = append(routes, lbIngress.IP)
}
}

}

return addFullIPLength(routes)
}

Expand Down Expand Up @@ -317,7 +331,26 @@ func (rg *routeGenerator) isAllowedExternalIP(externalIP string) bool {

// Guilty until proven innocent
return false
}

// isAllowedLoadBalancerIP determines if the given IP is in the list of
// whitelisted LoadBalancer CIDRs given in the default bgpconfiguration.
func (rg *routeGenerator) isAllowedLoadBalancerIP(loadBalancerIP string) bool {

ip := net.ParseIP(loadBalancerIP)
if ip == nil {
log.Errorf("Could not parse service LB IP: %s", loadBalancerIP)
return false
}

for _, allowedNet := range rg.client.GetLoadBalancerIPs() {
if allowedNet.Contains(ip) {
return true
}
}

// Guilty until proven innocent
return false
}

// addFullIPLength returns a new slice, with the full IP length appended onto every item.
Expand Down
1 change: 1 addition & 0 deletions tests/compiled_templates/mesh/static-routes/bird_aggr.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ protocol static {
# Static routes.
route 10.101.0.0/16 blackhole;
route 10.101.0.101/32 blackhole;
route 80.15.0.0/24 blackhole;
}


Expand Down
2 changes: 2 additions & 0 deletions tests/compiled_templates/mesh/static-routes/bird_ipam.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ filter calico_export_to_bgp_peers {
# Export static routes.
if ( net ~ 10.101.0.0/16 ) then { accept; }
if ( net ~ 10.101.0.101/32 ) then { accept; }
if ( net ~ 80.15.0.0/24 ) then { accept; }

if ( net ~ 192.168.0.0/16 ) then {
accept;
Expand All @@ -19,6 +20,7 @@ filter calico_kernel_programming {

# Don't program static routes into kernel.
if ( net ~ 10.101.0.0/16 ) then { reject; }
if ( net ~ 80.15.0.0/24 ) then { reject; }

if ( net ~ 192.168.0.0/16 ) then {
krt_tunnel = "tunl0";
Expand Down
2 changes: 2 additions & 0 deletions tests/mock_data/calicoctl/mesh/static-routes/input.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ spec:
serviceClusterIPs:
- cidr: 10.101.0.0/16
- cidr: fd00:96::/112
serviceLoadBalancerIPs:
- cidr: 80.15.0.0/24

---

Expand Down
10 changes: 10 additions & 0 deletions tests/mock_data/calicoctl/mesh/static-routes/kubectl-input.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ spec:
- port: 80
protocol: TCP
targetPort: 80
status:
# TODO: kubectl doesn't support writing status, so these IPs
# never make it to the apiserver. Need changes to our test rig
# to support this.
loadBalancer:
ingress:
# Within the range in BGP config.
- ip: 80.15.0.1/32
# Not within the range in BGP config.
- ip: 90.15.0.1/32

---

Expand Down