
Commit

improve HA algorithm for TiKV/PD
cofyc committed Jul 25, 2019
1 parent 7e66e18 commit dd012e7
Showing 2 changed files with 101 additions and 19 deletions.
86 changes: 69 additions & 17 deletions pkg/scheduler/predicates/ha.go
@@ -16,6 +16,7 @@ package predicates
import (
"errors"
"fmt"
"math"
"sort"
"strings"
"sync"
@@ -28,6 +29,7 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)
@@ -67,8 +69,14 @@ func (h *ha) Name() string {
return "HighAvailability"
}

// 1. return the node to kube-scheduler if there is only one node and the pod's pvc is bound
// 2. return these nodes that have least pods and its pods count is less than (replicas+1)/2 to kube-scheduler
// 1. return the node to kube-scheduler if there is only one feasible node and the pod's pvc is bound
// 2. if there are two or more feasible nodes, try to distribute TiKV/PD pods across them for the best HA
//    a) for PD (one raft group, the number of data copies equals replicas), fewer than a majority of the replicas may be placed on one node, otherwise a majority of the replicas may be lost when that node is lost.
//       e.g. when replicas is 3, we require no more than 1 pod per node.
//    b) for TiKV (multiple raft groups; the number of data copies in each raft group is hard-coded to 3)
//       when replicas is less than 3, no HA constraint is enforced because HA is impossible
//       when replicas is equal to or greater than 3, we require TiKV pods to run on at least 3 nodes, with no more than ceil(replicas / 3) pods per node
//    for both PD and TiKV, we also try to balance the number of pods across the nodes
// 3. let kube-scheduler make the final decision
func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]apiv1.Node, error) {
h.lock.Lock()
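
For illustration only (this sketch is not part of the diff), the per-node cap described in the comment above could be computed as follows. The function name, signature, and plain-string component labels are assumptions made for this standalone example, not code from this commit:

package main

import (
	"fmt"
	"math"
)

// maxPodsPerNode restates the HA rules from the comment above as a
// hypothetical standalone helper (not the operator's API):
//   - PD: at most floor((replicas+1)/2)-1 pods per node (never below 1), so a
//     single node can never hold a majority of the PD replicas.
//   - TiKV: with replicas < 3 no cap is enforced (HA is impossible anyway);
//     otherwise spread onto at least 3 nodes, then cap at ceil(replicas / 3).
func maxPodsPerNode(component string, replicas int32, scheduledNodes int) int {
	switch component {
	case "pd":
		limit := int((replicas+1)/2) - 1
		if limit <= 0 {
			limit = 1
		}
		return limit
	case "tikv":
		if replicas < 3 {
			return int(replicas) // effectively unlimited
		}
		if scheduledNodes < 3 {
			return 1 // keep spreading until TiKV runs on at least 3 nodes
		}
		return int(math.Ceil(float64(replicas) / 3))
	default:
		return int(replicas) // other components are not HA-constrained
	}
}

func main() {
	for _, r := range []int32{1, 2, 3, 4, 5, 6, 7, 8} {
		fmt.Printf("replicas=%d  pd=%d  tikv(on 3+ nodes)=%d\n",
			r, maxPodsPerNode("pd", r, 3), maxPodsPerNode("tikv", r, 3))
	}
}
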
@@ -79,6 +87,11 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
component := pod.Labels[label.ComponentLabelKey]
tcName := getTCNameFromPod(pod, component)

if component != label.PDLabelVal && component != label.TiKVLabelVal {
glog.V(4).Infof("component %s is ignored in HA predicate", component)
return nodes, nil
}

if len(nodes) == 0 {
return nil, fmt.Errorf("kube nodes is empty")
}
@@ -108,13 +121,17 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
replicas := getReplicasFrom(tc, component)
glog.Infof("ha: tidbcluster %s/%s component %s replicas %d", ns, tcName, component, replicas)

allNodes := make(sets.String)
nodeMap := make(map[string][]string)
for _, node := range nodes {
nodeMap[node.GetName()] = make([]string, 0)
}
for _, pod := range podList.Items {
pName := pod.GetName()
nodeName := pod.Spec.NodeName
if nodeName != "" {
allNodes.Insert(nodeName)
}
if nodeName == "" || nodeMap[nodeName] == nil {
continue
}
@@ -126,30 +143,65 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
min := -1
minNodeNames := make([]string, 0)
for nodeName, podNames := range nodeMap {
// replicas less than 3 cannot achieve high availability
if replicas < 3 {
minNodeNames = append(minNodeNames, nodeName)
glog.Infof("replicas is %d, add node %s to minNodeNames", replicas, nodeName)
continue
}

podsCount := len(podNames)
// When replicas equals 3, pods on each node should not be greater than 1.
maxPodsPerNode := 1
if replicas > 3 {
// When replicas is greater than 3, we allow more than one pods on one node.
maxPodsPerNode = int((replicas + 1) / 2)
maxPodsPerNode := 0

if component == label.PDLabelVal {
/**
* replicas maxPodsPerNode
* ---------------------------
* 1 1
* 2 1
* 3 1
* 4 1
* 5 2
* ...
*/
maxPodsPerNode = int((replicas+1)/2) - 1
if maxPodsPerNode <= 0 {
maxPodsPerNode = 1
}
} else {
// replicas less than 3 cannot achieve high availability
if replicas < 3 {
minNodeNames = append(minNodeNames, nodeName)
glog.Infof("replicas is %d, add node %s to minNodeNames", replicas, nodeName)
continue
}

// 1. TiKV instances must run on at least 3 nodes, otherwise HA is not possible
if allNodes.Len() < 3 {
maxPodsPerNode = 1
} else {
/**
* 2. we require TiKV instances to run on at least 3 nodes, so the max
* allowed number of pods on each node is ceil(replicas / 3)
*
* replicas maxPodsPerNode best HA on three nodes
* ---------------------------------------------------
* 3 1 1, 1, 1
* 4 2 1, 1, 2
* 5 2 1, 2, 2
* 6 2 2, 2, 2
* 7 3 2, 2, 3
* 8 3 2, 3, 3
* ...
*/
maxPodsPerNode = int(math.Ceil(float64(replicas) / 3))
}
}

if podsCount+1 > maxPodsPerNode {
// scheduling one more pod on this node would exceed the limit, skip
glog.Infof("node %s podsCount+1 is %d, max allowed pods is %d, skipping",
nodeName, podsCount+1, maxPodsPerNode)
glog.Infof("node %s has %d instances of component %s, max allowed is %d, skipping",
nodeName, podsCount, component, maxPodsPerNode)
continue
}

// Choose nodes which have the minimum count of the component
if min == -1 {
min = podsCount
}

if podsCount > min {
glog.Infof("node %s podsCount %d > min %d, skipping", nodeName, podsCount, min)
continue
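The diff shows only part of the node-selection loop. As a simplified, hypothetical reconstruction of the balancing step it describes (keep only the nodes that stay under the per-node cap and currently host the fewest pods of the component), a standalone sketch under those assumptions might look like this; the map shape and pod names are illustrative:

package main

import "fmt"

// candidateNodes keeps the nodes whose pod count stays within the per-node
// cap after adding one more pod, and among those returns only the nodes
// that currently host the fewest pods, so the component stays balanced.
// nodeMap maps node name -> names of the component's pods already there.
func candidateNodes(nodeMap map[string][]string, maxPodsPerNode int) []string {
	min := -1
	minNodeNames := make([]string, 0)
	for nodeName, podNames := range nodeMap {
		podsCount := len(podNames)
		if podsCount+1 > maxPodsPerNode {
			continue // one more pod here would break the HA cap
		}
		switch {
		case min == -1 || podsCount < min:
			min = podsCount
			minNodeNames = []string{nodeName}
		case podsCount == min:
			minNodeNames = append(minNodeNames, nodeName)
		}
	}
	return minNodeNames
}

func main() {
	nodeMap := map[string][]string{
		"kube-node-1": {"demo-tikv-0"},
		"kube-node-2": {"demo-tikv-1", "demo-tikv-2"},
		"kube-node-3": {},
	}
	// With a cap of 2 pods per node, kube-node-2 is over the cap and, of the
	// remaining nodes, only the least-loaded one is kept.
	fmt.Println(candidateNodes(nodeMap, 2)) // [kube-node-3]
}
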
34 changes: 32 additions & 2 deletions pkg/scheduler/predicates/ha_test.go
@@ -753,13 +753,13 @@ func TestHAFilter(t *testing.T) {
},
{
name: "three nodes, three pods scheduled on these three nodes, replicas is 4, return all the three nodes",
podFn: newHAPDPod,
podFn: newHATiKVPod,
nodesFn: fakeThreeNodes,
podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}}),
acquireLockFn: acquireSuccess,
tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) {
tc, _ := tcGetFn(ns, tcName)
tc.Spec.PD.Replicas = 4
tc.Spec.TiKV.Replicas = 4
return tc, nil
},
expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) {
@@ -768,6 +768,26 @@ func TestHAFilter(t *testing.T) {
g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"}))
},
},
{
name: "two nodes, 2,2 pods scheduled on these two nodes, replicas is 5, can't schedule",
podFn: newHATiKVPod,
nodesFn: fakeTwoNodes,
podListFn: podListFn(map[string][]int32{"kube-node-1": {2}, "kube-node-2": {2}}),
tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) {
tc, _ := tcGetFn(ns, tcName)
tc.Spec.TiKV.Replicas = 5
return tc, nil
},
acquireLockFn: acquireSuccess,
expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) {
g.Expect(err).To(HaveOccurred())
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("FailedScheduling"))
g.Expect(strings.Contains(err.Error(), "can't schedule to nodes:")).To(BeTrue())
g.Expect(len(nodes)).To(Equal(0))
},
},
{
name: "three nodes, three pods scheduled on these three nodes, replicas is 5, return three nodes",
podFn: newHAPDPod,
@@ -905,6 +925,16 @@ func newHAPDPod(instanceName, clusterName string, ordinal int32) *apiv1.Pod {
}
}

func newHATiKVPod(instanceName, clusterName string, ordinal int32) *apiv1.Pod {
return &apiv1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", controller.TiKVMemberName(clusterName), ordinal),
Namespace: corev1.NamespaceDefault,
Labels: label.New().Instance(instanceName).TiKV().Labels(),
},
}
}
func podListFn(nodePodMap map[string][]int32) func(string, string, string) (*apiv1.PodList, error) {
return func(ns, clusterName, component string) (*apiv1.PodList, error) {
podList := &apiv1.PodList{
