Skip to content

Commit

Permalink
* scheduled: pvc must bound and its pod.spec.nodeName is not empty
Browse files Browse the repository at this point in the history
* emit event when scheduled failed
* more nice error message
  • Loading branch information
weekface committed Apr 25, 2019
1 parent e051465 commit 246c482
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 45 deletions.
43 changes: 41 additions & 2 deletions pkg/scheduler/predicates/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package predicates

import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand All @@ -27,18 +29,23 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
kubescheme "k8s.io/client-go/kubernetes/scheme"
eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)

type ha struct {
lock sync.Mutex
kubeCli kubernetes.Interface
cli versioned.Interface
podListFn func(ns, instanceName, component string) (*apiv1.PodList, error)
podGetFn func(ns, podName string) (*apiv1.Pod, error)
pvcGetFn func(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error)
tcGetFn func(ns, tcName string) (*v1alpha1.TidbCluster, error)
pvcListFn func(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error)
updatePVCFn func(*apiv1.PersistentVolumeClaim) error
acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error)
recorder record.EventRecorder
}

// NewHA returns a Predicate
Expand All @@ -48,11 +55,19 @@ func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate {
cli: cli,
}
h.podListFn = h.realPodListFn
h.podGetFn = h.realPodGetFn
h.pvcGetFn = h.realPVCGetFn
h.tcGetFn = h.realTCGetFn
h.pvcListFn = h.realPVCListFn
h.updatePVCFn = h.realUpdatePVCFn
h.acquireLockFn = h.realAcquireLock

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(h.kubeCli.CoreV1().RESTClient()).Events("")})
h.recorder = eventBroadcaster.NewRecorder(kubescheme.Scheme, apiv1.EventSource{Component: "tidb-scheduler"})

return h
}

Expand Down Expand Up @@ -143,7 +158,9 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
}

if len(minNodeNames) == 0 {
return nil, fmt.Errorf("can't find a node from: %v, nodeMap: %v", nodes, nodeMap)
msg := fmt.Sprintf("can't scheduled to nodes: %v, because these pods had been scheduled to nodes: %v", GetNodeNames(nodes), nodeMap)
h.recorder.Event(pod, apiv1.EventTypeWarning, "FailedScheduling", msg)
return nil, errors.New(msg)
}
return getNodeFromNames(nodes, minNodeNames), nil
}
Expand Down Expand Up @@ -180,7 +197,12 @@ func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *api
if schedulingPVC == currentPVC {
return schedulingPVC, currentPVC, nil
}
if schedulingPVC.Status.Phase != apiv1.ClaimBound {
schedulingPodName := getPodNameFromPVC(schedulingPVC)
schedulingPod, err := h.podGetFn(ns, schedulingPodName)
if err != nil {
return schedulingPVC, currentPVC, err
}
if schedulingPVC.Status.Phase != apiv1.ClaimBound || schedulingPod.Spec.NodeName == "" {
return schedulingPVC, currentPVC, fmt.Errorf("waiting for Pod %s/%s scheduling", ns, strings.TrimPrefix(schedulingPVC.GetName(), component))
}

Expand All @@ -199,6 +221,10 @@ func (h *ha) realPodListFn(ns, instanceName, component string) (*apiv1.PodList,
})
}

func (h *ha) realPodGetFn(ns, podName string) (*apiv1.Pod, error) {
return h.kubeCli.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
}

func (h *ha) realPVCListFn(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) {
selector := label.New().Instance(instanceName).Component(component).Labels()
return h.kubeCli.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{
Expand Down Expand Up @@ -242,3 +268,16 @@ func getReplicasFrom(tc *v1alpha1.TidbCluster, component string) int32 {
func pvcName(component, podName string) string {
return fmt.Sprintf("%s-%s", component, podName)
}

func GetNodeNames(nodes []apiv1.Node) []string {
nodeNames := make([]string, 0)
for _, node := range nodes {
nodeNames = append(nodeNames, node.GetName())
}
sort.Strings(nodeNames)
return nodeNames
}

func getPodNameFromPVC(pvc *apiv1.PersistentVolumeClaim) string {
return strings.TrimPrefix(pvc.Name, fmt.Sprintf("%s-", pvc.Labels[label.ComponentLabelKey]))
}
Loading

0 comments on commit 246c482

Please sign in to comment.