Skip to content

Commit

Permalink
Replace options.KubeRouterConfig with config.Node and remove metrics/…
Browse files Browse the repository at this point in the history
…waitgroup stuff

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Feb 3, 2021
1 parent 07256cf commit 65c78cc
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 227 deletions.
7 changes: 6 additions & 1 deletion pkg/agent/netpol/namespace.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/namespace.go

// +build !windows

package netpol

import (
"reflect"

"github.com/golang/glog"
api "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
)

func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
Expand Down
63 changes: 63 additions & 0 deletions pkg/agent/netpol/netpol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// +build !windows

package netpol

import (
"context"

"github.com/rancher/k3s/pkg/agent/netpol/utils"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

// Run creates and starts a new instance of the kube-router network policy controller
// The code in this function is cribbed from the upstream controller at:
// https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/cmd/kube-router.go#L78
// The NewNetworkPolicyController function has also been modified to use the k3s config.Node struct instead of KubeRouter's
// CLI configuration, eliminate use of a WaitGroup for shutdown sequencing, and drop Prometheus metrics support.
func Run(ctx context.Context, nodeConfig *config.Node) error {
set, err := utils.NewIPSet(false)
if err != nil {
logrus.Warnf("Skipping network policy controller start, ipset unavailable: %v", err)
return nil
}

if err := set.Save(); err != nil {
logrus.Warnf("Skipping network policy controller start, ipset save failed: %v", err)
return nil
}

restConfig, err := clientcmd.BuildConfigFromFlags("", nodeConfig.AgentConfig.KubeConfigK3sController)
if err != nil {
return err
}

client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}

stopCh := ctx.Done()
informerFactory := informers.NewSharedInformerFactory(client, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)

npc, err := NewNetworkPolicyController(client, nodeConfig, podInformer, npInformer, nsInformer)
if err != nil {
return err
}

podInformer.AddEventHandler(npc.PodEventHandler)
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)

go npc.Run(stopCh)

return nil
}
59 changes: 0 additions & 59 deletions pkg/agent/netpol/network_policy.go

This file was deleted.

85 changes: 17 additions & 68 deletions pkg/agent/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
// Apache License v2.0 (copyright Cloud Native Labs & Rancher Labs)
// - modified from https://github.com/cloudnativelabs/kube-router/blob/ee9f6d890d10609284098229fa1e283ab5d83b93/pkg/controllers/netpol/network_policy_controller.go

// +build !windows

package netpol

import (
"crypto/sha256"
"encoding/base32"
"fmt"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/pkg/metrics"
"github.com/cloudnativelabs/kube-router/pkg/options"
"github.com/cloudnativelabs/kube-router/pkg/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/rancher/k3s/pkg/agent/netpol/utils"
"github.com/rancher/k3s/pkg/daemons/config"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
glog "k8s.io/klog"
)

const (
Expand All @@ -32,6 +33,7 @@ const (
kubeInputChainName = "KUBE-ROUTER-INPUT"
kubeForwardChainName = "KUBE-ROUTER-FORWARD"
kubeOutputChainName = "KUBE-ROUTER-OUTPUT"
defaultSyncPeriod = 5 * time.Minute
)

// Network policy controller provides both ingress and egress filtering for the pods as per the defined network
Expand All @@ -54,8 +56,6 @@ type NetworkPolicyController struct {
serviceNodePortRange string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}

ipSetHandler *utils.IPSet
Expand Down Expand Up @@ -131,23 +131,19 @@ type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps

// Run runs forever till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
defer wg.Done()

glog.Info("Starting network policy controller")
npc.healthChan = healthChan

// setup kube-router specific top level cutoms chains
npc.ensureTopLevelChains()

// Full syncs of the network policy controller take a lot of time and can only be processed one at a time,
// therefore, we start it in it's own goroutine and request a sync through a single item channel
glog.Info("Starting network policy controller full sync goroutine")
wg.Add(1)
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
go func(fullSyncRequest <-chan struct{}, stopCh <-chan struct{}) {
for {
// Add an additional non-blocking select to ensure that if the stopCh channel is closed it is handled first
select {
Expand All @@ -165,7 +161,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle
npc.fullPolicySync() // fullPolicySync() is a blocking request here
}
}
}(npc.fullSyncRequestChan, stopCh, wg)
}(npc.fullSyncRequestChan, stopCh)

// loop forever till notified to stop on stopCh
for {
Expand Down Expand Up @@ -198,14 +194,10 @@ func (npc *NetworkPolicyController) fullPolicySync() {
npc.mu.Lock()
defer npc.mu.Unlock()

healthcheck.SendHeartBeat(npc.healthChan, "NPC")
start := time.Now()
syncVersion := strconv.FormatInt(start.UnixNano(), 10)
defer func() {
endTime := time.Since(start)
if npc.MetricsEnabled {
metrics.ControllerIptablesSyncTime.Observe(endTime.Seconds())
}
glog.V(1).Infof("sync iptables took %v", endTime)
}()

Expand Down Expand Up @@ -591,7 +583,7 @@ func (npc *NetworkPolicyController) Cleanup() {

// NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
config *config.Node, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}

Expand All @@ -600,54 +592,11 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
// be up to date with all of the policy changes from any enqueued request after that
npc.fullSyncRequestChan = make(chan struct{}, 1)

// Validate and parse ClusterIP service range
_, ipnet, err := net.ParseCIDR(config.ClusterIPCIDR)
if err != nil {
return nil, fmt.Errorf("failed to get parse --service-cluster-ip-range parameter: %s", err.Error())
}
npc.serviceClusterIPRange = *ipnet

// Validate and parse NodePort range
nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]{1}([0-9]+)$`)
if matched := nodePortValidator.MatchString(config.NodePortRange); !matched {
return nil, fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", config.NodePortRange)
}
matches := nodePortValidator.FindStringSubmatch(config.NodePortRange)
if len(matches) != 3 {
return nil, fmt.Errorf("could not parse port number from range given: '%s'", config.NodePortRange)
}
port1, err := strconv.ParseInt(matches[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("could not parse first port number from range given: '%s'", config.NodePortRange)
}
port2, err := strconv.ParseInt(matches[2], 10, 16)
if err != nil {
return nil, fmt.Errorf("could not parse second port number from range given: '%s'", config.NodePortRange)
}
if port1 >= port2 {
return nil, fmt.Errorf("port 1 is greater than or equal to port 2 in range given: '%s'", config.NodePortRange)
}
npc.serviceNodePortRange = fmt.Sprintf("%d:%d", port1, port2)

// Validate and parse ExternalIP service range
for _, externalIPRange := range config.ExternalIPCIDRs {
_, ipnet, err := net.ParseCIDR(externalIPRange)
if err != nil {
return nil, fmt.Errorf("failed to get parse --service-external-ip-range parameter: '%s'. Error: %s", externalIPRange, err.Error())
}
npc.serviceExternalIPRanges = append(npc.serviceExternalIPRanges, *ipnet)
}

if config.MetricsEnabled {
//Register the metrics for this controller
prometheus.MustRegister(metrics.ControllerIptablesSyncTime)
prometheus.MustRegister(metrics.ControllerPolicyChainsSyncTime)
npc.MetricsEnabled = true
}

npc.syncPeriod = config.IPTablesSyncPeriod
npc.serviceClusterIPRange = config.AgentConfig.ServiceCIDR
npc.serviceNodePortRange = strings.ReplaceAll(config.AgentConfig.ServiceNodePortRange.String(), "-", ":")
npc.syncPeriod = defaultSyncPeriod

node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
node, err := utils.GetNodeObject(clientset, config.AgentConfig.NodeName)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 65c78cc

Please sign in to comment.