Skip to content

Commit

Permalink
Merge pull request #1031 from davecheney/internal-dag-cluster
Browse files Browse the repository at this point in the history
internal/dag: introduce dag.Cluster
  • Loading branch information
davecheney authored Apr 22, 2019
2 parents f4bc723 + 468aa4e commit ccbb6be
Show file tree
Hide file tree
Showing 15 changed files with 568 additions and 448 deletions.
30 changes: 16 additions & 14 deletions internal/contour/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,23 @@ func visitClusters(root dag.Vertex) map[string]*v2.Cluster {
}

func (v *clusterVisitor) visit(vertex dag.Vertex) {
switch service := vertex.(type) {
case *dag.HTTPService:
name := envoy.Clustername(&service.TCPService)
if _, ok := v.clusters[name]; !ok {
c := envoy.Cluster(service)
v.clusters[c.Name] = c
if cluster, ok := vertex.(*dag.Cluster); ok {
switch cluster.Upstream.(type) {
case *dag.HTTPService:
name := envoy.Clustername(cluster)
if _, ok := v.clusters[name]; !ok {
c := envoy.Cluster(cluster)
v.clusters[c.Name] = c
}
case *dag.TCPService:
name := envoy.Clustername(cluster)
if _, ok := v.clusters[name]; !ok {
c := envoy.Cluster(cluster)
v.clusters[c.Name] = c
}
default:
// nothing
}
case *dag.TCPService:
name := envoy.Clustername(service)
if _, ok := v.clusters[name]; !ok {
c := envoy.Cluster(service)
v.clusters[c.Name] = c
}
default:
// nothing
}

// recurse into children of v
Expand Down
26 changes: 4 additions & 22 deletions internal/contour/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,22 +116,13 @@ func (v *routeVisitor) visit(vertex dag.Vertex) {
vhost := envoy.VirtualHost(vh.Name, l.Port)
vh.Visit(func(v dag.Vertex) {
if r, ok := v.(*dag.Route); ok {
var svcs []*dag.TCPService
r.Visit(func(s dag.Vertex) {
switch s := s.(type) {
case *dag.HTTPService:
svcs = append(svcs, &s.TCPService)
case *dag.TCPService:
svcs = append(svcs, s)
}
})
if len(svcs) < 1 {
if len(r.Clusters) < 1 {
// no services for this route, skip it.
return
}
rr := route.Route{
Match: envoy.PrefixMatch(r.Prefix),
Action: envoy.RouteRoute(r, svcs),
Action: envoy.RouteRoute(r, r.Clusters),
}

if r.HTTPSUpgrade {
Expand All @@ -149,22 +140,13 @@ func (v *routeVisitor) visit(vertex dag.Vertex) {
vhost := envoy.VirtualHost(vh.VirtualHost.Name, l.Port)
vh.Visit(func(v dag.Vertex) {
if r, ok := v.(*dag.Route); ok {
var svcs []*dag.TCPService
r.Visit(func(s dag.Vertex) {
switch s := s.(type) {
case *dag.HTTPService:
svcs = append(svcs, &s.TCPService)
case *dag.TCPService:
svcs = append(svcs, s)
}
})
if len(svcs) < 1 {
if len(r.Clusters) < 1 {
// no services for this route, skip it.
return
}
vhost.Routes = append(vhost.Routes, route.Route{
Match: envoy.PrefixMatch(r.Prefix),
Action: envoy.RouteRoute(r, svcs),
Action: envoy.RouteRoute(r, r.Clusters),
})
}
})
Expand Down
32 changes: 18 additions & 14 deletions internal/contour/visitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ func TestVisitClusters(t *testing.T) {
VirtualHost: dag.VirtualHost{
Name: "www.example.com",
TCPProxy: &dag.TCPProxy{
Services: []*dag.TCPService{{
Name: "example",
Namespace: "default",
ServicePort: &v1.ServicePort{
Protocol: "TCP",
Port: 443,
TargetPort: intstr.FromInt(8443),
Clusters: []*dag.Cluster{{
Upstream: &dag.TCPService{
Name: "example",
Namespace: "default",
ServicePort: &v1.ServicePort{
Protocol: "TCP",
Port: 443,
TargetPort: intstr.FromInt(8443),
},
},
}},
},
Expand Down Expand Up @@ -85,13 +87,15 @@ func TestVisitClusters(t *testing.T) {

func TestVisitListeners(t *testing.T) {
p1 := &dag.TCPProxy{
Services: []*dag.TCPService{{
Name: "example",
Namespace: "default",
ServicePort: &v1.ServicePort{
Protocol: "TCP",
Port: 443,
TargetPort: intstr.FromInt(8443),
Clusters: []*dag.Cluster{{
Upstream: &dag.TCPService{
Name: "example",
Namespace: "default",
ServicePort: &v1.ServicePort{
Protocol: "TCP",
Port: 443,
TargetPort: intstr.FromInt(8443),
},
},
}},
}
Expand Down
140 changes: 19 additions & 121 deletions internal/dag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,123 +20,22 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"

"github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
ingressroutev1 "github.com/heptio/contour/apis/contour/v1beta1"
)

// A KubernetesCache holds Kubernetes objects and associated configuration and produces
// DAG values.
type KubernetesCache struct {
// IngressRouteRootNamespaces specifies the namespaces where root
// IngressRoutes can be defined. If empty, roots can be defined in any
// namespace.
IngressRouteRootNamespaces []string

mu sync.RWMutex

ingresses map[meta]*v1beta1.Ingress
ingressroutes map[meta]*ingressroutev1.IngressRoute
secrets map[meta]*v1.Secret
delegations map[meta]*ingressroutev1.TLSCertificateDelegation
services map[meta]*v1.Service
}

// meta holds the name and namespace of a Kubernetes object.
type meta struct {
name, namespace string
}

const (
StatusValid = "valid"
StatusInvalid = "invalid"
StatusOrphaned = "orphaned"
)

// Insert inserts obj into the KubernetesCache.
// If an object with a matching type, name, and namespace exists, it will be overwritten.
func (kc *KubernetesCache) Insert(obj interface{}) {
kc.mu.Lock()
defer kc.mu.Unlock()
switch obj := obj.(type) {
case *v1.Secret:
m := meta{name: obj.Name, namespace: obj.Namespace}
if kc.secrets == nil {
kc.secrets = make(map[meta]*v1.Secret)
}
kc.secrets[m] = obj
case *v1.Service:
m := meta{name: obj.Name, namespace: obj.Namespace}
if kc.services == nil {
kc.services = make(map[meta]*v1.Service)
}
kc.services[m] = obj
case *v1beta1.Ingress:
m := meta{name: obj.Name, namespace: obj.Namespace}
if kc.ingresses == nil {
kc.ingresses = make(map[meta]*v1beta1.Ingress)
}
kc.ingresses[m] = obj
case *ingressroutev1.IngressRoute:
m := meta{name: obj.Name, namespace: obj.Namespace}
if kc.ingressroutes == nil {
kc.ingressroutes = make(map[meta]*ingressroutev1.IngressRoute)
}
kc.ingressroutes[m] = obj
case *ingressroutev1.TLSCertificateDelegation:
m := meta{name: obj.Name, namespace: obj.Namespace}
if kc.delegations == nil {
kc.delegations = make(map[meta]*ingressroutev1.TLSCertificateDelegation)
}
kc.delegations[m] = obj

default:
// not an interesting object
}
}

// Remove removes obj from the KubernetesCache.
// If no object with a matching type, name, and namespace exists in the DAG, no action is taken.
func (kc *KubernetesCache) Remove(obj interface{}) {
switch obj := obj.(type) {
default:
kc.remove(obj)
case cache.DeletedFinalStateUnknown:
kc.Remove(obj.Obj) // recurse into ourselves with the tombstoned value
}
}

func (kc *KubernetesCache) remove(obj interface{}) {
kc.mu.Lock()
defer kc.mu.Unlock()
switch obj := obj.(type) {
case *v1.Secret:
m := meta{name: obj.Name, namespace: obj.Namespace}
delete(kc.secrets, m)
case *v1.Service:
m := meta{name: obj.Name, namespace: obj.Namespace}
delete(kc.services, m)
case *v1beta1.Ingress:
m := meta{name: obj.Name, namespace: obj.Namespace}
delete(kc.ingresses, m)
case *ingressroutev1.IngressRoute:
m := meta{name: obj.Name, namespace: obj.Namespace}
delete(kc.ingressroutes, m)
case *ingressroutev1.TLSCertificateDelegation:
m := meta{name: obj.Name, namespace: obj.Namespace}
delete(kc.delegations, m)
default:
// not interesting
}
}

// A Builder builds a *DAGs
type Builder struct {
KubernetesCache
Expand Down Expand Up @@ -175,8 +74,8 @@ type builder struct {
}

// lookupHTTPService returns a HTTPService that matches the meta and port supplied.
func (b *builder) lookupHTTPService(m meta, port intstr.IntOrString, weight int, strategy string, hc *ingressroutev1.HealthCheck) *HTTPService {
s := b.lookupService(m, port, weight, strategy, hc)
func (b *builder) lookupHTTPService(m meta, port intstr.IntOrString, strategy string, hc *ingressroutev1.HealthCheck) *HTTPService {
s := b.lookupService(m, port, strategy, hc)
switch s := s.(type) {
case *HTTPService:
return s
Expand All @@ -188,10 +87,10 @@ func (b *builder) lookupHTTPService(m meta, port intstr.IntOrString, weight int,
for i := range svc.Spec.Ports {
p := &svc.Spec.Ports[i]
if int(p.Port) == port.IntValue() {
return b.addHTTPService(svc, p, weight, strategy, hc)
return b.addHTTPService(svc, p, strategy, hc)
}
if port.String() == p.Name {
return b.addHTTPService(svc, p, weight, strategy, hc)
return b.addHTTPService(svc, p, strategy, hc)
}
}
return nil
Expand All @@ -202,8 +101,8 @@ func (b *builder) lookupHTTPService(m meta, port intstr.IntOrString, weight int,
}

// lookupTCPService returns a TCPService that matches the meta and port supplied.
func (b *builder) lookupTCPService(m meta, port intstr.IntOrString, weight int, strategy string, hc *ingressroutev1.HealthCheck) *TCPService {
s := b.lookupService(m, port, weight, strategy, hc)
func (b *builder) lookupTCPService(m meta, port intstr.IntOrString, strategy string, hc *ingressroutev1.HealthCheck) *TCPService {
s := b.lookupService(m, port, strategy, hc)
switch s := s.(type) {
case *TCPService:
return s
Expand All @@ -215,10 +114,10 @@ func (b *builder) lookupTCPService(m meta, port intstr.IntOrString, weight int,
for i := range svc.Spec.Ports {
p := &svc.Spec.Ports[i]
if int(p.Port) == port.IntValue() {
return b.addTCPService(svc, p, weight, strategy, hc)
return b.addTCPService(svc, p, strategy, hc)
}
if port.String() == p.Name {
return b.addTCPService(svc, p, weight, strategy, hc)
return b.addTCPService(svc, p, strategy, hc)
}
}
return nil
Expand All @@ -227,7 +126,7 @@ func (b *builder) lookupTCPService(m meta, port intstr.IntOrString, weight int,
return nil
}
}
func (b *builder) lookupService(m meta, port intstr.IntOrString, weight int, strategy string, hc *ingressroutev1.HealthCheck) Service {
func (b *builder) lookupService(m meta, port intstr.IntOrString, strategy string, hc *ingressroutev1.HealthCheck) Service {
if port.Type != intstr.Int {
// can't handle, give up
return nil
Expand All @@ -236,7 +135,6 @@ func (b *builder) lookupService(m meta, port intstr.IntOrString, weight int, str
name: m.name,
namespace: m.namespace,
port: int32(port.IntValue()),
weight: weight,
strategy: strategy,
healthcheck: healthcheckToString(hc),
}
Expand All @@ -251,7 +149,7 @@ func healthcheckToString(hc *ingressroutev1.HealthCheck) string {
return fmt.Sprintf("%#v", hc)
}

func (b *builder) addHTTPService(svc *v1.Service, port *v1.ServicePort, weight int, strategy string, hc *ingressroutev1.HealthCheck) *HTTPService {
func (b *builder) addHTTPService(svc *v1.Service, port *v1.ServicePort, strategy string, hc *ingressroutev1.HealthCheck) *HTTPService {
if b.services == nil {
b.services = make(map[servicemeta]Service)
}
Expand All @@ -266,7 +164,6 @@ func (b *builder) addHTTPService(svc *v1.Service, port *v1.ServicePort, weight i
Name: svc.Name,
Namespace: svc.Namespace,
ServicePort: port,
Weight: weight,
LoadBalancerStrategy: strategy,

MaxConnections: parseAnnotation(svc.Annotations, annotationMaxConnections),
Expand All @@ -281,15 +178,14 @@ func (b *builder) addHTTPService(svc *v1.Service, port *v1.ServicePort, weight i
return s
}

func (b *builder) addTCPService(svc *v1.Service, port *v1.ServicePort, weight int, strategy string, hc *ingressroutev1.HealthCheck) *TCPService {
func (b *builder) addTCPService(svc *v1.Service, port *v1.ServicePort, strategy string, hc *ingressroutev1.HealthCheck) *TCPService {
if b.services == nil {
b.services = make(map[servicemeta]Service)
}
s := &TCPService{
Name: svc.Name,
Namespace: svc.Namespace,
ServicePort: port,
Weight: weight,
LoadBalancerStrategy: strategy,

MaxConnections: parseAnnotation(svc.Annotations, annotationMaxConnections),
Expand Down Expand Up @@ -570,8 +466,8 @@ func (b *builder) computeIngresses() {
r := prefixRoute(ing, prefix)
be := httppath.Backend
m := meta{name: be.ServiceName, namespace: ing.Namespace}
if s := b.lookupHTTPService(m, be.ServicePort, 0, "", nil); s != nil {
r.addHTTPService(s)
if s := b.lookupHTTPService(m, be.ServicePort, "", nil); s != nil {
r.addHTTPService(s, 0)
}

// should we create port 80 routes for this ingress
Expand Down Expand Up @@ -765,8 +661,8 @@ func (b *builder) processRoutes(ir *ingressroutev1.IngressRoute, prefixMatch str
return
}
m := meta{name: service.Name, namespace: ir.Namespace}
if s := b.lookupHTTPService(m, intstr.FromInt(service.Port), service.Weight, service.Strategy, service.HealthCheck); s != nil {
r.addHTTPService(s)
if s := b.lookupHTTPService(m, intstr.FromInt(service.Port), service.Strategy, service.HealthCheck); s != nil {
r.addHTTPService(s, service.Weight)
}
}

Expand Down Expand Up @@ -826,12 +722,14 @@ func (b *builder) processTCPProxy(ir *ingressroutev1.IngressRoute, visited []*in
var proxy TCPProxy
for _, service := range tcpproxy.Services {
m := meta{name: service.Name, namespace: ir.Namespace}
s := b.lookupTCPService(m, intstr.FromInt(service.Port), service.Weight, service.Strategy, service.HealthCheck)
s := b.lookupTCPService(m, intstr.FromInt(service.Port), service.Strategy, service.HealthCheck)
if s == nil {
b.setStatus(Status{Object: ir, Status: StatusInvalid, Description: fmt.Sprintf("tcpproxy: service %s/%s/%d: not found", ir.Namespace, service.Name, service.Port), Vhost: host})
return
}
proxy.Services = append(proxy.Services, s)
proxy.Clusters = append(proxy.Clusters, &Cluster{
Upstream: s,
})
}
b.lookupSecureVirtualHost(host).VirtualHost.TCPProxy = &proxy
b.setStatus(Status{Object: ir, Status: StatusValid, Description: "valid IngressRoute", Vhost: host})
Expand Down
Loading

0 comments on commit ccbb6be

Please sign in to comment.