Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Revert removal of support for TCP and UDP services
  • Loading branch information
aledbf committed Nov 16, 2018
1 parent d26543a commit 168f30d
Show file tree
Hide file tree
Showing 16 changed files with 600 additions and 11 deletions.
14 changes: 14 additions & 0 deletions cmd/nginx/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ Takes the form "namespace/name". When used together with update-status, the
controller mirrors the address of this service's endpoints to the load-balancer
status of all Ingress objects it satisfies.`)

tcpConfigMapName = flags.String("tcp-services-configmap", "",
`Name of the ConfigMap containing the definition of the TCP services to expose.
The key in the map indicates the external port to be used. The value is a
reference to a Service in the form "namespace/name:port", where "port" can
either be a port number or name. TCP ports 80 and 443 are reserved by the
controller for servicing HTTP traffic.`)
udpConfigMapName = flags.String("udp-services-configmap", "",
`Name of the ConfigMap containing the definition of the UDP services to expose.
The key in the map indicates the external port to be used. The value is a
reference to a Service in the form "namespace/name:port", where "port" can
either be a port name or number.`)

resyncPeriod = flags.Duration("sync-period", 0,
`Period at which the controller forces the repopulation of its local object stores. Disabled by default.`)

Expand Down Expand Up @@ -217,6 +229,8 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en
DefaultService: *defaultSvc,
Namespace: *watchNamespace,
ConfigMapName: *configMap,
TCPConfigMapName: *tcpConfigMapName,
UDPConfigMapName: *udpConfigMapName,
DefaultSSLCertificate: *defSSLCertificate,
DefaultHealthzURL: *defHealthzURL,
HealthCheckTimeout: *healthCheckTimeout,
Expand Down
17 changes: 17 additions & 0 deletions deploy/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,21 @@ metadata:
app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
name: tcp-services
namespace: ingress-nginx
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
name: udp-services
namespace: ingress-nginx
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/part-of: ingress-nginx
6 changes: 3 additions & 3 deletions deploy/mandatory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
name: ingress-nginx

---

kind: ConfigMap
apiVersion: v1
metadata:
Expand All @@ -15,7 +14,6 @@ metadata:
app.kubernetes.io/part-of: ingress-nginx

---

apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down Expand Up @@ -162,7 +160,6 @@ subjects:
namespace: ingress-nginx

---

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
Expand Down Expand Up @@ -193,6 +190,8 @@ spec:
args:
- /nginx-ingress-controller
- --configmap=$(POD_NAMESPACE)/nginx-configuration
- --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
- --udp-services-configmap=$(POD_NAMESPACE)/udp-services
- --publish-service=$(POD_NAMESPACE)/ingress-nginx
- --annotations-prefix=nginx.ingress.kubernetes.io
securityContext:
Expand Down Expand Up @@ -238,3 +237,4 @@ spec:
timeoutSeconds: 1

---

2 changes: 2 additions & 0 deletions deploy/with-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ spec:
args:
- /nginx-ingress-controller
- --configmap=$(POD_NAMESPACE)/nginx-configuration
- --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
- --udp-services-configmap=$(POD_NAMESPACE)/udp-services
- --publish-service=$(POD_NAMESPACE)/ingress-nginx
- --annotations-prefix=nginx.ingress.kubernetes.io
securityContext:
Expand Down
2 changes: 2 additions & 0 deletions internal/ingress/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,8 @@ type TemplateConfig struct {
Backends []*ingress.Backend
PassthroughBackends []*ingress.SSLPassthroughBackend
Servers []*ingress.Server
TCPBackends []ingress.L4Service
UDPBackends []ingress.L4Service
HealthzURI string
CustomErrors bool
Cfg Configuration
Expand Down
122 changes: 122 additions & 0 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/rand"
"sort"
"strconv"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -60,6 +61,11 @@ type Configuration struct {

ForceNamespaceIsolation bool

// +optional
TCPConfigMapName string
// +optional
UDPConfigMapName string

DefaultHealthzURL string
HealthCheckTimeout time.Duration
DefaultSSLCertificate string
Expand Down Expand Up @@ -151,6 +157,8 @@ func (n *NGINXController) syncIngress(interface{}) error {
pcfg := &ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
}
Expand Down Expand Up @@ -216,6 +224,120 @@ func (n *NGINXController) syncIngress(interface{}) error {
return nil
}

func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
if configmapName == "" {
return []ingress.L4Service{}
}
glog.V(3).Infof("Obtaining information about %v stream services from ConfigMap %q", proto, configmapName)
_, _, err := k8s.ParseNameNS(configmapName)
if err != nil {
glog.Errorf("Error parsing ConfigMap reference %q: %v", configmapName, err)
return []ingress.L4Service{}
}
configmap, err := n.store.GetConfigMap(configmapName)
if err != nil {
glog.Errorf("Error getting ConfigMap %q: %v", configmapName, err)
return []ingress.L4Service{}
}
var svcs []ingress.L4Service
var svcProxyProtocol ingress.ProxyProtocol
rp := []int{
n.cfg.ListenPorts.HTTP,
n.cfg.ListenPorts.HTTPS,
n.cfg.ListenPorts.SSLProxy,
n.cfg.ListenPorts.Status,
n.cfg.ListenPorts.Health,
n.cfg.ListenPorts.Default,
}
reserverdPorts := sets.NewInt(rp...)
// svcRef format: <(str)namespace>/<(str)service>:<(intstr)port>[:<("PROXY")decode>:<("PROXY")encode>]
for port, svcRef := range configmap.Data {
externalPort, err := strconv.Atoi(port)
if err != nil {
glog.Warningf("%q is not a valid %v port number", port, proto)
continue
}
if reserverdPorts.Has(externalPort) {
glog.Warningf("Port %d cannot be used for %v stream services. It is reserved for the Ingress controller.", externalPort, proto)
continue
}
nsSvcPort := strings.Split(svcRef, ":")
if len(nsSvcPort) < 2 {
glog.Warningf("Invalid Service reference %q for %v port %d", svcRef, proto, externalPort)
continue
}
nsName := nsSvcPort[0]
svcPort := nsSvcPort[1]
svcProxyProtocol.Decode = false
svcProxyProtocol.Encode = false
// Proxy Protocol is only compatible with TCP Services
if len(nsSvcPort) >= 3 && proto == apiv1.ProtocolTCP {
if len(nsSvcPort) >= 3 && strings.ToUpper(nsSvcPort[2]) == "PROXY" {
svcProxyProtocol.Decode = true
}
if len(nsSvcPort) == 4 && strings.ToUpper(nsSvcPort[3]) == "PROXY" {
svcProxyProtocol.Encode = true
}
}
svcNs, svcName, err := k8s.ParseNameNS(nsName)
if err != nil {
glog.Warningf("%v", err)
continue
}
svc, err := n.store.GetService(nsName)
if err != nil {
glog.Warningf("Error getting Service %q: %v", nsName, err)
continue
}
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
// not a port number, fall back to using port name
glog.V(3).Infof("Searching Endpoints with %v port name %q for Service %q", proto, svcPort, nsName)
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints)
break
}
}
}
} else {
glog.V(3).Infof("Searching Endpoints with %v port number %d for Service %q", proto, targetPort, nsName)
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints)
break
}
}
}
}
// stream services cannot contain empty upstreams and there is
// no default backend equivalent
if len(endps) == 0 {
glog.Warningf("Service %q does not have any active Endpoint for %v port %v", nsName, proto, svcPort)
continue
}
svcs = append(svcs, ingress.L4Service{
Port: externalPort,
Backend: ingress.L4Backend{
Name: svcName,
Namespace: svcNs,
Port: intstr.FromString(svcPort),
Protocol: proto,
ProxyProtocol: svcProxyProtocol,
},
Endpoints: endps,
})
}
// Keep upstream order sorted to reduce unnecessary nginx config reloads.
sort.SliceStable(svcs, func(i, j int) bool {
return svcs[i].Port < svcs[j].Port
})
return svcs
}

// getDefaultUpstream returns the upstream associated with the default backend.
// Configures the upstream to return HTTP code 503 in case of error.
func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
Expand Down
42 changes: 41 additions & 1 deletion internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -112,6 +113,8 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
config.EnableSSLChainCompletion,
config.Namespace,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,
config.DefaultSSLCertificate,
config.ResyncPeriod,
config.Client,
Expand Down Expand Up @@ -578,6 +581,8 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
Backends: ingressCfg.Backends,
PassthroughBackends: ingressCfg.PassthroughBackends,
Servers: ingressCfg.Servers,
TCPBackends: ingressCfg.TCPEndpoints,
UDPBackends: ingressCfg.UDPEndpoints,
HealthzURI: ngxHealthPath,
CustomErrors: len(cfg.CustomHTTPErrors) > 0,
Cfg: cfg,
Expand Down Expand Up @@ -789,6 +794,42 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
return err
}

streamSocket := "/tmp/ingress-stream.sock"
conn, err := net.Dial("unix", streamSocket)
if err != nil {
return err
}
defer conn.Close()

var streams []*ingress.Backend
for _, ep := range pcfg.TCPEndpoints {
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, &ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
})
}
for _, ep := range pcfg.UDPEndpoints {
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, &ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
})
}

buf, err := json.Marshal(streams)
if err != nil {
return err
}

_, err = conn.Write(buf)
if err != nil {
return err
}
fmt.Fprintf(conn, "\r\n")

if isDynamicCertificatesEnabled {
err = configureCertificates(pcfg, port)
if err != nil {
Expand Down Expand Up @@ -824,7 +865,6 @@ func post(url string, data interface{}) error {
}

glog.V(2).Infof("Posting to %s", url)

resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions internal/ingress/controller/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ type k8sStore struct {

// New creates a new object store to be used in the ingress controller
func New(checkOCSP bool,
namespace, configmap, defaultSSLCertificate string,
namespace, configmap, tcp, udp, defaultSSLCertificate string,
resyncPeriod time.Duration,
client clientset.Interface,
fs file.Filesystem,
Expand Down Expand Up @@ -473,7 +473,7 @@ func New(checkOCSP bool,
cm := obj.(*corev1.ConfigMap)
key := k8s.MetaNamespaceKey(cm)
// updates to configuration configmaps can trigger an update
if key == configmap {
if key == configmap || key == tcp || key == udp {
recorder.Eventf(cm, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("ConfigMap %v", key))
if key == configmap {
store.setConfig(cm)
Expand All @@ -489,7 +489,7 @@ func New(checkOCSP bool,
cm := cur.(*corev1.ConfigMap)
key := k8s.MetaNamespaceKey(cm)
// updates to configuration configmaps can trigger an update
if key == configmap {
if key == configmap || key == tcp || key == udp {
recorder.Eventf(cm, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", key))
if key == configmap {
store.setConfig(cm)
Expand Down
Loading

0 comments on commit 168f30d

Please sign in to comment.