Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

respect PodCIDR of node #78

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func NewController(conf Config) (*Controller, error) {
LockPodParallelism: 16,
DeletePodParallelism: 16,
NodeHasFunc: nodes.Has, // just handle pods that are on nodes we have
NodeInfoGetFunc: nodes.Get, // get node info from the node controller
Logger: conf.Logger,
FuncMap: funcMap,
})
Expand Down
12 changes: 8 additions & 4 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type NodeController struct {
manageNodesWithLabelSelector string
nodeSelectorFunc func(node *corev1.Node) bool
lockPodsOnNodeFunc func(ctx context.Context, nodeName string) error
nodesSets *stringSets
nodesSets *nodeSets
nodeHeartbeatTemplate string
nodeStatusTemplate string
renderer *renderer
Expand Down Expand Up @@ -102,7 +102,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
manageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
lockPodsOnNodeFunc: conf.LockPodsOnNodeFunc,
nodeIP: conf.NodeIP,
nodesSets: newStringSets(),
nodesSets: newNodeSets(),
logger: log,
nodeHeartbeatTemplate: conf.NodeHeartbeatTemplate,
nodeStatusTemplate: conf.NodeStatusTemplate + "\n" + conf.NodeHeartbeatTemplate,
Expand Down Expand Up @@ -254,7 +254,7 @@ func (c *NodeController) WatchNodes(ctx context.Context, ch chan<- string, opt m
case watch.Added, watch.Modified:
node := event.Object.(*corev1.Node)
if c.needHeartbeat(node) {
c.nodesSets.Put(node.Name)
c.nodesSets.Put(node.Name, node.DeepCopy())
if c.needLockNode(node) {
ch <- node.Name
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *NodeController) ListNodes(ctx context.Context, ch chan<- string, opt me
return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error {
node := obj.(*corev1.Node)
if c.needHeartbeat(node) {
c.nodesSets.Put(node.Name)
c.nodesSets.Put(node.Name, node.DeepCopy())
if c.needLockNode(node) {
ch <- node.Name
}
Expand Down Expand Up @@ -393,6 +393,10 @@ func (c *NodeController) Has(nodeName string) bool {
return c.nodesSets.Has(nodeName)
}

func (c *NodeController) Get(nodeName string) *nodeInfo {
return c.nodesSets.Get(nodeName)
}

func (c *NodeController) Size() int {
return c.nodesSets.Size()
}
56 changes: 46 additions & 10 deletions pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type PodController struct {
nodeIP string
cidrIPNet *net.IPNet
nodeHasFunc func(nodeName string) bool
nodeInfoGetFunc func(nodeName string) *nodeInfo
ipPool *ipPool
podStatusTemplate string
logger logger.Logger
Expand All @@ -72,6 +73,7 @@ type PodControllerConfig struct {
NodeIP string
CIDR string
NodeHasFunc func(nodeName string) bool
NodeInfoGetFunc func(nodeName string) *nodeInfo
PodStatusTemplate string
Logger logger.Logger
LockPodParallelism int
Expand Down Expand Up @@ -109,6 +111,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
cidrIPNet: cidrIPNet,
ipPool: newIPPool(cidrIPNet),
nodeHasFunc: conf.NodeHasFunc,
nodeInfoGetFunc: conf.NodeInfoGetFunc,
logger: log,
podStatusTemplate: conf.PodStatusTemplate,
lockPodChan: make(chan *corev1.Pod),
Expand All @@ -120,7 +123,11 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
"NodeIP": func() string {
return n.nodeIP
},
"PodIP": func() string {
"PodIP": func(nodeName string) string {
nodeInfo := n.nodeInfoGetFunc(nodeName)
if nodeInfo != nil && nodeInfo.IPPool != nil {
return nodeInfo.IPPool.Get()
}
return n.ipPool.Get()
},
}
Expand Down Expand Up @@ -305,12 +312,10 @@ func (c *PodController) WatchPods(ctx context.Context, lockChan, deleteChan chan
}
case watch.Deleted:
pod := event.Object.(*corev1.Pod)
if c.nodeHasFunc(pod.Spec.NodeName) {
// Recycling PodIP
if pod.Status.PodIP != "" && c.cidrIPNet.Contains(net.ParseIP(pod.Status.PodIP)) {
c.ipPool.Put(pod.Status.PodIP)
}
}
if c.nodeHasFunc(pod.Spec.NodeName) {
// Recycling PodIP
c.reclaimPodIP(pod)
carlory marked this conversation as resolved.
Show resolved Hide resolved
}
}
case <-ctx.Done():
watcher.Stop()
Expand Down Expand Up @@ -347,9 +352,7 @@ func (c *PodController) LockPodsOnNode(ctx context.Context, nodeName string) err
func (c *PodController) configurePod(pod *corev1.Pod) ([]byte, error) {

// Mark the pod IP that existed before the kubelet was started
if c.cidrIPNet.Contains(net.ParseIP(pod.Status.PodIP)) {
c.ipPool.Use(pod.Status.PodIP)
}
c.markPodIP(pod)

patch, err := c.computePatchData(pod, c.podStatusTemplate)
if err != nil {
Expand Down Expand Up @@ -400,3 +403,36 @@ func (c *PodController) computePatchData(pod *corev1.Pod, temp string) ([]byte,

return patch, nil
}
func (c *PodController) markPodIP(pod *corev1.Pod) {
if c.cidrIPNet.Contains(net.ParseIP(pod.Status.PodIP)) {
c.ipPool.Use(pod.Status.PodIP)
}

nodeInfo := c.nodeInfoGetFunc(pod.Spec.NodeName)
if nodeInfo == nil || nodeInfo.CidrIPNet == nil || nodeInfo.IPPool == nil {
return
}

if nodeInfo.CidrIPNet.Contains(net.ParseIP(pod.Status.PodIP)) {
nodeInfo.IPPool.Use(pod.Status.PodIP)
}
}

func (c *PodController) reclaimPodIP(pod *corev1.Pod) {
nodeInfo := c.nodeInfoGetFunc(pod.Spec.NodeName)
if nodeInfo == nil || pod.Status.PodIP == "" {
return
}

if c.cidrIPNet.Contains(net.ParseIP(pod.Status.PodIP)) {
c.ipPool.Put(pod.Status.PodIP)
}

if nodeInfo.CidrIPNet == nil || nodeInfo.IPPool == nil {
return
}

if nodeInfo.CidrIPNet.Contains(net.ParseIP(pod.Status.PodIP)) {
nodeInfo.IPPool.Put(pod.Status.PodIP)
}
}
130 changes: 130 additions & 0 deletions pkg/kwok/controllers/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func TestPodController(t *testing.T) {
nodeHasFunc := func(nodeName string) bool {
return strings.HasPrefix(nodeName, "node")
}

nodeInfoGetFunc := func(nodeName string) *nodeInfo {
if nodeHasFunc(nodeName) {
return &nodeInfo{}
}
return nil
}

annotationSelector, _ := labels.Parse("fake=custom")
pods, err := NewPodController(PodControllerConfig{
ClientSet: clientset,
Expand All @@ -78,6 +86,7 @@ func TestPodController(t *testing.T) {
DisregardStatusWithAnnotationSelector: annotationSelector.String(),
PodStatusTemplate: templates.DefaultPodStatusTemplate,
NodeHasFunc: nodeHasFunc,
NodeInfoGetFunc: nodeInfoGetFunc,
FuncMap: funcMap,
LockPodParallelism: 2,
DeletePodParallelism: 2,
Expand Down Expand Up @@ -174,3 +183,124 @@ func TestPodController(t *testing.T) {
}
}
}

func TestPodControllerIPPool(t *testing.T) {
clientset := fake.NewSimpleClientset()

nodeHasFunc := func(nodeName string) bool {
return strings.HasPrefix(nodeName, "node")
}

node1PodCIDR := "10.0.1.1/24"
node1PodNet, _ := parseCIDR(node1PodCIDR)
node1Info := &nodeInfo{
CidrIPNet: node1PodNet,
IPPool: newIPPool(node1PodNet),
}
nodeInfoGetFunc := func(nodeName string) *nodeInfo {
if nodeName == "node0" {
return &nodeInfo{}
}
return node1Info
}

podCIDR := "10.0.0.1/24"
pods, err := NewPodController(PodControllerConfig{
ClientSet: clientset,
NodeIP: "10.0.0.1",
CIDR: podCIDR,
PodStatusTemplate: templates.DefaultPodStatusTemplate,
NodeHasFunc: nodeHasFunc,
NodeInfoGetFunc: nodeInfoGetFunc,
FuncMap: funcMap,
LockPodParallelism: 2,
DeletePodParallelism: 2,
Logger: testingLogger{t},
})
if err != nil {
t.Fatal(fmt.Errorf("new pods controller error: %w", err))
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
t.Cleanup(func() {
cancel()
time.Sleep(time.Second)
})

err = pods.Start(ctx)
if err != nil {
t.Fatal(fmt.Errorf("start pods controller error: %w", err))
}

var genPod = func(podName, nodeName string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: "default",
CreationTimestamp: metav1.Now(),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
NodeName: nodeName,
},
}
}

clientset.CoreV1().Pods("default").Create(ctx, genPod("pod0", "node0"), metav1.CreateOptions{})

// sleep 2 seconds to wait for pod0 to be assigned an IP
time.Sleep(2 * time.Second)

pod0, err := clientset.CoreV1().Pods("default").Get(ctx, "pod0", metav1.GetOptions{})
if err != nil {
t.Fatal(fmt.Errorf("get pod0 error: %w", err))
}

// check if pod0 ip is in default ip cidr
pod0IP := pod0.Status.PodIP
if pod0IP == "" {
t.Fatal(fmt.Errorf("want pod %s to be assign an IP, but got nothing", pod0.Name))
}
if !pods.ipPool.InUsed(pod0IP) {
t.Fatal(fmt.Errorf("want pod %s ip in %s, but got %s", pod0.Name, podCIDR, pod0IP))
}

clientset.CoreV1().Pods("default").Create(ctx, genPod("pod1", "node1"), metav1.CreateOptions{})

// sleep 2 seconds to wait for pod0 to be assigned an IP
time.Sleep(2 * time.Second)

pod1, err := clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
if err != nil {
t.Fatal(fmt.Errorf("get pod1 error: %w", err))
}

// check if pod1 ip is in node pod cidr
pod1IP := pod1.Status.PodIP
if pod1IP == "" {
t.Fatal(fmt.Errorf("want pod %s to be assign an IP, but got nothing", pod1.Name))
}
if !node1Info.IPPool.InUsed(pod1IP) {
t.Fatal(fmt.Errorf("want pod %s ip in %s, but got %s", pod1.Name, node1PodCIDR, pod1IP))
}

clientset.CoreV1().Pods("default").Delete(ctx, "pod0", metav1.DeleteOptions{})
// sleep 2 seconds to wait for pod0 to be deleted
time.Sleep(2 * time.Second)
if pods.ipPool.InUsed(pod0IP) {
t.Fatal(fmt.Errorf("want pod0 ip to be reclaimed, but got %s in use", pod0IP))
}

clientset.CoreV1().Pods("default").Delete(ctx, "pod1", metav1.DeleteOptions{})
// sleep 2 seconds to wait for pod1 to be deleted
time.Sleep(2 * time.Second)
if node1Info.IPPool.InUsed(pod1IP) {
t.Fatal(fmt.Errorf("want pod1 ip to be reclaimed, but got %s in use", pod1IP))
}
}
3 changes: 2 additions & 1 deletion pkg/kwok/controllers/templates/pod.status.tpl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{{ $startTime := .metadata.creationTimestamp }}
{{ $nodeName := .spec.nodeName }}

conditions:
- lastTransitionTime: {{ $startTime }}
Expand Down Expand Up @@ -46,7 +47,7 @@ initContainerStatuses:

{{ with .status }}
hostIP: {{ with .hostIP }} {{ . }} {{ else }} {{ NodeIP }} {{ end }}
podIP: {{ with .podIP }} {{ . }} {{ else }} {{ PodIP }} {{ end }}
podIP: {{ with .podIP }} {{ . }} {{ else }} {{ PodIP $nodeName }} {{ end }}
{{ end }}

phase: Running
Expand Down
Loading