diff --git a/pkg/mgmt/zfsnode/start.go b/pkg/mgmt/zfsnode/start.go index 8508d7404..8fb164bf4 100644 --- a/pkg/mgmt/zfsnode/start.go +++ b/pkg/mgmt/zfsnode/start.go @@ -19,11 +19,13 @@ package zfsnode import ( "context" "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/klog/v2" "os" + "strings" "sync" "time" @@ -66,32 +68,37 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { })) nodeName := os.Getenv("OPENEBS_NODE_NAME") - var searchLabel string - if nodeName == zfs.NodeID { - searchLabel = zfs.ZFSTopoNodenameKey - } else { - searchLabel = zfs.ZFSTopologyKey - } - topologyRequirement, requirementError := labels.NewRequirement(searchLabel, selection.Equals, []string{zfs.NodeID}) - if requirementError != nil { - return errors.Wrapf(requirementError, "Unable to retrieve node by %s for node id %s", zfs.ZFSTopologyKey, zfs.NodeID) - } - topologySelector := labels.NewSelector().Add(*topologyRequirement).String() - klog.Infof("The topology selector is %s", topologySelector) + var k8sNode v1.Node - k8sNodeCandidates, err := kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ - LabelSelector: topologySelector, - }) + if len(strings.TrimSpace(zfs.NodeID)) == 0 || nodeName == zfs.NodeID { + k8sNodeCandidate, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), zfs.NodeID, metav1.GetOptions{}) - if err != nil { - return errors.Wrapf(err, "fetch k8s node %s", zfs.NodeID) - } + if err != nil { + return errors.Wrapf(err, "fetch k8s node %s", zfs.NodeID) + } + + k8sNode = *k8sNodeCandidate - if k8sNodeCandidates == nil || len(k8sNodeCandidates.Items) != 1 { - return fmt.Errorf("unable to retrieve a single node by %s for %s", zfs.ZFSTopologyKey, zfs.NodeID) + } else { + topologyRequirement, requirementError := labels.NewRequirement(zfs.ZFSTopologyKey, selection.Equals, []string{zfs.NodeID}) + if requirementError != nil { + return errors.Wrapf(requirementError, "Unable to generate topology requirement by %s for node id %s", zfs.ZFSTopologyKey, zfs.NodeID) + } + topologySelector := labels.NewSelector().Add(*topologyRequirement).String() + klog.Infof("The topology selector is %s", topologySelector) + + k8sNodeCandidate, err := kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: topologySelector, + }) + if k8sNodeCandidate == nil || len(k8sNodeCandidate.Items) != 1 { + return fmt.Errorf("unable to retrieve a single node by %s for %s", zfs.ZFSTopologyKey, zfs.NodeID) + } + if err != nil { + return errors.Wrapf(err, "error trying to find node with label %s having value %s", zfs.ZFSTopologyKey, zfs.NodeID) + } + k8sNode = k8sNodeCandidate.Items[0] } - k8sNode := k8sNodeCandidates.Items[0] isTrue := true // as object returned by client go clears all TypeMeta from it. nodeGVK := &schema.GroupVersionKind{