Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rechecking pending Pods (conflict resolved) #375

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/crds/daemonset-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ rules:
- create
- patch
- update
- get
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Expect(err).NotTo(HaveOccurred())
})

It("cannot be reconciled", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).To(BeEmpty())
It("can be reconciled", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).NotTo(BeEmpty())
})
})
})
Expand Down
70 changes: 62 additions & 8 deletions pkg/reconciler/iploop.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,73 @@ func (rl *ReconcileLooper) findOrphanedIPsPerPool(ipPools []storage.IPPool) erro
func (rl ReconcileLooper) isPodAlive(podRef string, ip string) bool {
for livePodRef, livePod := range rl.liveWhereaboutsPods {
if podRef == livePodRef {
livePodIPs := livePod.ips
logging.Debugf(
"pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s",
livePodRef,
ip,
livePodIPs)
_, isFound := livePodIPs[ip]
return isFound || livePod.phase == v1.PodPending
isFound := isIpOnPod(&livePod, podRef, ip)
if !isFound && (livePod.phase == v1.PodPending) {
/* Sometimes pods are still coming up, and may not yet have Multus
* annotation added to it yet. We don't want to check the IPs yet
* so re-fetch the Pod 5x
*/
podToMatch := &livePod
retries := 0

logging.Debugf("Re-fetching Pending Pod: %s IP-to-match: %s", livePodRef, ip)

for retries < storage.PodRefreshRetries {
retries += 1
podToMatch = rl.refreshPod(livePodRef)
if podToMatch == nil {
logging.Debugf("Cleaning up...")
return false
} else if podToMatch.phase != v1.PodPending {
logging.Debugf("Pending Pod is now in phase: %s", podToMatch.phase)
break
} else {
isFound = isIpOnPod(podToMatch, podRef, ip)
// Short-circuit - Pending Pod may have IP now
if isFound {
logging.Debugf("Pod now has IP annotation while in Pending")
return true
}
time.Sleep(time.Duration(500) * time.Millisecond)
}
}
isFound = isIpOnPod(podToMatch, podRef, ip)
}

return isFound
}
}
return false
}

func (rl ReconcileLooper) refreshPod(podRef string) *podWrapper {
namespace, podName := splitPodRef(podRef)
if namespace == "" || podName == "" {
logging.Errorf("Invalid podRef format: %s", podRef)
return nil
}

pod, err := rl.k8sClient.GetPod(namespace, podName)
if err != nil {
logging.Errorf("Failed to refresh Pod %s: %s\n", podRef, err)
return nil
}

wrappedPod := wrapPod(*pod)
logging.Debugf("Got refreshed pod: %v", wrappedPod)
return wrappedPod
}

func splitPodRef(podRef string) (string, string) {
namespacedName := strings.Split(podRef, "/")
if len(namespacedName) != 2 {
logging.Errorf("Failed to split podRef %s", podRef)
return "", ""
}

return namespacedName[0], namespacedName[1]
}

func composePodRef(pod v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/reconciler/wrappedPod.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,14 @@ func networkStatusFromPod(pod v1.Pod) string {
}
return networkStatusAnnotationValue
}

func isIpOnPod(livePod *podWrapper, podRef, ip string) bool {
livePodIPs := livePod.ips
logging.Debugf(
"pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s",
podRef,
ip,
livePodIPs)
_, isFound := livePodIPs[ip]
return isFound
}
9 changes: 9 additions & 0 deletions pkg/storage/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ func (i *Client) ListPods(ctx context.Context) ([]v1.Pod, error) {
return podList.Items, nil
}

func (i *Client) GetPod(namespace, name string) (*v1.Pod, error) {
pod, err := i.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}

return pod, nil
}

func (i *Client) ListOverlappingIPs(ctx context.Context) ([]whereaboutsv1alpha1.OverlappingRangeIPReservation, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, storage.RequestTimeout)
defer cancel()
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var (
RequestTimeout = 10 * time.Second

// DatastoreRetries defines how many retries are attempted when updating the Pool
DatastoreRetries = 100
DatastoreRetries = 100
PodRefreshRetries = 3
)

// IPPool is the interface that represents an manageable pool of allocated IPs
Expand Down