Skip to content

Commit

Permalink
Merge pull request #309 from andreaskaris/nodename-from-downward-api
Browse files Browse the repository at this point in the history
Use downward API to pass current spec.nodeName to pod
  • Loading branch information
dougbtv authored Mar 21, 2023
2 parents 87bba87 + ef409bb commit d2dbf8a
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 25 deletions.
16 changes: 4 additions & 12 deletions cmd/controlloop/controlloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (

gocron "github.com/go-co-op/gocron"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
v1coreinformerfactory "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -132,15 +129,10 @@ func newPodController(stopChannel chan struct{}) (*controlloop.PodController, er
const noResyncPeriod = 0
ipPoolInformerFactory := wbinformers.NewSharedInformerFactory(wbClientSet, noResyncPeriod)
netAttachDefInformerFactory := nadinformers.NewSharedInformerFactory(nadK8sClientSet, noResyncPeriod)
podInformerFactory := v1coreinformerfactory.NewSharedInformerFactoryWithOptions(
k8sClientSet, noResyncPeriod, v1coreinformerfactory.WithTweakListOptions(
func(options *v1.ListOptions) {
const (
filterKey = "spec.nodeName"
hostnameEnvVariable = "HOSTNAME"
)
options.FieldSelector = fields.OneTermEqualSelector(filterKey, os.Getenv(hostnameEnvVariable)).String()
}))
podInformerFactory, err := controlloop.PodInformerFactory(k8sClientSet)
if err != nil {
return nil, err
}

controller := controlloop.NewPodController(
k8sClientSet,
Expand Down
10 changes: 10 additions & 0 deletions doc/crds/daemonset-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ rules:
verbs:
- list
- watch
- apiGroups: [""]
resources:
- nodes
verbs:
- get
- apiGroups: ["k8s.cni.cncf.io"]
resources:
- network-attachment-definitions
Expand Down Expand Up @@ -103,6 +108,11 @@ spec:
/ip-control-loop -log-level debug
image: ghcr.io/k8snetworkplumbingwg/whereabouts:latest-amd64
env:
- name: NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
- name: WHEREABOUTS_NAMESPACE
valueFrom:
fieldRef:
Expand Down
9 changes: 6 additions & 3 deletions pkg/controlloop/dummy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package controlloop

import (
"context"
kubeClient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"
"net"

kubeClient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1coreinformerfactory "k8s.io/client-go/informers"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -41,7 +41,10 @@ func newDummyPodController(
const noResyncPeriod = 0
netAttachDefInformerFactory := nadinformers.NewSharedInformerFactory(nadClient, noResyncPeriod)
wbInformerFactory := wbinformers.NewSharedInformerFactory(wbClient, noResyncPeriod)
podInformerFactory := v1coreinformerfactory.NewSharedInformerFactory(k8sClient, noResyncPeriod)
podInformerFactory, err := PodInformerFactory(k8sClient)
if err != nil {
return nil, err
}

podController := newPodController(
k8sClient,
Expand Down
13 changes: 12 additions & 1 deletion pkg/controlloop/entity_generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,24 @@ func dummyNonWhereaboutsIPAMNetSpec(networkName string) string {
}`, networkName)
}

func podSpec(name string, namespace string, networks ...string) *v1.Pod {
func nodeSpec(name string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
}

func podSpec(name string, namespace string, nodeName string, networks ...string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: podNetworkSelectionElements(networks...),
},
Spec: v1.PodSpec{
NodeName: nodeName,
},
}
}

Expand Down
28 changes: 27 additions & 1 deletion pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes"
"net"
"os"
"strconv"
"strings"
"time"

"k8s.io/client-go/kubernetes"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
v1coreinformerfactory "k8s.io/client-go/informers"
v1corelisters "k8s.io/client-go/listers/core/v1"
Expand All @@ -22,6 +25,7 @@ import (
nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
nadinformers "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
nadlister "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
"github.com/pkg/errors"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/allocate"
whereaboutsv1alpha1 "github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1"
Expand All @@ -47,6 +51,12 @@ const (
addressGarbageCollectionFailed = "IPAddressGarbageCollectionFailed"
)

const (
podControllerFilterKey = "spec.nodeName"
podControllerNodeNameEnvVariable = "NODENAME"
noResyncPeriod = 0
)

type garbageCollector func(ctx context.Context, mode int, ipamConf types.IPAMConfig, client *wbclient.KubernetesIPAM) ([]net.IPNet, error)

type PodController struct {
Expand All @@ -73,6 +83,22 @@ func NewPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.I
return newPodController(k8sCoreClient, wbClient, k8sCoreInformerFactory, wbSharedInformerFactory, netAttachDefInformerFactory, broadcaster, recorder, wbclient.IPManagement)
}

// PodInformerFactory is a wrapper around NewSharedInformerFactoryWithOptions. Before returning the informer, it will
// extract the node name from environment variable "NODENAME". It will then try to look up the node with the given name.
// On success, it will create an informer that filters all pods with spec.nodeName == <value of env NODENAME>.
func PodInformerFactory(k8sClientSet kubernetes.Interface) (v1coreinformerfactory.SharedInformerFactory, error) {
nodeName := os.Getenv(podControllerNodeNameEnvVariable)
logging.Debugf("Filtering pods with filter key '%s' and filter value '%s'", podControllerFilterKey, nodeName)
if _, err := k8sClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("Could not find node with node name '%s'.", nodeName))
}
return v1coreinformerfactory.NewSharedInformerFactoryWithOptions(
k8sClientSet, noResyncPeriod, v1coreinformerfactory.WithTweakListOptions(
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector(podControllerFilterKey, nodeName).String()
})), nil
}

func newPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.Interface, k8sCoreInformerFactory v1coreinformerfactory.SharedInformerFactory, wbSharedInformerFactory wbinformers.SharedInformerFactory, netAttachDefInformerFactory nadinformers.SharedInformerFactory, broadcaster record.EventBroadcaster, recorder record.EventRecorder, cleanupFunc garbageCollector) *PodController {
k8sPodFilteredInformer := k8sCoreInformerFactory.Core().V1().Pods()
ipPoolInformer := wbSharedInformerFactory.Whereabouts().V1alpha1().IPPools()
Expand Down
70 changes: 62 additions & 8 deletions pkg/controlloop/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
. "github.com/onsi/gomega"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sclient "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -56,20 +57,59 @@ var _ = Describe("IPControlLoop", func() {
Expect(os.RemoveAll(cniConfigDir)).To(Succeed())
})

Context("a running pod", func() {
Context("a running pod on a node", func() {
const (
networkName = "meganet"
podName = "tiny-winy-pod"
nodeName = "hypernode"
)

var (
k8sClient k8sclient.Interface
pod *v1.Pod
k8sClient k8sclient.Interface
pod *v1.Pod
node *v1.Node
dummyPodController *dummyPodController
podControllerError error
)

BeforeEach(func() {
pod = podSpec(podName, namespace, networkName)
k8sClient = fakek8sclient.NewSimpleClientset(pod)
pod = podSpec(podName, namespace, nodeName, networkName)
node = nodeSpec(nodeName)
k8sClient = fakek8sclient.NewSimpleClientset(pod, node)
os.Setenv("NODENAME", nodeName)
})

When("NODENAME is set to an invalid value", func() {
var (
wbClient wbclient.Interface
eventRecorder *record.FakeRecorder
netAttachDefClient nadclient.Interface
stopChannel chan struct{}
)

BeforeEach(func() {
os.Setenv("NODENAME", "invalid-node-name")

stopChannel = make(chan struct{})
wbClient = fakewbclient.NewSimpleClientset()
netAttachDefClient, podControllerError = newFakeNetAttachDefClient(namespace, netAttachDef(networkName, namespace, dummyNonWhereaboutsIPAMNetSpec(networkName)))
Expect(podControllerError).NotTo(HaveOccurred())

const maxEvents = 1
eventRecorder = record.NewFakeRecorder(maxEvents)
})

It("should fail", func() {
_, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(errors.IsNotFound(podControllerError)).Should(BeTrue())
})

AfterEach(func() {
if podControllerError != nil {
return
}
stopChannel <- struct{}{}
})
})

Context("IPPool featuring an allocation for the pod", func() {
Expand Down Expand Up @@ -98,7 +138,10 @@ var _ = Describe("IPControlLoop", func() {
const maxEvents = 10
stopChannel = make(chan struct{})
eventRecorder = record.NewFakeRecorder(maxEvents)
Expect(newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)).NotTo(BeNil())

dummyPodController, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(podControllerError).NotTo(HaveOccurred())
Expect(dummyPodController).NotTo(BeNil())

// assure the pool features an allocated address
ipPool, err := wbClient.WhereaboutsV1alpha1().IPPools(dummyNetworkPool.GetNamespace()).Get(context.TODO(), dummyNetworkPool.GetName(), metav1.GetOptions{})
Expand All @@ -107,6 +150,9 @@ var _ = Describe("IPControlLoop", func() {
})

AfterEach(func() {
if podControllerError != nil {
return
}
stopChannel <- struct{}{}
})

Expand Down Expand Up @@ -145,7 +191,9 @@ var _ = Describe("IPControlLoop", func() {

const maxEvents = 10
eventRecorder = record.NewFakeRecorder(maxEvents)
Expect(newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)).NotTo(BeNil())
dummyPodController, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(podControllerError).NotTo(HaveOccurred())
Expect(dummyPodController).NotTo(BeNil())

// assure the pool features an allocated address
ipPool, err := wbClient.WhereaboutsV1alpha1().IPPools(dummyNetworkPool.GetNamespace()).Get(context.TODO(), dummyNetworkPool.GetName(), metav1.GetOptions{})
Expand Down Expand Up @@ -196,10 +244,16 @@ var _ = Describe("IPControlLoop", func() {

const maxEvents = 1
eventRecorder = record.NewFakeRecorder(maxEvents)
Expect(newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)).NotTo(BeNil())

dummyPodController, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(podControllerError).NotTo(HaveOccurred())
Expect(dummyPodController).NotTo(BeNil())
})

AfterEach(func() {
if podControllerError != nil {
return
}
stopChannel <- struct{}{}
})

Expand Down

0 comments on commit d2dbf8a

Please sign in to comment.