From 246c4824c479916be55c9e5f4b150d4f38db42af Mon Sep 17 00:00:00 2001
From: weekface
Date: Thu, 25 Apr 2019 17:51:37 +0800
Subject: [PATCH 1/2] * scheduler: a PVC must be bound and its
 pod.spec.nodeName must not be empty
* emit an event when scheduling fails
* nicer error messages

---
 pkg/scheduler/predicates/ha.go      |  43 +++++-
 pkg/scheduler/predicates/ha_test.go | 200 +++++++++++++++++++++++-----
 pkg/scheduler/scheduler.go          |  12 +-
 tests/cmd/stability/main.go         |   2 +-
 4 files changed, 212 insertions(+), 45 deletions(-)

diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go
index b530d35e8a..484e38caff 100644
--- a/pkg/scheduler/predicates/ha.go
+++ b/pkg/scheduler/predicates/ha.go
@@ -14,7 +14,9 @@ package predicates
 
 import (
+	"errors"
 	"fmt"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -27,6 +29,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
+	kubescheme "k8s.io/client-go/kubernetes/scheme"
+	eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/record"
 )
 
 type ha struct {
@@ -34,11 +39,13 @@ type ha struct {
 	kubeCli       kubernetes.Interface
 	cli           versioned.Interface
 	podListFn     func(ns, instanceName, component string) (*apiv1.PodList, error)
+	podGetFn      func(ns, podName string) (*apiv1.Pod, error)
 	pvcGetFn      func(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error)
 	tcGetFn       func(ns, tcName string) (*v1alpha1.TidbCluster, error)
 	pvcListFn     func(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error)
 	updatePVCFn   func(*apiv1.PersistentVolumeClaim) error
 	acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error)
+	recorder      record.EventRecorder
 }
 
 // NewHA returns a Predicate
@@ -48,11 +55,19 @@ func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate {
 		cli:     cli,
 	}
 	h.podListFn = h.realPodListFn
+	h.podGetFn = h.realPodGetFn
 	h.pvcGetFn = h.realPVCGetFn
 	h.tcGetFn = h.realTCGetFn
 	h.pvcListFn = h.realPVCListFn
 	h.updatePVCFn = h.realUpdatePVCFn
 	h.acquireLockFn = h.realAcquireLock
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
+		Interface: eventv1.New(h.kubeCli.CoreV1().RESTClient()).Events("")})
+	h.recorder = eventBroadcaster.NewRecorder(kubescheme.Scheme, apiv1.EventSource{Component: "tidb-scheduler"})
+
 	return h
 }
 
@@ -143,7 +158,9 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
 	}
 
 	if len(minNodeNames) == 0 {
-		return nil, fmt.Errorf("can't find a node from: %v, nodeMap: %v", nodes, nodeMap)
+		msg := fmt.Sprintf("can't scheduled to nodes: %v, because these pods had been scheduled to nodes: %v", GetNodeNames(nodes), nodeMap)
+		h.recorder.Event(pod, apiv1.EventTypeWarning, "FailedScheduling", msg)
+		return nil, errors.New(msg)
 	}
 	return getNodeFromNames(nodes, minNodeNames), nil
 }
@@ -180,7 +197,12 @@ func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *api
 	if schedulingPVC == currentPVC {
 		return schedulingPVC, currentPVC, nil
 	}
-	if schedulingPVC.Status.Phase != apiv1.ClaimBound {
+	schedulingPodName := getPodNameFromPVC(schedulingPVC)
+	schedulingPod, err := h.podGetFn(ns, schedulingPodName)
+	if err != nil {
+		return schedulingPVC, currentPVC, err
+	}
+	if schedulingPVC.Status.Phase != apiv1.ClaimBound || schedulingPod.Spec.NodeName == "" {
 		return schedulingPVC, currentPVC, fmt.Errorf("waiting for Pod %s/%s scheduling", ns,
strings.TrimPrefix(schedulingPVC.GetName(), component)) } @@ -199,6 +221,10 @@ func (h *ha) realPodListFn(ns, instanceName, component string) (*apiv1.PodList, }) } +func (h *ha) realPodGetFn(ns, podName string) (*apiv1.Pod, error) { + return h.kubeCli.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{}) +} + func (h *ha) realPVCListFn(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) { selector := label.New().Instance(instanceName).Component(component).Labels() return h.kubeCli.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{ @@ -242,3 +268,16 @@ func getReplicasFrom(tc *v1alpha1.TidbCluster, component string) int32 { func pvcName(component, podName string) string { return fmt.Sprintf("%s-%s", component, podName) } + +func GetNodeNames(nodes []apiv1.Node) []string { + nodeNames := make([]string, 0) + for _, node := range nodes { + nodeNames = append(nodeNames, node.GetName()) + } + sort.Strings(nodeNames) + return nodeNames +} + +func getPodNameFromPVC(pvc *apiv1.PersistentVolumeClaim) string { + return strings.TrimPrefix(pvc.Name, fmt.Sprintf("%s-", pvc.Labels[label.ComponentLabelKey])) +} diff --git a/pkg/scheduler/predicates/ha_test.go b/pkg/scheduler/predicates/ha_test.go index 0d589dd7c4..26a5b89da4 100644 --- a/pkg/scheduler/predicates/ha_test.go +++ b/pkg/scheduler/predicates/ha_test.go @@ -27,6 +27,7 @@ import ( apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" ) func TestMapAndIntNil(t *testing.T) { @@ -47,6 +48,7 @@ func TestHARealAcquireLockFn(t *testing.T) { type testcase struct { name string podFn func(string, string, int32) *apiv1.Pod + podGetFn func(string, string) (*apiv1.Pod, error) pvcListFn func(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) updatePVCFn func(*apiv1.PersistentVolumeClaim) error expectFn func(*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) @@ -60,6 +62,10 @@ func TestHARealAcquireLockFn(t *testing.T) { ha := ha{ pvcListFn: test.pvcListFn, updatePVCFn: test.updatePVCFn, + podGetFn: podGetScheduled(), + } + if test.podGetFn != nil { + ha.podGetFn = test.podGetFn } pod := test.podFn(instanceName, clusterName, 0) @@ -203,6 +209,39 @@ func TestHARealAcquireLockFn(t *testing.T) { g.Expect(schedulingPVC).To(Equal(currentPVC)) }, }, + { + name: "get scheduling pvc's pod error", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + Annotations: map[string]string{label.AnnPVCPodScheduling: "true"}, + }, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + }, + }, + }, nil + }, + podGetFn: podGetErr(), + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(schedulingPVC.Annotations[label.AnnPVCPodScheduling]).NotTo(BeEmpty()) + g.Expect(strings.Contains(err.Error(), "get pod failed")).To(BeTrue()) + }, 
+ }, { name: "scheduling pvc is not bound", podFn: newHAPDPod, @@ -234,6 +273,42 @@ func TestHARealAcquireLockFn(t *testing.T) { g.Expect(strings.Contains(err.Error(), "waiting for Pod ")).To(BeTrue()) }, }, + { + name: "scheduling pvc is bound, but pod not scheduled", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + Annotations: map[string]string{label.AnnPVCPodScheduling: "true"}, + }, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + }, + }, + }, nil + }, + podGetFn: podGetNotScheduled(), + updatePVCFn: func(claim *corev1.PersistentVolumeClaim) error { + return nil + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(schedulingPVC.Annotations[label.AnnPVCPodScheduling]).NotTo(BeEmpty()) + g.Expect(strings.Contains(err.Error(), "waiting for Pod ")).To(BeTrue()) + }, + }, { name: "scheduling pvc is bound, update pvc failed", podFn: newHAPDPod, @@ -318,10 +393,11 @@ func TestHAFilter(t *testing.T) { podFn func(string, string, int32) *apiv1.Pod nodesFn func() []apiv1.Node podListFn func(string, string, string) (*apiv1.PodList, error) + podGetFn func(string, string) (*apiv1.Pod, error) pvcGetFn func(string, string) (*apiv1.PersistentVolumeClaim, error) tcGetFn func(string, string) (*v1alpha1.TidbCluster, error) acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) - expectFn func([]apiv1.Node, error) + expectFn func([]apiv1.Node, error, record.FakeRecorder) } testFn := func(test *testcase, t *testing.T) { @@ -331,9 +407,17 @@ func TestHAFilter(t *testing.T) { pod := test.podFn(instanceName, clusterName, 0) nodes := test.nodesFn() + recorder := record.NewFakeRecorder(10) - ha := ha{podListFn: test.podListFn, pvcGetFn: test.pvcGetFn, tcGetFn: test.tcGetFn, acquireLockFn: test.acquireLockFn} - test.expectFn(ha.Filter(instanceName, pod, nodes)) + ha := ha{ + podListFn: test.podListFn, + pvcGetFn: test.pvcGetFn, + tcGetFn: test.tcGetFn, + acquireLockFn: test.acquireLockFn, + recorder: recorder, + } + n, err := ha.Filter(instanceName, pod, nodes) + test.expectFn(n, err, *recorder) } tests := []testcase{ @@ -349,7 +433,7 @@ func TestHAFilter(t *testing.T) { }, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -369,7 +453,7 @@ func TestHAFilter(t *testing.T) { acquireLockFn: func(pod *corev1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { return nil, nil, fmt.Errorf("failed to acquire the lock") }, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) 
g.Expect(strings.Contains(err.Error(), "failed to acquire the lock")).To(BeTrue()) }, @@ -382,7 +466,7 @@ func TestHAFilter(t *testing.T) { return nil, fmt.Errorf("get pvc failed") }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "get pvc failed")).To(BeTrue()) }, @@ -393,7 +477,7 @@ func TestHAFilter(t *testing.T) { nodesFn: fakeThreeNodes, podListFn: podListErr(), acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "list pods failed")).To(BeTrue()) }, @@ -407,7 +491,7 @@ func TestHAFilter(t *testing.T) { return nil, fmt.Errorf("get tidbcluster failed") }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "get tidbcluster failed")).To(BeTrue()) }, @@ -419,7 +503,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "kube nodes is empty")).To(BeTrue()) }, @@ -438,7 +522,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetOneReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -458,7 +542,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetOneReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2"})) @@ -478,7 +562,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetTwoReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -498,7 +582,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetTwoReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2"})) @@ -518,7 +602,7 @@ func TestHAFilter(t *testing.T) { acquireLockFn: acquireSuccess, podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) 
{ g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -538,9 +622,13 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {1}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) + events := collectEvents(recorder.Events) + g.Expect(events).To(HaveLen(1)) + g.Expect(events[0]).To(ContainSubstring("FailedScheduling")) + g.Expect(strings.Contains(err.Error(), "can't scheduled to nodes:")).To(BeTrue()) + g.Expect(len(nodes)).To(Equal(0)) }, }, { @@ -550,7 +638,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) @@ -563,9 +651,12 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) + events := collectEvents(recorder.Events) + g.Expect(events).To(HaveLen(1)) + g.Expect(events[0]).To(ContainSubstring("FailedScheduling")) + g.Expect(strings.Contains(err.Error(), "can't scheduled to nodes:")).To(BeTrue()) g.Expect(len(nodes)).To(Equal(0)) }, }, @@ -576,7 +667,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -589,7 +680,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3"})) @@ -602,7 +693,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(nodes[0].Name).To(Equal("kube-node-3")) @@ -615,7 +706,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-4": {4}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { 
g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -632,9 +723,12 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 4 return tc, nil }, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) + events := collectEvents(recorder.Events) + g.Expect(events).To(HaveLen(1)) + g.Expect(events[0]).To(ContainSubstring("FailedScheduling")) + g.Expect(strings.Contains(err.Error(), "can't scheduled to nodes:")).To(BeTrue()) g.Expect(len(nodes)).To(Equal(0)) }, }, @@ -649,7 +743,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -666,7 +760,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 5 return tc, nil }, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3"})) @@ -683,7 +777,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(nodes[0].Name).To(Equal("kube-node-4")) @@ -700,7 +794,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(4)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3", "kube-node-4"})) @@ -717,7 +811,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(4)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3", "kube-node-4"})) @@ -734,7 +828,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3", "kube-node-4"})) @@ -751,7 +845,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error) { + expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) @@ -806,6 +900,34 @@ func podListErr() func(string, string, string) 
(*apiv1.PodList, error) { } } +func podGetErr() func(string, string) (*apiv1.Pod, error) { + return func(ns, podName string) (*apiv1.Pod, error) { + return nil, errors.New("get pod failed") + } +} + +func podGetScheduled() func(string, string) (*apiv1.Pod, error) { + return func(ns, podName string) (*apiv1.Pod, error) { + return &apiv1.Pod{ + TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + Spec: apiv1.PodSpec{ + NodeName: "node-1", + }, + }, nil + } +} + +func podGetNotScheduled() func(string, string) (*apiv1.Pod, error) { + return func(ns, podName string) (*apiv1.Pod, error) { + return &apiv1.Pod{ + TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + Spec: apiv1.PodSpec{ + NodeName: "", + }, + }, nil + } +} + func tcGetFn(ns string, tcName string) (*v1alpha1.TidbCluster, error) { return &v1alpha1.TidbCluster{ TypeMeta: metav1.TypeMeta{Kind: "TidbCluster", APIVersion: "v1alpha1"}, @@ -857,3 +979,17 @@ func getSortedNodeNames(nodes []apiv1.Node) []string { func acquireSuccess(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { return nil, nil, nil } + +func collectEvents(source <-chan string) []string { + done := false + events := make([]string, 0) + for !done { + select { + case event := <-source: + events = append(events, event) + default: + done = true + } + } + return events +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 5044c57c24..347df990f0 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -75,12 +75,12 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E glog.Infof("scheduling pod: %s/%s", ns, podName) var err error for _, predicate := range s.predicates { - glog.V(4).Infof("entering predicate: %s, nodes: %v", predicate.Name(), getNodeNames(kubeNodes)) + glog.Infof("entering predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) kubeNodes, err = predicate.Filter(instanceName, pod, kubeNodes) if err != nil { return nil, err } - glog.V(4).Infof("leaving predicate: %s, nodes: %v", predicate.Name(), getNodeNames(kubeNodes)) + glog.Infof("leaving predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) } return &schedulerapiv1.ExtenderFilterResult{ @@ -105,11 +105,3 @@ func (s *scheduler) Priority(args *schedulerapiv1.ExtenderArgs) (schedulerapiv1. 
 }
 
 var _ Scheduler = &scheduler{}
-
-func getNodeNames(nodes []apiv1.Node) []string {
-	nodeNames := make([]string, 0)
-	for _, node := range nodes {
-		nodeNames = append(nodeNames, node.GetName())
-	}
-	return nodeNames
-}
diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go
index 6e19663433..a3b28660ce 100644
--- a/tests/cmd/stability/main.go
+++ b/tests/cmd/stability/main.go
@@ -37,7 +37,7 @@ func main() {
 	conf := tests.ParseConfigOrDie()
 	cli, kubeCli := client.NewCliOrDie()
 
-	oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollTimeout, conf)
+	oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollInterval, conf)
 	fta := tests.NewFaultTriggerAction(cli, kubeCli, conf)
 	fta.CheckAndRecoverEnvOrDie()
 

From e723b663a383ed24df5f16c90735be1e3c3908ad Mon Sep 17 00:00:00 2001
From: weekface
Date: Fri, 26 Apr 2019 14:57:20 +0800
Subject: [PATCH 2/2] address comment

---
 pkg/scheduler/predicates/ha.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go
index 484e38caff..571f3bcc35 100644
--- a/pkg/scheduler/predicates/ha.go
+++ b/pkg/scheduler/predicates/ha.go
@@ -165,6 +165,9 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
 	return getNodeFromNames(nodes, minNodeNames), nil
 }
 
+// Kubernetes schedules pods in parallel, but to achieve HA we have to make this scheduling serial:
+// while a pod is being scheduled, we set an annotation on its PVC so that no other pod can be scheduled,
+// and we delete the annotation once the pod is scheduled (its PVC is bound and its pod.spec.nodeName is set).
 func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) {
 	ns := pod.GetNamespace()
 	component := pod.Labels[label.ComponentLabelKey]
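
Note for reviewers: the heart of both patches is the release condition for the HA scheduling lock. The pod that currently holds the lock (via the PVC annotation) only counts as scheduled once its PVC is bound and its pod.spec.nodeName is non-empty; until then, other pods of the same component keep waiting. Below is a minimal, self-contained sketch of that condition; podDoneScheduling is a hypothetical helper with simplified inputs, not the PR's realAcquireLock.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// podDoneScheduling is a hypothetical helper (not part of the PR) that mirrors
// the lock-release condition in realAcquireLock: the previously scheduling pod
// only counts as scheduled when its PVC is bound AND it has been assigned a node.
func podDoneScheduling(pvc *corev1.PersistentVolumeClaim, pod *corev1.Pod) bool {
	return pvc.Status.Phase == corev1.ClaimBound && pod.Spec.NodeName != ""
}

func main() {
	pvc := &corev1.PersistentVolumeClaim{
		Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound},
	}
	pod := &corev1.Pod{} // PVC is bound, but the pod has no nodeName yet

	fmt.Println(podDoneScheduling(pvc, pod)) // false: keep the PVC annotation (the lock) and wait
	pod.Spec.NodeName = "kube-node-1"
	fmt.Println(podDoneScheduling(pvc, pod)) // true: the annotation can be deleted, the next pod may schedule
}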