From 96a28e1029cbb65d0be88465262d7ab6a0b2d4ad Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Thu, 22 Apr 2021 16:39:51 -0400 Subject: [PATCH 1/6] Even spread for HA implementation Signed-off-by: Ansu Varghese --- .../source/multi/500-controller-service.yaml | 1 - config/source/multi/deployments/adapter.yaml | 15 ++ .../source/multi/deployments/controller.yaml | 6 +- config/source/multi/roles/clusterrole.yaml | 1 + pkg/apis/duck/v1alpha1/placement_types.go | 3 + .../scheduler/statefulset/autoscaler.go | 6 +- .../scheduler/statefulset/autoscaler_test.go | 5 +- pkg/common/scheduler/statefulset/scheduler.go | 223 +++++++++++++++++- .../scheduler/statefulset/scheduler_test.go | 194 ++++++++++++++- pkg/common/scheduler/statefulset/state.go | 59 ++++- .../scheduler/statefulset/state_test.go | 5 +- pkg/source/reconciler/mtsource/controller.go | 7 +- 12 files changed, 489 insertions(+), 36 deletions(-) delete mode 120000 config/source/multi/500-controller-service.yaml diff --git a/config/source/multi/500-controller-service.yaml b/config/source/multi/500-controller-service.yaml deleted file mode 120000 index 462d68f198..0000000000 --- a/config/source/multi/500-controller-service.yaml +++ /dev/null @@ -1 +0,0 @@ -deployments/controller-service.yaml \ No newline at end of file diff --git a/config/source/multi/deployments/adapter.yaml b/config/source/multi/deployments/adapter.yaml index 9cebb7c894..7663c598ac 100644 --- a/config/source/multi/deployments/adapter.yaml +++ b/config/source/multi/deployments/adapter.yaml @@ -80,3 +80,18 @@ spec: containerPort: 8008 terminationGracePeriodSeconds: 10 + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchLabels: + control-plane: kafkasource-mt-adapter + topologyKey: kubernetes.io/hostname + weight: 50 + - podAffinityTerm: + labelSelector: + matchLabels: + control-plane: kafkasource-mt-adapter + topologyKey: topology.kubernetes.io/zone + weight: 50 diff --git a/config/source/multi/deployments/controller.yaml b/config/source/multi/deployments/controller.yaml index f3c6f6f06b..8f35ac82e0 100644 --- a/config/source/multi/deployments/controller.yaml +++ b/config/source/multi/deployments/controller.yaml @@ -47,12 +47,16 @@ spec: # How often (in seconds) the autoscaler tries to scale down the statefulset. - name: AUTOSCALER_REFRESH_PERIOD - value: '10' + value: '100' # The number of virtual replicas this pod can handle. 
- name: POD_CAPACITY value: '100' + # The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list) + - name: SCHEDULER_POLICY_TYPE + value: '1' + resources: requests: cpu: 20m diff --git a/config/source/multi/roles/clusterrole.yaml b/config/source/multi/roles/clusterrole.yaml index d19bef947f..574044570c 100644 --- a/config/source/multi/roles/clusterrole.yaml +++ b/config/source/multi/roles/clusterrole.yaml @@ -93,6 +93,7 @@ rules: - events - configmaps - secrets + - nodes verbs: *everything # let the webhook label the appropriate namespace diff --git a/pkg/apis/duck/v1alpha1/placement_types.go b/pkg/apis/duck/v1alpha1/placement_types.go index a811b022bd..b108211e2c 100644 --- a/pkg/apis/duck/v1alpha1/placement_types.go +++ b/pkg/apis/duck/v1alpha1/placement_types.go @@ -50,6 +50,9 @@ type Placement struct { // PodName is the name of the pod where the resource is placed PodName string `json:"podName,omitempty"` + // ZoneName is the name of the zone where the pod is located + ZoneName string `json:"zoneName,omitempty"` + // VReplicas is the number of virtual replicas assigned to in the pod VReplicas int32 `json:"vreplicas,omitempty"` } diff --git a/pkg/common/scheduler/statefulset/autoscaler.go b/pkg/common/scheduler/statefulset/autoscaler.go index 4d6091df89..ce6af18155 100644 --- a/pkg/common/scheduler/statefulset/autoscaler.go +++ b/pkg/common/scheduler/statefulset/autoscaler.go @@ -124,8 +124,10 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen // The number of replicas may be lower than the last ordinal, for instance // when the statefulset is manually scaled down. In that case, replicas above // scale.Spec.Replicas have not been considered when scheduling vreplicas. - // Adjust accordingly - pending -= state.freeCapacity() + // Adjust accordingly (applicable only for maxFillUp scheduling policy and not for HA) + if state.schedulerPolicy == MaxFillup { + pending -= state.freeCapacity() + } // Still need more? 
if pending > 0 { diff --git a/pkg/common/scheduler/statefulset/autoscaler_test.go b/pkg/common/scheduler/statefulset/autoscaler_test.go index 1bd1486812..b236c220a0 100644 --- a/pkg/common/scheduler/statefulset/autoscaler_test.go +++ b/pkg/common/scheduler/statefulset/autoscaler_test.go @@ -26,7 +26,6 @@ import ( kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" - "knative.dev/pkg/logging" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" "knative.dev/eventing-kafka/pkg/common/scheduler" @@ -188,7 +187,7 @@ func TestAutoscaler(t *testing.T) { ctx, _ := setupFakeContext(t) vpodClient := tscheduler.NewVPodClient() - stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10) + stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MaxFillup) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{}) @@ -231,7 +230,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { }) vpodClient := tscheduler.NewVPodClient() - stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10) + stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MaxFillup) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, 10), metav1.CreateOptions{}) diff --git a/pkg/common/scheduler/statefulset/scheduler.go b/pkg/common/scheduler/statefulset/scheduler.go index 658e00bfe1..57e1bc7617 100644 --- a/pkg/common/scheduler/statefulset/scheduler.go +++ b/pkg/common/scheduler/statefulset/scheduler.go @@ -18,13 +18,18 @@ package statefulset import ( "context" + "errors" + "math" + "sort" "sync" "time" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" @@ -37,14 +42,28 @@ import ( "knative.dev/eventing-kafka/pkg/common/scheduler" ) +type SchedulerPolicyType int + +const ( + // MaxFillup policy type adds replicas to existing pods to fill them up before adding to new pods + MaxFillup SchedulerPolicyType = iota + // EvenSpread policy type spreads replicas uniformly across zones in different regions for HA, and within each zone, fills up replicas in existing and new pods + EvenSpread +) + +const ( + ZoneLabel = "topology.kubernetes.io/zone" +) + // NewScheduler creates a new scheduler with pod autoscaling enabled. 
func NewScheduler(ctx context.Context, namespace, name string, lister scheduler.VPodLister, refreshPeriod time.Duration, - capacity int32) scheduler.Scheduler { + capacity int32, + schedulerPolicy SchedulerPolicyType) scheduler.Scheduler { - stateAccessor := newStateBuilder(logging.FromContext(ctx), lister, capacity) + stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy) autoscaler := NewAutoscaler(ctx, namespace, name, stateAccessor, refreshPeriod, capacity) go autoscaler.Start(ctx) @@ -57,6 +76,7 @@ type StatefulSetScheduler struct { logger *zap.SugaredLogger statefulSetName string statefulSetClient clientappsv1.StatefulSetInterface + podClient clientcorev1.PodInterface vpodLister scheduler.VPodLister lock sync.Locker stateAccessor stateAccessor @@ -80,6 +100,7 @@ func NewStatefulSetScheduler(ctx context.Context, logger: logging.FromContext(ctx), statefulSetName: name, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), + podClient: kubeclient.Get(ctx).CoreV1().Pods(namespace), vpodLister: lister, pending: make(map[types.NamespacedName]int32), lock: new(sync.Mutex), @@ -113,10 +134,16 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla } placements := vpod.GetPlacements() + var spreadVal, left int32 - // The scheduler - // - allocates as many vreplicas as possible to the same pod(s) (TODO: HA) + // The scheduler when policy type is + // Policy: MaxFillup (SchedulerPolicyType == MaxFillup) + // - allocates as many vreplicas as possible to the same pod(s) // - allocates remaining vreplicas to new pods + // Policy: EvenSpread (SchedulerPolicyType == EvenSpread) + // - divides up vreplicas equally between the zones and + // - allocates as many vreplicas as possible to existing pods while not going over the equal spread value + // - allocates remaining vreplicas to new pods created in new zones still satisfying equal spread // Exact number of vreplicas => do nothing tr := scheduler.GetTotalVReplicas(placements) @@ -128,10 +155,20 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, nil } + if state.schedulerPolicy == EvenSpread { + //spreadVal is the minimum number of replicas to be placed in each zone for high availability (total replicas divided by number of zones in cluster) + spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones))) + logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) + } + // Need less => scale down if tr > vpod.GetVReplicas() { logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements := s.removeReplicas(tr-vpod.GetVReplicas(), placements) + if state.schedulerPolicy == MaxFillup { + placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) + } else { + placements = s.removeReplicasEvenSpread(tr-vpod.GetVReplicas(), placements, spreadVal) + } // Do not trigger the autoscaler to avoid unnecessary churn @@ -140,10 +177,14 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla // Need more => scale up logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements, left := s.addReplicas(state, vpod.GetVReplicas()-tr, placements) + if state.schedulerPolicy == MaxFillup { + placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) + } else { + placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, 
spreadVal) + } if left > 0 { // Give time for the autoscaler to do its job - logger.Info("scheduling failed (not enough pod replicas)", zap.Any("placement", placements)) + logger.Info("scheduling failed (not enough pod replicas)", zap.Any("placement", placements), zap.Int32("left", left)) s.pending[vpod.GetKey()] = left @@ -177,6 +218,51 @@ func (s *StatefulSetScheduler) removeReplicas(diff int32, placements []duckv1alp return newPlacements } +func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements []duckv1alpha1.Placement, evenSpread int32) []duckv1alpha1.Placement { + newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) + logger := s.logger.Named("remove replicas") + + placementsByZone := getPlacementsByZoneKey(placements) + zoneNames := make([]string, 0, len(placementsByZone)) + for zoneName := range placementsByZone { + zoneNames = append(zoneNames, zoneName) + } + sort.Strings(zoneNames) //for ordered accessing of map + + for i := 0; i < len(zoneNames); i++ { //iterate through each zone + totalInZone := getTotalVReplicasInZone(placements, zoneNames[i]) + logger.Info(zap.String("zoneName", zoneNames[i]), zap.Int32("totalInZone", totalInZone)) + + placementOrdinals := placementsByZone[zoneNames[i]] + for j := 0; j < len(placementOrdinals); j++ { //iterating through all existing pods belonging to a single zone + ordinal := placementOrdinals[j] + placement := s.getPlacementFromPodOrdinal(placements, ordinal) + + if diff >= 0 && totalInZone > evenSpread { + deallocation := integer.Int32Min(diff, integer.Int32Min(placement.VReplicas, totalInZone-evenSpread)) + logger.Info(zap.Int32("diff", diff), zap.Int32("deallocation", deallocation)) + + if deallocation < placement.VReplicas { + newPlacements = append(newPlacements, duckv1alpha1.Placement{ + PodName: placement.PodName, + ZoneName: placement.ZoneName, + VReplicas: placement.VReplicas - deallocation, + }) + } + diff -= deallocation + totalInZone -= deallocation + } else { + newPlacements = append(newPlacements, duckv1alpha1.Placement{ + PodName: placement.PodName, + ZoneName: placement.ZoneName, + VReplicas: placement.VReplicas, + }) + } + } + } + return newPlacements +} + func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { // Pod affinity algorithm: prefer adding replicas to existing pods before considering other replicas // In the future, we might want to spread replicas across pods in different regions. 
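The hunk that follows introduces addReplicasEvenSpread, which caps each zone at ceil(vreplicas / numZones) while filling existing pods first. As a rough standalone sketch of that per-zone arithmetic only, assuming hypothetical names (placement, evenSpreadTarget, totalInZone) that do not appear in the patch:

package main

import (
	"fmt"
	"math"
)

// placement mirrors a pod's assigned vreplica count in a given zone.
type placement struct {
	pod       string
	zone      string
	vreplicas int32
}

// evenSpreadTarget is the per-zone cap used on scale-up:
// total vreplicas divided by the number of zones, rounded up.
func evenSpreadTarget(vreplicas, numZones int32) int32 {
	return int32(math.Ceil(float64(vreplicas) / float64(numZones)))
}

// totalInZone sums the vreplicas already placed in one zone,
// analogous to getTotalVReplicasInZone in the hunk below.
func totalInZone(placements []placement, zone string) int32 {
	var total int32
	for _, p := range placements {
		if p.zone == zone {
			total += p.vreplicas
		}
	}
	return total
}

func main() {
	placements := []placement{
		{pod: "statefulset-name-0", zone: "zone0", vreplicas: 5},
		{pod: "statefulset-name-1", zone: "zone1", vreplicas: 5},
		{pod: "statefulset-name-2", zone: "zone2", vreplicas: 10},
	}

	// 30 vreplicas across 3 zones -> at most ceil(30/3) = 10 per zone.
	target := evenSpreadTarget(30, 3)
	for _, zone := range []string{"zone0", "zone1", "zone2"} {
		have := totalInZone(placements, zone)
		fmt.Printf("%s: has %d, may still take %d\n", zone, have, target-have)
	}
}

With the three-zone example above the cap is 10 per zone, which matches the expected 10/10/10 placements in the "three replicas, 30 vreplicas, HA scheduling" test case added later in this patch; a later patch in the series switches the scale-down path to a floor-based spread value.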
@@ -227,6 +313,129 @@ func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements return newPlacements, diff } +func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { + // Pod affinity MaxFillup algorithm prefer adding replicas to existing pods to fill them up before adding to new pods + // Pod affinity EvenSpread algorithm spread replicas across pods in different regions for HA + newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) + logger := s.logger.Named("add replicas") + + placementsByZone := getPlacementsByZoneKey(placements) + zoneNames := make([]string, 0, len(placementsByZone)) + for zoneName := range placementsByZone { + zoneNames = append(zoneNames, zoneName) + } + sort.Strings(zoneNames) //for ordered accessing of map + + for i := 0; i < len(zoneNames); i++ { //iterate through each zone + totalInZone := getTotalVReplicasInZone(placements, zoneNames[i]) + logger.Info(zap.String("zoneName", zoneNames[i]), zap.Int32("totalInZone", totalInZone)) + + placementOrdinals := placementsByZone[zoneNames[i]] + for j := 0; j < len(placementOrdinals); j++ { //iterating through all existing pods belonging to a single zone + ordinal := placementOrdinals[j] + placement := s.getPlacementFromPodOrdinal(placements, ordinal) + + // Is there space in Pod? + f := state.Free(ordinal) + if diff >= 0 && f > 0 && totalInZone < evenSpread { + allocation := integer.Int32Min(diff, integer.Int32Min(f, (evenSpread-totalInZone))) + logger.Info(zap.Int32("diff", diff), zap.Int32("allocation", allocation)) + + newPlacements = append(newPlacements, duckv1alpha1.Placement{ + PodName: placement.PodName, + ZoneName: placement.ZoneName, + VReplicas: placement.VReplicas + allocation, + }) + + diff -= allocation + state.SetFree(ordinal, f-allocation) + totalInZone += allocation + } else { + newPlacements = append(newPlacements, placement) + } + } + } + + if diff > 0 { + for ordinal := int32(0); ordinal < s.replicas; ordinal++ { + f := state.Free(ordinal) + if f > 0 { //here it is possible to hit pods that are in existing placements + podName := podNameFromOrdinal(s.statefulSetName, ordinal) + zoneName, err := s.getZoneNameFromPod(state, podName) + if err != nil { + logger.Errorw("Error getting zone info from pod", zap.Error(err)) + continue //TODO: not continue? + } + + totalInZone := getTotalVReplicasInZone(newPlacements, zoneName) + if totalInZone >= evenSpread { + continue //since current zone that pod belongs to is already at max spread + } + logger.Info("Need to schedule on a new pod", zap.Int32("ordinal", ordinal), zap.Int32("free", f), zap.String("zoneName", zoneName), zap.Int32("totalInZone", totalInZone)) + + allocation := integer.Int32Min(diff, integer.Int32Min(f, (evenSpread-totalInZone))) + logger.Info(zap.Int32("diff", diff), zap.Int32("allocation", allocation)) + + newPlacements = append(newPlacements, duckv1alpha1.Placement{ + PodName: podName, + ZoneName: zoneName, + VReplicas: allocation, //TODO could there be existing vreplicas already? 
+ }) + + diff -= allocation + state.SetFree(ordinal, f-allocation) + placementsByZone[zoneName] = append(placementsByZone[zoneName], ordinal) + } + + if diff == 0 { + break + } + } + } + return newPlacements, diff +} + +func (s *StatefulSetScheduler) getZoneNameFromPod(state *state, podName string) (zoneName string, err error) { + pod, err := s.podClient.Get(context.Background(), podName, metav1.GetOptions{}) + if err != nil { + return zoneName, err + } + + zoneName, ok := state.nodeToZoneMap[pod.Spec.NodeName] + if !ok { + return zoneName, errors.New("Could not find zone") + } + return zoneName, nil +} + +func getPlacementsByZoneKey(placements []duckv1alpha1.Placement) map[string][]int32 { + placementsByZone := make(map[string][]int32) + for i := 0; i < len(placements); i++ { + zoneName := placements[i].ZoneName + placementsByZone[zoneName] = append(placementsByZone[zoneName], ordinalFromPodName(placements[i].PodName)) + } + return placementsByZone +} + +func getTotalVReplicasInZone(placements []duckv1alpha1.Placement, zoneName string) int32 { + var totalReplicasInZone int32 + for i := 0; i < len(placements); i++ { + if placements[i].ZoneName == zoneName { + totalReplicasInZone += placements[i].VReplicas + } + } + return totalReplicasInZone +} + +func (s *StatefulSetScheduler) getPlacementFromPodOrdinal(placements []duckv1alpha1.Placement, ordinal int32) (placement duckv1alpha1.Placement) { + for i := 0; i < len(placements); i++ { + if placements[i].PodName == podNameFromOrdinal(s.statefulSetName, ordinal) { + return placements[i] + } + } + return placement +} + // pendingReplicas returns the total number of vreplicas // that haven't been scheduled yet func (s *StatefulSetScheduler) pendingVReplicas() int32 { diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go b/pkg/common/scheduler/statefulset/scheduler_test.go index 55cae4584a..4d50269356 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -18,12 +18,14 @@ package statefulset import ( "context" + "fmt" "reflect" "testing" "time" appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,7 +34,6 @@ import ( kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" rectesting "knative.dev/pkg/reconciler/testing" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" @@ -44,16 +45,18 @@ const ( sfsName = "statefulset-name" vpodName = "source-name" vpodNamespace = "source-namespace" + numZones = 3 ) func TestStatefulsetScheduler(t *testing.T) { testCases := []struct { - name string - vreplicas int32 - replicas int32 - placements []duckv1alpha1.Placement - expected []duckv1alpha1.Placement - err error + name string + vreplicas int32 + replicas int32 + placements []duckv1alpha1.Placement + expected []duckv1alpha1.Placement + err error + schedulerPolicy SchedulerPolicyType }{ { name: "no replicas, no vreplicas", @@ -109,6 +112,19 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 5}, }, }, + { + name: "two replicas, 20 vreplicas, scheduling", + vreplicas: 20, + replicas: int32(2), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: 5}, + {PodName: "statefulset-name-1", VReplicas: 
5}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: 10}, + {PodName: "statefulset-name-1", VReplicas: 10}, + }, + }, { name: "two replicas, 15 vreplicas, too much scheduled (scale down)", vreplicas: 15, @@ -122,6 +138,123 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 10}, }, }, + { + name: "no replicas, no vreplicas, HA scheduling", + vreplicas: 0, + replicas: int32(0), + expected: nil, + schedulerPolicy: EvenSpread, + }, + { + name: "no replicas, 1 vreplicas, fail, HA scheduling", + vreplicas: 1, + replicas: int32(0), + err: scheduler.ErrNotEnoughReplicas, + expected: []duckv1alpha1.Placement{}, + schedulerPolicy: EvenSpread, + }, + { + name: "one replica, one vreplicas, HA scheduling", + vreplicas: 1, + replicas: int32(1), + expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}}, + schedulerPolicy: EvenSpread, + }, + { + name: "one replica, 3 vreplicas, HA scheduling", + vreplicas: 3, + replicas: int32(1), + err: scheduler.ErrNotEnoughReplicas, + expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}}, + schedulerPolicy: EvenSpread, + }, + { + name: "one replica, 15 vreplicas, unschedulable, HA scheduling", + vreplicas: 15, + replicas: int32(1), + err: scheduler.ErrNotEnoughReplicas, + expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}}, + schedulerPolicy: EvenSpread, + }, + { + name: "two replicas, 15 vreplicas, scheduled, HA scheduling", + vreplicas: 15, + replicas: int32(2), + err: scheduler.ErrNotEnoughReplicas, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "two replicas, 15 vreplicas, already scheduled, HA scheduling", + vreplicas: 15, + replicas: int32(2), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 10}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 10}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "three replicas, 30 vreplicas, HA scheduling", + vreplicas: 30, + replicas: int32(3), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 10}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 10}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 10}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 10}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "two replicas, 15 vreplicas, too much scheduled (scale down), HA scheduling", + vreplicas: 15, + replicas: int32(3), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 10}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 10}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 5}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}, + {PodName: "statefulset-name-1", 
ZoneName: "zone1", VReplicas: 5}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 5}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "three replicas, 15 vreplicas, HA scheduling", + vreplicas: 15, + replicas: int32(3), + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 5}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "three replicas, 15 vreplicas, HA scheduling", + vreplicas: 20, + replicas: int32(3), + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 7}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 7}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 6}, + }, + schedulerPolicy: EvenSpread, + }, } for _, tc := range testCases { @@ -130,11 +263,31 @@ func TestStatefulsetScheduler(t *testing.T) { vpodClient := tscheduler.NewVPodClient() + if tc.schedulerPolicy == EvenSpread { + for i := int32(0); i < numZones; i++ { + nodeName := "node" + fmt.Sprint(i) + zoneName := "zone" + fmt.Sprint(i) + _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(ctx, nodeName, zoneName), metav1.CreateOptions{}) + if err != nil { + t.Fatal("unexpected error", err) + } + } + for i := int32(0); i < tc.replicas; i++ { + nodeName := "node" + fmt.Sprint(i) + podName := sfsName + "-" + fmt.Sprint(i) + _, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(ctx, testNs, podName, nodeName), metav1.CreateOptions{}) + if err != nil { + t.Fatal("unexpected error", err) + } + } + } + _, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } - sa := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10) + + sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy) s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil).(*StatefulSetScheduler) // Give some time for the informer to notify the scheduler and set the number of replicas @@ -184,6 +337,31 @@ func makeStatefulset(ctx context.Context, ns, name string, replicas int32) *apps return obj } +func makeNode(ctx context.Context, name, zonename string) *corev1.Node { + obj := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + ZoneLabel: zonename, + }, + }, + } + return obj +} + +func makePod(ctx context.Context, ns, name, nodename string) *corev1.Pod { + obj := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: corev1.PodSpec{ + NodeName: nodename, + }, + } + return obj +} + func setupFakeContext(t *testing.T) (context.Context, context.CancelFunc) { ctx, cancel, informers := rectesting.SetupFakeContextWithCancel(t) err := controller.StartInformers(ctx.Done(), informers...) diff --git a/pkg/common/scheduler/statefulset/state.go b/pkg/common/scheduler/statefulset/state.go index 6343bd6a03..0c998be152 100644 --- a/pkg/common/scheduler/statefulset/state.go +++ b/pkg/common/scheduler/statefulset/state.go @@ -17,8 +17,14 @@ limitations under the License. 
package statefulset import ( + "context" + "errors" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing-kafka/pkg/common/scheduler" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/logging" ) type stateAccessor interface { @@ -38,6 +44,15 @@ type state struct { // Pod capacity. capacity int32 + + // Number of zones in cluster + numZones int32 + + // Scheduling policy type for placing vreplicas on pods + schedulerPolicy SchedulerPolicyType + + // Mapping node names of nodes currently in cluster to their zone info + nodeToZoneMap map[string]string } // Free safely returns the free capacity at the given ordinal @@ -66,17 +81,21 @@ func (s *state) freeCapacity() int32 { // stateBuilder reconstruct the state from scratch, by listing vpods type stateBuilder struct { - logger *zap.SugaredLogger - vpodLister scheduler.VPodLister - capacity int32 + ctx context.Context + logger *zap.SugaredLogger + vpodLister scheduler.VPodLister + capacity int32 + schedulerPolicy SchedulerPolicyType } // newStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func newStateBuilder(logger *zap.SugaredLogger, lister scheduler.VPodLister, podCapacity int32) stateAccessor { +func newStateBuilder(ctx context.Context, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy SchedulerPolicyType) stateAccessor { return &stateBuilder{ - logger: logger, - vpodLister: lister, - capacity: podCapacity, + ctx: ctx, + logger: logging.FromContext(ctx), + vpodLister: lister, + capacity: podCapacity, + schedulerPolicy: schedulerPolicy, } } @@ -112,7 +131,31 @@ func (s *stateBuilder) State() (*state, error) { } } } - return &state{free: free, lastOrdinal: last, capacity: s.capacity}, nil + + if s.schedulerPolicy == EvenSpread { + //TODO: need a node watch to see if # nodes/ # zones have gone up or down + nodes, err := kubeclient.Get(s.ctx).CoreV1().Nodes().List(s.ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + nodeToZoneMap := make(map[string]string, len(nodes.Items)) + zoneMap := make(map[string]struct{}) + for i := 0; i < len(nodes.Items); i++ { + node := nodes.Items[i] + zoneName, ok := node.GetLabels()[ZoneLabel] + if !ok { + return nil, errors.New("Could not find label for zone") + } + + nodeToZoneMap[node.Name] = zoneName + zoneMap[zoneName] = struct{}{} + } + + return &state{free: free, lastOrdinal: last, capacity: s.capacity, numZones: int32(len(zoneMap)), schedulerPolicy: s.schedulerPolicy, nodeToZoneMap: nodeToZoneMap}, nil + + } + return &state{free: free, lastOrdinal: last, capacity: s.capacity, schedulerPolicy: s.schedulerPolicy}, nil } func grow(slice []int32, ordinal int32, def int32) []int32 { diff --git a/pkg/common/scheduler/statefulset/state_test.go b/pkg/common/scheduler/statefulset/state_test.go index 1da07fdf58..7e228ede65 100644 --- a/pkg/common/scheduler/statefulset/state_test.go +++ b/pkg/common/scheduler/statefulset/state_test.go @@ -21,8 +21,6 @@ import ( "reflect" "testing" - logtesting "knative.dev/pkg/logging/testing" - duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" ) @@ -71,6 +69,7 @@ func TestStateBuilder(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + ctx, _ := setupFakeContext(t) vpodClient := tscheduler.NewVPodClient() for i, placements := range tc.vpods { @@ -80,7 +79,7 @@ func TestStateBuilder(t *testing.T) { 
vpodClient.Create(vpodNamespace, vpodName, 1, placements) } - stateBuilder := newStateBuilder(logtesting.TestLogger(t), vpodClient.List, int32(10)) + stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), MaxFillup) state, err := stateBuilder.State() if err != nil { t.Fatal("unexpected error", err) diff --git a/pkg/source/reconciler/mtsource/controller.go b/pkg/source/reconciler/mtsource/controller.go index 7c6ccbb932..b645a8a149 100644 --- a/pkg/source/reconciler/mtsource/controller.go +++ b/pkg/source/reconciler/mtsource/controller.go @@ -38,8 +38,9 @@ import ( ) type envConfig struct { - SchedulerRefreshPeriod int64 `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"` - PodCapacity int32 `envconfig:"POD_CAPACITY" required:"true"` + SchedulerRefreshPeriod int64 `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"` + PodCapacity int32 `envconfig:"POD_CAPACITY" required:"true"` + SchedulerPolicy scheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"` } func NewController( @@ -70,7 +71,7 @@ func NewController( sourcesv1beta1.RegisterAlternateKafkaConditionSet(sourcesv1beta1.KafkaMTSourceCondSet) rp := time.Duration(env.SchedulerRefreshPeriod) * time.Second - c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity) + c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy) logging.FromContext(ctx).Info("Setting up kafka event handlers") kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) From 75ee2b2c0e4a0404803d3458dc3ced8764aa1a55 Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Tue, 4 May 2021 12:52:04 -0400 Subject: [PATCH 2/6] Ignore nodes that don't have a zone label in state Signed-off-by: Ansu Varghese --- .../scheduler/statefulset/autoscaler_test.go | 4 +- .../scheduler/statefulset/scheduler_test.go | 21 +++++--- pkg/common/scheduler/statefulset/state.go | 3 +- .../scheduler/statefulset/state_test.go | 50 +++++++++++++++---- 4 files changed, 58 insertions(+), 20 deletions(-) diff --git a/pkg/common/scheduler/statefulset/autoscaler_test.go b/pkg/common/scheduler/statefulset/autoscaler_test.go index b236c220a0..0b9c518df3 100644 --- a/pkg/common/scheduler/statefulset/autoscaler_test.go +++ b/pkg/common/scheduler/statefulset/autoscaler_test.go @@ -190,7 +190,7 @@ func TestAutoscaler(t *testing.T) { stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MaxFillup) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) - _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{}) + _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -233,7 +233,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MaxFillup) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) - _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, 10), metav1.CreateOptions{}) + _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go b/pkg/common/scheduler/statefulset/scheduler_test.go index 4d50269356..e303a35fd1 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ 
b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -267,7 +267,7 @@ func TestStatefulsetScheduler(t *testing.T) { for i := int32(0); i < numZones; i++ { nodeName := "node" + fmt.Sprint(i) zoneName := "zone" + fmt.Sprint(i) - _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(ctx, nodeName, zoneName), metav1.CreateOptions{}) + _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -275,14 +275,14 @@ func TestStatefulsetScheduler(t *testing.T) { for i := int32(0); i < tc.replicas; i++ { nodeName := "node" + fmt.Sprint(i) podName := sfsName + "-" + fmt.Sprint(i) - _, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(ctx, testNs, podName, nodeName), metav1.CreateOptions{}) + _, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } } } - _, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{}) + _, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -320,7 +320,7 @@ func TestStatefulsetScheduler(t *testing.T) { } } -func makeStatefulset(ctx context.Context, ns, name string, replicas int32) *appsv1.StatefulSet { +func makeStatefulset(ns, name string, replicas int32) *appsv1.StatefulSet { obj := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -337,7 +337,7 @@ func makeStatefulset(ctx context.Context, ns, name string, replicas int32) *apps return obj } -func makeNode(ctx context.Context, name, zonename string) *corev1.Node { +func makeNode(name, zonename string) *corev1.Node { obj := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -349,7 +349,16 @@ func makeNode(ctx context.Context, name, zonename string) *corev1.Node { return obj } -func makePod(ctx context.Context, ns, name, nodename string) *corev1.Pod { +func makeNodeNoLabel(name string) *corev1.Node { + obj := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + return obj +} + +func makePod(ns, name, nodename string) *corev1.Pod { obj := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, diff --git a/pkg/common/scheduler/statefulset/state.go b/pkg/common/scheduler/statefulset/state.go index 0c998be152..5582beee18 100644 --- a/pkg/common/scheduler/statefulset/state.go +++ b/pkg/common/scheduler/statefulset/state.go @@ -18,7 +18,6 @@ package statefulset import ( "context" - "errors" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -145,7 +144,7 @@ func (s *stateBuilder) State() (*state, error) { node := nodes.Items[i] zoneName, ok := node.GetLabels()[ZoneLabel] if !ok { - return nil, errors.New("Could not find label for zone") + continue //ignore node that doesn't have zone info (maybe a test setup or control node) } nodeToZoneMap[node.Name] = zoneName diff --git a/pkg/common/scheduler/statefulset/state_test.go b/pkg/common/scheduler/statefulset/state_test.go index 7e228ede65..970eb31f46 100644 --- a/pkg/common/scheduler/statefulset/state_test.go +++ b/pkg/common/scheduler/statefulset/state_test.go @@ -21,28 +21,33 @@ import ( "reflect" "testing" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1alpha1 
"knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" + kubeclient "knative.dev/pkg/client/injection/kube/client" ) func TestStateBuilder(t *testing.T) { testCases := []struct { - name string - vpods [][]duckv1alpha1.Placement - expected state - freec int32 - err error + name string + vpods [][]duckv1alpha1.Placement + expected state + freec int32 + schedulerPolicy SchedulerPolicyType + nodes []*v1.Node + err error }{ { name: "no vpods", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1}, + expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, schedulerPolicy: MaxFillup}, freec: int32(0), }, { name: "one vpods", vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, - expected: state{capacity: 10, free: []int32{int32(9)}, lastOrdinal: 0}, + expected: state{capacity: 10, free: []int32{int32(9)}, lastOrdinal: 0, schedulerPolicy: MaxFillup}, freec: int32(9), }, { @@ -52,7 +57,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 2}}, {{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}}, }, - expected: state{capacity: 10, free: []int32{int32(8), int32(5), int32(5)}, lastOrdinal: 2}, + expected: state{capacity: 10, free: []int32{int32(8), int32(5), int32(5)}, lastOrdinal: 2, schedulerPolicy: MaxFillup}, freec: int32(18), }, { @@ -62,9 +67,25 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: state{capacity: 10, free: []int32{int32(9), int32(10), int32(5), int32(10)}, lastOrdinal: 2}, + expected: state{capacity: 10, free: []int32{int32(9), int32(10), int32(5), int32(10)}, lastOrdinal: 2, schedulerPolicy: MaxFillup}, freec: int32(24), }, + { + name: "no vpods, all nodes with zone labels", + vpods: [][]duckv1alpha1.Placement{}, + expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 3, schedulerPolicy: EvenSpread, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, + freec: int32(0), + schedulerPolicy: EvenSpread, + nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNode("node-1", "zone-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, + }, + { + name: "no vpods, one node with no label", + vpods: [][]duckv1alpha1.Placement{}, + expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 2, schedulerPolicy: EvenSpread, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, + freec: int32(0), + schedulerPolicy: EvenSpread, + nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNodeNoLabel("node-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, + }, } for _, tc := range testCases { @@ -79,7 +100,16 @@ func TestStateBuilder(t *testing.T) { vpodClient.Create(vpodNamespace, vpodName, 1, placements) } - stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), MaxFillup) + if tc.schedulerPolicy == EvenSpread { + for i := 0; i < len(tc.nodes); i++ { + _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{}) + if err != nil { + t.Fatal("unexpected error", err) + } + } + } + + stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy) state, err := stateBuilder.State() if err != nil { 
t.Fatal("unexpected error", err) From 22d95fb97f3fbf6f592073cb06c42e5d4ae2dc2c Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Tue, 4 May 2021 16:36:58 -0400 Subject: [PATCH 3/6] Fix for not removing enough replicas Signed-off-by: Ansu Varghese --- pkg/common/scheduler/statefulset/scheduler.go | 23 +++++---- .../scheduler/statefulset/scheduler_test.go | 51 ++++++++++++++++++- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/pkg/common/scheduler/statefulset/scheduler.go b/pkg/common/scheduler/statefulset/scheduler.go index 57e1bc7617..be27ccfec3 100644 --- a/pkg/common/scheduler/statefulset/scheduler.go +++ b/pkg/common/scheduler/statefulset/scheduler.go @@ -155,18 +155,15 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, nil } - if state.schedulerPolicy == EvenSpread { - //spreadVal is the minimum number of replicas to be placed in each zone for high availability (total replicas divided by number of zones in cluster) - spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones))) - logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) - } - // Need less => scale down if tr > vpod.GetVReplicas() { logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) if state.schedulerPolicy == MaxFillup { placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) } else { + //spreadVal is the minimum number of replicas to be left behind in each zone for high availability + spreadVal = int32(math.Floor(float64(vpod.GetVReplicas()) / float64(state.numZones))) + logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) placements = s.removeReplicasEvenSpread(tr-vpod.GetVReplicas(), placements, spreadVal) } @@ -180,6 +177,9 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla if state.schedulerPolicy == MaxFillup { placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } else { + //spreadVal is the minimum number of replicas to be placed in each zone for high availability + spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones))) + logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, spreadVal) } if left > 0 { @@ -238,19 +238,22 @@ func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements [ ordinal := placementOrdinals[j] placement := s.getPlacementFromPodOrdinal(placements, ordinal) - if diff >= 0 && totalInZone > evenSpread { + if diff > 0 && totalInZone >= evenSpread { deallocation := integer.Int32Min(diff, integer.Int32Min(placement.VReplicas, totalInZone-evenSpread)) logger.Info(zap.Int32("diff", diff), zap.Int32("deallocation", deallocation)) - if deallocation < placement.VReplicas { + if deallocation > 0 && deallocation < placement.VReplicas { newPlacements = append(newPlacements, duckv1alpha1.Placement{ PodName: placement.PodName, ZoneName: placement.ZoneName, VReplicas: placement.VReplicas - deallocation, }) + diff -= deallocation + totalInZone -= deallocation + } else { + diff -= placement.VReplicas + totalInZone -= placement.VReplicas } - diff -= deallocation - totalInZone -= deallocation } else { newPlacements = append(newPlacements, duckv1alpha1.Placement{ PodName: placement.PodName, diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go 
b/pkg/common/scheduler/statefulset/scheduler_test.go index e303a35fd1..62e731b5f6 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -218,7 +218,7 @@ func TestStatefulsetScheduler(t *testing.T) { schedulerPolicy: EvenSpread, }, { - name: "two replicas, 15 vreplicas, too much scheduled (scale down), HA scheduling", + name: "three replicas, 15 vreplicas, too much scheduled (scale down), HA scheduling", vreplicas: 15, replicas: int32(3), placements: []duckv1alpha1.Placement{ @@ -255,6 +255,55 @@ func TestStatefulsetScheduler(t *testing.T) { }, schedulerPolicy: EvenSpread, }, + { + name: "three replicas, 2 vreplicas, too much scheduled (scale down), HA scheduling", + vreplicas: 2, + replicas: int32(3), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 1}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1}, + {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 1}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "three replicas, 3 vreplicas, too much scheduled (scale down), HA scheduling", + vreplicas: 3, + replicas: int32(3), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 2}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 2}, + {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2}, + {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 2}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}, + {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 1}, + {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 1}, + }, + schedulerPolicy: EvenSpread, + }, + { + name: "three replicas, 7 vreplicas, too much scheduled (scale down), HA scheduling", + vreplicas: 7, + replicas: int32(3), + placements: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 4}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 3}, + {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 4}, + {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3}, + }, + expected: []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 2}, + {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2}, + {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3}, + }, + schedulerPolicy: EvenSpread, + }, } for _, tc := range testCases { From bbd2170bd8db601208f19a36d4f7055a4ef22473 Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Thu, 6 May 2021 02:24:01 -0400 Subject: [PATCH 4/6] Changing SchedulerPolicyType from int to string Signed-off-by: Ansu Varghese --- .../source/multi/deployments/controller.yaml | 2 +- .../scheduler/statefulset/autoscaler.go | 4 +- .../scheduler/statefulset/autoscaler_test.go | 45 +++++++++++++++---- pkg/common/scheduler/statefulset/scheduler.go | 36 ++++++++------- .../scheduler/statefulset/scheduler_test.go | 35 ++++++++------- pkg/common/scheduler/statefulset/state.go | 8 +++- .../scheduler/statefulset/state_test.go | 43 ++++++++++-------- pkg/source/reconciler/mtsource/controller.go | 5 ++- 8 files changed, 113 insertions(+), 65 deletions(-) diff --git a/config/source/multi/deployments/controller.yaml 
b/config/source/multi/deployments/controller.yaml index 8f35ac82e0..63fdc539d9 100644 --- a/config/source/multi/deployments/controller.yaml +++ b/config/source/multi/deployments/controller.yaml @@ -55,7 +55,7 @@ spec: # The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list) - name: SCHEDULER_POLICY_TYPE - value: '1' + value: 'EVENSPREAD' resources: requests: diff --git a/pkg/common/scheduler/statefulset/autoscaler.go b/pkg/common/scheduler/statefulset/autoscaler.go index ce6af18155..5923f35f00 100644 --- a/pkg/common/scheduler/statefulset/autoscaler.go +++ b/pkg/common/scheduler/statefulset/autoscaler.go @@ -124,8 +124,8 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen // The number of replicas may be lower than the last ordinal, for instance // when the statefulset is manually scaled down. In that case, replicas above // scale.Spec.Replicas have not been considered when scheduling vreplicas. - // Adjust accordingly (applicable only for maxFillUp scheduling policy and not for HA) - if state.schedulerPolicy == MaxFillup { + // Adjust accordingly (applicable only for MAXFILLUP scheduling policy and not for HA) + if state.schedulerPolicy != EVENSPREAD { pending -= state.freeCapacity() } diff --git a/pkg/common/scheduler/statefulset/autoscaler_test.go b/pkg/common/scheduler/statefulset/autoscaler_test.go index 0b9c518df3..ccd487572b 100644 --- a/pkg/common/scheduler/statefulset/autoscaler_test.go +++ b/pkg/common/scheduler/statefulset/autoscaler_test.go @@ -22,7 +22,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + corev1 "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" @@ -38,12 +40,13 @@ const ( func TestAutoscaler(t *testing.T) { testCases := []struct { - name string - replicas int32 - vpods []scheduler.VPod - pendings int32 - scaleDown bool - wantReplicas int32 + name string + replicas int32 + vpods []scheduler.VPod + pendings int32 + scaleDown bool + wantReplicas int32 + schedulerPolicy SchedulerPolicyType }{ { name: "no replicas, no placements, no pending", @@ -180,6 +183,30 @@ func TestAutoscaler(t *testing.T) { pendings: int32(8), wantReplicas: int32(3), }, + { + name: "no replicas, with placements, with pending, enough capacity", + replicas: int32(0), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + {PodName: "pod-0", VReplicas: int32(8)}, + {PodName: "pod-1", VReplicas: int32(7)}}), + }, + pendings: int32(3), + wantReplicas: int32(3), + schedulerPolicy: EVENSPREAD, + }, + { + name: "with replicas, with placements, with pending, enough capacity", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + {PodName: "pod-0", VReplicas: int32(8)}, + {PodName: "pod-1", VReplicas: int32(7)}}), + }, + pendings: int32(3), + wantReplicas: int32(3), + schedulerPolicy: EVENSPREAD, + }, } for _, tc := range testCases { @@ -187,7 +214,8 @@ func TestAutoscaler(t *testing.T) { ctx, _ := setupFakeContext(t) vpodClient := tscheduler.NewVPodClient() - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MaxFillup) + nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) + stateAccessor := newStateBuilder(ctx, vpodClient.List, 
10, tc.schedulerPolicy, nodeLister) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) @@ -230,7 +258,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { }) vpodClient := tscheduler.NewVPodClient() - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MaxFillup) + nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) + stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, nodeLister) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) diff --git a/pkg/common/scheduler/statefulset/scheduler.go b/pkg/common/scheduler/statefulset/scheduler.go index be27ccfec3..a10a7d2557 100644 --- a/pkg/common/scheduler/statefulset/scheduler.go +++ b/pkg/common/scheduler/statefulset/scheduler.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corev1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" @@ -42,13 +43,13 @@ import ( "knative.dev/eventing-kafka/pkg/common/scheduler" ) -type SchedulerPolicyType int +type SchedulerPolicyType string const ( - // MaxFillup policy type adds replicas to existing pods to fill them up before adding to new pods - MaxFillup SchedulerPolicyType = iota - // EvenSpread policy type spreads replicas uniformly across zones in different regions for HA, and within each zone, fills up replicas in existing and new pods - EvenSpread + // MAXFILLUP policy type adds replicas to existing pods to fill them up before adding to new pods + MAXFILLUP SchedulerPolicyType = "MAXFILLUP" + // EVENSPREAD policy type spreads replicas uniformly across failure-domains such as regions, zones, nodes, etc + EVENSPREAD = "EVENSPREAD" ) const ( @@ -61,9 +62,10 @@ func NewScheduler(ctx context.Context, lister scheduler.VPodLister, refreshPeriod time.Duration, capacity int32, - schedulerPolicy SchedulerPolicyType) scheduler.Scheduler { + schedulerPolicy SchedulerPolicyType, + nodeLister corev1.NodeLister) scheduler.Scheduler { - stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy) + stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister) autoscaler := NewAutoscaler(ctx, namespace, name, stateAccessor, refreshPeriod, capacity) go autoscaler.Start(ctx) @@ -137,10 +139,10 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla var spreadVal, left int32 // The scheduler when policy type is - // Policy: MaxFillup (SchedulerPolicyType == MaxFillup) + // Policy: MAXFILLUP (SchedulerPolicyType == MAXFILLUP) // - allocates as many vreplicas as possible to the same pod(s) // - allocates remaining vreplicas to new pods - // Policy: EvenSpread (SchedulerPolicyType == EvenSpread) + // Policy: EVENSPREAD (SchedulerPolicyType == EVENSPREAD) // - divides up vreplicas equally between the zones and // - allocates as many vreplicas as possible to existing pods while not going over the equal spread value // - allocates remaining vreplicas to new pods created in new zones still satisfying equal spread @@ -158,13 +160,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla // Need less => scale down if tr > vpod.GetVReplicas() { logger.Infow("scaling down", 
zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - if state.schedulerPolicy == MaxFillup { - placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) - } else { + if state.schedulerPolicy == EVENSPREAD { //spreadVal is the minimum number of replicas to be left behind in each zone for high availability spreadVal = int32(math.Floor(float64(vpod.GetVReplicas()) / float64(state.numZones))) logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) placements = s.removeReplicasEvenSpread(tr-vpod.GetVReplicas(), placements, spreadVal) + } else { + placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) } // Do not trigger the autoscaler to avoid unnecessary churn @@ -174,13 +176,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla // Need more => scale up logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - if state.schedulerPolicy == MaxFillup { - placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) - } else { + if state.schedulerPolicy == EVENSPREAD { //spreadVal is the minimum number of replicas to be placed in each zone for high availability spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones))) logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, spreadVal) + } else { + placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } if left > 0 { // Give time for the autoscaler to do its job @@ -317,8 +319,8 @@ func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements } func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { - // Pod affinity MaxFillup algorithm prefer adding replicas to existing pods to fill them up before adding to new pods - // Pod affinity EvenSpread algorithm spread replicas across pods in different regions for HA + // Pod affinity MAXFILLUP algorithm prefer adding replicas to existing pods to fill them up before adding to new pods + // Pod affinity EVENSPREAD algorithm spread replicas across pods in different regions for HA newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) logger := s.logger.Named("add replicas") diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go b/pkg/common/scheduler/statefulset/scheduler_test.go index 62e731b5f6..9bcfebcd60 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -29,7 +29,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + corelister "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" @@ -143,7 +145,7 @@ func TestStatefulsetScheduler(t *testing.T) { vreplicas: 0, replicas: int32(0), expected: nil, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "no replicas, 1 vreplicas, fail, HA scheduling", @@ -151,14 +153,14 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(0), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{}, - schedulerPolicy: EvenSpread, + 
schedulerPolicy: EVENSPREAD, }, { name: "one replica, one vreplicas, HA scheduling", vreplicas: 1, replicas: int32(1), expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}}, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "one replica, 3 vreplicas, HA scheduling", @@ -166,7 +168,7 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(1), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}}, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "one replica, 15 vreplicas, unschedulable, HA scheduling", @@ -174,7 +176,7 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(1), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}}, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "two replicas, 15 vreplicas, scheduled, HA scheduling", @@ -185,7 +187,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 5}, {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "two replicas, 15 vreplicas, already scheduled, HA scheduling", @@ -199,7 +201,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 10}, {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 30 vreplicas, HA scheduling", @@ -215,7 +217,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 10}, {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 10}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 15 vreplicas, too much scheduled (scale down), HA scheduling", @@ -231,7 +233,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 5}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 15 vreplicas, HA scheduling", @@ -242,7 +244,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 5}, {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 5}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 15 vreplicas, HA scheduling", @@ -253,7 +255,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 7}, {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 6}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 2 vreplicas, too much scheduled (scale down), HA scheduling", @@ -268,7 +270,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1}, {PodName: "statefulset-name-2", ZoneName: "zone2", VReplicas: 1}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 3 vreplicas, too much scheduled (scale down), HA scheduling", @@ -285,7 +287,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 1}, {PodName: 
"statefulset-name-3", ZoneName: "zone2", VReplicas: 1}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, { name: "three replicas, 7 vreplicas, too much scheduled (scale down), HA scheduling", @@ -302,7 +304,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2}, {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3}, }, - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, }, } @@ -312,7 +314,7 @@ func TestStatefulsetScheduler(t *testing.T) { vpodClient := tscheduler.NewVPodClient() - if tc.schedulerPolicy == EvenSpread { + if tc.schedulerPolicy == EVENSPREAD { for i := int32(0); i < numZones; i++ { nodeName := "node" + fmt.Sprint(i) zoneName := "zone" + fmt.Sprint(i) @@ -336,7 +338,8 @@ func TestStatefulsetScheduler(t *testing.T) { t.Fatal("unexpected error", err) } - sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy) + nodeLister := corelister.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) + sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, nodeLister) s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil).(*StatefulSetScheduler) // Give some time for the informer to notify the scheduler and set the number of replicas diff --git a/pkg/common/scheduler/statefulset/state.go b/pkg/common/scheduler/statefulset/state.go index 5582beee18..86d20780e5 100644 --- a/pkg/common/scheduler/statefulset/state.go +++ b/pkg/common/scheduler/statefulset/state.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/client-go/listers/core/v1" "knative.dev/eventing-kafka/pkg/common/scheduler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" @@ -85,16 +86,18 @@ type stateBuilder struct { vpodLister scheduler.VPodLister capacity int32 schedulerPolicy SchedulerPolicyType + nodeLister corev1.NodeLister } // newStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func newStateBuilder(ctx context.Context, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy SchedulerPolicyType) stateAccessor { +func newStateBuilder(ctx context.Context, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy SchedulerPolicyType, nodeLister corev1.NodeLister) stateAccessor { return &stateBuilder{ ctx: ctx, logger: logging.FromContext(ctx), vpodLister: lister, capacity: podCapacity, schedulerPolicy: schedulerPolicy, + nodeLister: nodeLister, } } @@ -131,9 +134,10 @@ func (s *stateBuilder) State() (*state, error) { } } - if s.schedulerPolicy == EvenSpread { + if s.schedulerPolicy == EVENSPREAD { //TODO: need a node watch to see if # nodes/ # zones have gone up or down nodes, err := kubeclient.Get(s.ctx).CoreV1().Nodes().List(s.ctx, metav1.ListOptions{}) + // nodes, err := s.nodeLister.List(labels.Everything()) // Not working yet!! 
if err != nil { return nil, err } diff --git a/pkg/common/scheduler/statefulset/state_test.go b/pkg/common/scheduler/statefulset/state_test.go index 970eb31f46..9a003fe845 100644 --- a/pkg/common/scheduler/statefulset/state_test.go +++ b/pkg/common/scheduler/statefulset/state_test.go @@ -23,6 +23,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -39,16 +41,18 @@ func TestStateBuilder(t *testing.T) { err error }{ { - name: "no vpods", - vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, schedulerPolicy: MaxFillup}, - freec: int32(0), + name: "no vpods", + vpods: [][]duckv1alpha1.Placement{}, + expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, schedulerPolicy: MAXFILLUP}, + freec: int32(0), + schedulerPolicy: MAXFILLUP, }, { - name: "one vpods", - vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, - expected: state{capacity: 10, free: []int32{int32(9)}, lastOrdinal: 0, schedulerPolicy: MaxFillup}, - freec: int32(9), + name: "one vpods", + vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, + expected: state{capacity: 10, free: []int32{int32(9)}, lastOrdinal: 0, schedulerPolicy: MAXFILLUP}, + freec: int32(9), + schedulerPolicy: MAXFILLUP, }, { name: "many vpods, no gaps", @@ -57,8 +61,9 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 2}}, {{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}}, }, - expected: state{capacity: 10, free: []int32{int32(8), int32(5), int32(5)}, lastOrdinal: 2, schedulerPolicy: MaxFillup}, - freec: int32(18), + expected: state{capacity: 10, free: []int32{int32(8), int32(5), int32(5)}, lastOrdinal: 2, schedulerPolicy: MAXFILLUP}, + freec: int32(18), + schedulerPolicy: MAXFILLUP, }, { name: "many vpods, with gaps", @@ -67,23 +72,24 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: state{capacity: 10, free: []int32{int32(9), int32(10), int32(5), int32(10)}, lastOrdinal: 2, schedulerPolicy: MaxFillup}, - freec: int32(24), + expected: state{capacity: 10, free: []int32{int32(9), int32(10), int32(5), int32(10)}, lastOrdinal: 2, schedulerPolicy: MAXFILLUP}, + freec: int32(24), + schedulerPolicy: MAXFILLUP, }, { name: "no vpods, all nodes with zone labels", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 3, schedulerPolicy: EvenSpread, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, + expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 3, schedulerPolicy: EVENSPREAD, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, freec: int32(0), - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNode("node-1", "zone-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, }, { name: "no vpods, one node with no label", vpods: 
[][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 2, schedulerPolicy: EvenSpread, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, + expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 2, schedulerPolicy: EVENSPREAD, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, freec: int32(0), - schedulerPolicy: EvenSpread, + schedulerPolicy: EVENSPREAD, nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNodeNoLabel("node-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, }, } @@ -100,7 +106,7 @@ func TestStateBuilder(t *testing.T) { vpodClient.Create(vpodNamespace, vpodName, 1, placements) } - if tc.schedulerPolicy == EvenSpread { + if tc.schedulerPolicy == EVENSPREAD { for i := 0; i < len(tc.nodes); i++ { _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{}) if err != nil { @@ -109,7 +115,8 @@ func TestStateBuilder(t *testing.T) { } } - stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy) + nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) + stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, nodeLister) state, err := stateBuilder.State() if err != nil { t.Fatal("unexpected error", err) diff --git a/pkg/source/reconciler/mtsource/controller.go b/pkg/source/reconciler/mtsource/controller.go index b645a8a149..3296a263ef 100644 --- a/pkg/source/reconciler/mtsource/controller.go +++ b/pkg/source/reconciler/mtsource/controller.go @@ -21,6 +21,8 @@ import ( "time" "github.com/kelseyhightower/envconfig" + corev1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -71,7 +73,8 @@ func NewController( sourcesv1beta1.RegisterAlternateKafkaConditionSet(sourcesv1beta1.KafkaMTSourceCondSet) rp := time.Duration(env.SchedulerRefreshPeriod) * time.Second - c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy) + nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) + c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy, nodeLister) logging.FromContext(ctx).Info("Setting up kafka event handlers") kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) From ca43d71bc40614994a70f6a069239a4b0dbb7f2f Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Thu, 6 May 2021 15:23:48 -0400 Subject: [PATCH 5/6] Adding node lister and pod lister to get info from cache to compute spread etc Signed-off-by: Ansu Varghese --- .../scheduler/statefulset/autoscaler_test.go | 11 ++-- pkg/common/scheduler/statefulset/scheduler.go | 34 +++++++----- .../scheduler/statefulset/scheduler_test.go | 24 +++++---- pkg/common/scheduler/statefulset/state.go | 12 ++--- .../scheduler/statefulset/state_test.go | 12 +++-- pkg/source/reconciler/mtsource/controller.go | 7 ++- third_party/VENDOR-LICENSE/LICENSE | 27 ++++++++++ .../vendor/golang.org/x/crypto/LICENSE | 27 ++++++++++ .../vendor/golang.org/x/net/LICENSE | 27 ++++++++++ .../vendor/golang.org/x/sys/cpu/LICENSE | 27 ++++++++++ .../vendor/golang.org/x/text/LICENSE | 27 ++++++++++ 
.../kube/informers/core/v1/node/node.go | 52 +++++++++++++++++++ vendor/modules.txt | 1 + 13 files changed, 242 insertions(+), 46 deletions(-) create mode 100644 third_party/VENDOR-LICENSE/LICENSE create mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE create mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE create mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE create mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE create mode 100644 vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/node/node.go diff --git a/pkg/common/scheduler/statefulset/autoscaler_test.go b/pkg/common/scheduler/statefulset/autoscaler_test.go index ccd487572b..1c671eda12 100644 --- a/pkg/common/scheduler/statefulset/autoscaler_test.go +++ b/pkg/common/scheduler/statefulset/autoscaler_test.go @@ -22,10 +22,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - corev1 "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" + listers "knative.dev/eventing/pkg/reconciler/testing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" @@ -214,8 +213,8 @@ func TestAutoscaler(t *testing.T) { ctx, _ := setupFakeContext(t) vpodClient := tscheduler.NewVPodClient() - nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, nodeLister) + ls := listers.NewListers(nil) + stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister()) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) @@ -258,8 +257,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { }) vpodClient := tscheduler.NewVPodClient() - nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, nodeLister) + ls := listers.NewListers(nil) + stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, ls.GetNodeLister()) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) diff --git a/pkg/common/scheduler/statefulset/scheduler.go b/pkg/common/scheduler/statefulset/scheduler.go index a10a7d2557..4ff7299867 100644 --- a/pkg/common/scheduler/statefulset/scheduler.go +++ b/pkg/common/scheduler/statefulset/scheduler.go @@ -26,11 +26,9 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" - clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - corev1 "k8s.io/client-go/listers/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" @@ -41,6 +39,7 @@ import ( duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" "knative.dev/eventing-kafka/pkg/common/scheduler" + podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" ) type SchedulerPolicyType string @@ -63,14 +62,16 @@ func NewScheduler(ctx context.Context, refreshPeriod time.Duration, capacity int32, schedulerPolicy 
SchedulerPolicyType, - nodeLister corev1.NodeLister) scheduler.Scheduler { + nodeLister corev1listers.NodeLister) scheduler.Scheduler { stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister) autoscaler := NewAutoscaler(ctx, namespace, name, stateAccessor, refreshPeriod, capacity) + podInformer := podinformer.Get(ctx) + podLister := podInformer.Lister().Pods(namespace) go autoscaler.Start(ctx) - return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler) + return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister) } // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods @@ -78,7 +79,7 @@ type StatefulSetScheduler struct { logger *zap.SugaredLogger statefulSetName string statefulSetClient clientappsv1.StatefulSetInterface - podClient clientcorev1.PodInterface + podLister corev1listers.PodNamespaceLister vpodLister scheduler.VPodLister lock sync.Locker stateAccessor stateAccessor @@ -96,13 +97,13 @@ func NewStatefulSetScheduler(ctx context.Context, namespace, name string, lister scheduler.VPodLister, stateAccessor stateAccessor, - autoscaler Autoscaler) scheduler.Scheduler { + autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler { scheduler := &StatefulSetScheduler{ logger: logging.FromContext(ctx), statefulSetName: name, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), - podClient: kubeclient.Get(ctx).CoreV1().Pods(namespace), + podLister: podlister, vpodLister: lister, pending: make(map[types.NamespacedName]int32), lock: new(sync.Mutex), @@ -177,7 +178,7 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla // Need more => scale up logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) if state.schedulerPolicy == EVENSPREAD { - //spreadVal is the minimum number of replicas to be placed in each zone for high availability + //spreadVal is the maximum number of replicas to be placed in each zone for high availability spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones))) logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal)) placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, spreadVal) @@ -236,13 +237,13 @@ func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements [ logger.Info(zap.String("zoneName", zoneNames[i]), zap.Int32("totalInZone", totalInZone)) placementOrdinals := placementsByZone[zoneNames[i]] - for j := 0; j < len(placementOrdinals); j++ { //iterating through all existing pods belonging to a single zone + for j := len(placementOrdinals) - 1; j >= 0; j-- { //iterating through all existing pods belonging to a single zone from larger cardinal to smaller ordinal := placementOrdinals[j] placement := s.getPlacementFromPodOrdinal(placements, ordinal) if diff > 0 && totalInZone >= evenSpread { deallocation := integer.Int32Min(diff, integer.Int32Min(placement.VReplicas, totalInZone-evenSpread)) - logger.Info(zap.Int32("diff", diff), zap.Int32("deallocation", deallocation)) + logger.Info(zap.Int32("diff", diff), zap.Int32("ordinal", ordinal), zap.Int32("deallocation", deallocation)) if deallocation > 0 && deallocation < placement.VReplicas { newPlacements = append(newPlacements, duckv1alpha1.Placement{ @@ -252,9 +253,15 @@ func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements [ }) diff -= 
deallocation totalInZone -= deallocation - } else { + } else if deallocation >= placement.VReplicas { diff -= placement.VReplicas totalInZone -= placement.VReplicas + } else { + newPlacements = append(newPlacements, duckv1alpha1.Placement{ + PodName: placement.PodName, + ZoneName: placement.ZoneName, + VReplicas: placement.VReplicas, + }) } } else { newPlacements = append(newPlacements, duckv1alpha1.Placement{ @@ -262,6 +269,7 @@ func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements [ ZoneName: placement.ZoneName, VReplicas: placement.VReplicas, }) + } } } @@ -401,7 +409,7 @@ func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state, diff int32, p } func (s *StatefulSetScheduler) getZoneNameFromPod(state *state, podName string) (zoneName string, err error) { - pod, err := s.podClient.Get(context.Background(), podName, metav1.GetOptions{}) + pod, err := s.podLister.Get(podName) if err != nil { return zoneName, err } diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go b/pkg/common/scheduler/statefulset/scheduler_test.go index 9bcfebcd60..0d68a1afdc 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -29,9 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - corelister "k8s.io/client-go/listers/core/v1" gtesting "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" @@ -41,6 +39,7 @@ import ( duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" "knative.dev/eventing-kafka/pkg/common/scheduler" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" + listers "knative.dev/eventing/pkg/reconciler/testing/v1" ) const ( @@ -284,7 +283,7 @@ func TestStatefulsetScheduler(t *testing.T) { }, expected: []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}, - {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 1}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1}, {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 1}, }, schedulerPolicy: EVENSPREAD, @@ -301,7 +300,7 @@ func TestStatefulsetScheduler(t *testing.T) { }, expected: []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 2}, - {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 2}, {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3}, }, schedulerPolicy: EVENSPREAD, @@ -311,25 +310,28 @@ func TestStatefulsetScheduler(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx, _ := setupFakeContext(t) - + nodelist := make([]runtime.Object, 0, numZones) + podlist := make([]runtime.Object, 0, tc.replicas) vpodClient := tscheduler.NewVPodClient() if tc.schedulerPolicy == EVENSPREAD { for i := int32(0); i < numZones; i++ { nodeName := "node" + fmt.Sprint(i) zoneName := "zone" + fmt.Sprint(i) - _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{}) + node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } + nodelist = append(nodelist, node) } for i := int32(0); i < tc.replicas; i++ { nodeName := 
"node" + fmt.Sprint(i) podName := sfsName + "-" + fmt.Sprint(i) - _, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{}) + pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } + podlist = append(podlist, pod) } } @@ -337,10 +339,10 @@ func TestStatefulsetScheduler(t *testing.T) { if err != nil { t.Fatal("unexpected error", err) } - - nodeLister := corelister.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) - sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, nodeLister) - s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil).(*StatefulSetScheduler) + lsn := listers.NewListers(nodelist) + sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, lsn.GetNodeLister()) + lsp := listers.NewListers(podlist) + s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler) // Give some time for the informer to notify the scheduler and set the number of replicas time.Sleep(200 * time.Millisecond) diff --git a/pkg/common/scheduler/statefulset/state.go b/pkg/common/scheduler/statefulset/state.go index 86d20780e5..975cfad9a0 100644 --- a/pkg/common/scheduler/statefulset/state.go +++ b/pkg/common/scheduler/statefulset/state.go @@ -20,10 +20,9 @@ import ( "context" "go.uber.org/zap" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" corev1 "k8s.io/client-go/listers/core/v1" "knative.dev/eventing-kafka/pkg/common/scheduler" - kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" ) @@ -136,16 +135,15 @@ func (s *stateBuilder) State() (*state, error) { if s.schedulerPolicy == EVENSPREAD { //TODO: need a node watch to see if # nodes/ # zones have gone up or down - nodes, err := kubeclient.Get(s.ctx).CoreV1().Nodes().List(s.ctx, metav1.ListOptions{}) - // nodes, err := s.nodeLister.List(labels.Everything()) // Not working yet!! 
+ nodes, err := s.nodeLister.List(labels.Everything()) if err != nil { return nil, err } - nodeToZoneMap := make(map[string]string, len(nodes.Items)) + nodeToZoneMap := make(map[string]string, len(nodes)) zoneMap := make(map[string]struct{}) - for i := 0; i < len(nodes.Items); i++ { - node := nodes.Items[i] + for i := 0; i < len(nodes); i++ { + node := nodes[i] zoneName, ok := node.GetLabels()[ZoneLabel] if !ok { continue //ignore node that doesn't have zone info (maybe a test setup or control node) diff --git a/pkg/common/scheduler/statefulset/state_test.go b/pkg/common/scheduler/statefulset/state_test.go index 9a003fe845..e6bcdf3610 100644 --- a/pkg/common/scheduler/statefulset/state_test.go +++ b/pkg/common/scheduler/statefulset/state_test.go @@ -23,10 +23,10 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1 "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" + "k8s.io/apimachinery/pkg/runtime" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" + listers "knative.dev/eventing/pkg/reconciler/testing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" ) @@ -98,6 +98,7 @@ func TestStateBuilder(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx, _ := setupFakeContext(t) vpodClient := tscheduler.NewVPodClient() + nodelist := make([]runtime.Object, 0, len(tc.nodes)) for i, placements := range tc.vpods { vpodName := fmt.Sprint("vpod-name-", i) @@ -108,15 +109,16 @@ func TestStateBuilder(t *testing.T) { if tc.schedulerPolicy == EVENSPREAD { for i := 0; i < len(tc.nodes); i++ { - _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{}) + node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } + nodelist = append(nodelist, node) } } - nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) - stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, nodeLister) + ls := listers.NewListers(nodelist) + stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, ls.GetNodeLister()) state, err := stateBuilder.State() if err != nil { t.Fatal("unexpected error", err) diff --git a/pkg/source/reconciler/mtsource/controller.go b/pkg/source/reconciler/mtsource/controller.go index 3296a263ef..d17cde4c2a 100644 --- a/pkg/source/reconciler/mtsource/controller.go +++ b/pkg/source/reconciler/mtsource/controller.go @@ -21,8 +21,6 @@ import ( "time" "github.com/kelseyhightower/envconfig" - corev1 "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -37,6 +35,7 @@ import ( kafkainformer "knative.dev/eventing-kafka/pkg/client/injection/informers/sources/v1beta1/kafkasource" "knative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource" scheduler "knative.dev/eventing-kafka/pkg/common/scheduler/statefulset" + nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node" ) type envConfig struct { @@ -57,6 +56,7 @@ func NewController( } kafkaInformer := kafkainformer.Get(ctx) + nodeInformer := nodeinformer.Get(ctx) c := &Reconciler{ KubeClientSet: kubeclient.Get(ctx), @@ -73,8 +73,7 @@ func NewController( 
sourcesv1beta1.RegisterAlternateKafkaConditionSet(sourcesv1beta1.KafkaMTSourceCondSet) rp := time.Duration(env.SchedulerRefreshPeriod) * time.Second - nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})) - c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy, nodeLister) + c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy, nodeInformer.Lister()) logging.FromContext(ctx).Info("Setting up kafka event handlers") kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) diff --git a/third_party/VENDOR-LICENSE/LICENSE b/third_party/VENDOR-LICENSE/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/third_party/VENDOR-LICENSE/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. 
nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/node/node.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/node/node.go new file mode 100644 index 0000000000..cf5ef01fcb --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/node/node.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package node + +import ( + context "context" + + v1 "k8s.io/client-go/informers/core/v1" + factory "knative.dev/pkg/client/injection/kube/informers/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Core().V1().Nodes() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1.NodeInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch k8s.io/client-go/informers/core/v1.NodeInformer from context.") + } + return untyped.(v1.NodeInformer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 8f45519d7c..9acfb73f3f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1252,6 +1252,7 @@ knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints knative.dev/pkg/client/injection/kube/informers/core/v1/namespace +knative.dev/pkg/client/injection/kube/informers/core/v1/node knative.dev/pkg/client/injection/kube/informers/core/v1/pod knative.dev/pkg/client/injection/kube/informers/core/v1/service knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake From 781f66c711b0fd6464d94776eca59ad8694d44d4 Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Mon, 10 May 2021 17:51:39 -0400 Subject: [PATCH 6/6] Reverting scheduler policy type to default strategy Signed-off-by: Ansu Varghese --- .../source/multi/deployments/controller.yaml | 2 +- third_party/VENDOR-LICENSE/LICENSE | 27 ------------------- .../vendor/golang.org/x/crypto/LICENSE | 27 ------------------- .../vendor/golang.org/x/net/LICENSE | 27 ------------------- .../vendor/golang.org/x/sys/cpu/LICENSE | 27 ------------------- .../vendor/golang.org/x/text/LICENSE | 27 ------------------- 6 files changed, 1 insertion(+), 136 deletions(-) delete mode 100644 third_party/VENDOR-LICENSE/LICENSE delete mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE delete mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE delete mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE delete mode 100644 third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE diff --git a/config/source/multi/deployments/controller.yaml b/config/source/multi/deployments/controller.yaml index 63fdc539d9..431160d9a2 100644 --- a/config/source/multi/deployments/controller.yaml +++ b/config/source/multi/deployments/controller.yaml @@ -55,7 +55,7 @@ spec: # The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list) - name: SCHEDULER_POLICY_TYPE - value: 'EVENSPREAD' + value: 'MAXFILLUP' resources: requests: diff --git a/third_party/VENDOR-LICENSE/LICENSE b/third_party/VENDOR-LICENSE/LICENSE deleted file mode 100644 index 6a66aea5ea..0000000000 --- a/third_party/VENDOR-LICENSE/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go 
Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE deleted file mode 100644 index 6a66aea5ea..0000000000 --- a/third_party/VENDOR-LICENSE/vendor/golang.org/x/crypto/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE deleted file mode 100644 index 6a66aea5ea..0000000000 --- a/third_party/VENDOR-LICENSE/vendor/golang.org/x/net/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE deleted file mode 100644 index 6a66aea5ea..0000000000 --- a/third_party/VENDOR-LICENSE/vendor/golang.org/x/sys/cpu/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE b/third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE deleted file mode 100644 index 6a66aea5ea..0000000000 --- a/third_party/VENDOR-LICENSE/vendor/golang.org/x/text/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
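
---

Note (not part of the patch series): the EVENSPREAD policy introduced above bounds the vreplicas placed in any single zone by ceil(total/numZones) when scaling up (and floor(total/numZones) as the minimum left per zone when scaling down). The following standalone Go sketch illustrates only that per-zone bound; `spreadAcrossZones`, `min32`, and the `podsByZone` topology are hypothetical names invented for the example and do not appear in the patches.

```go
// Illustrative sketch of the per-zone bound used by the EVENSPREAD policy.
// It is not the scheduler implementation; it only mimics the
// ceil(vreplicas/numZones) cap applied in addReplicasEvenSpread.
package main

import (
	"fmt"
	"math"
)

// placement mirrors the shape of duckv1alpha1.Placement for this example.
type placement struct {
	PodName   string
	ZoneName  string
	VReplicas int32
}

// spreadAcrossZones distributes vreplicas over pods so that no zone receives
// more than ceil(vreplicas/numZones), capped by the per-pod capacity.
func spreadAcrossZones(vreplicas int32, podsByZone map[string][]string, capacity int32) []placement {
	numZones := int32(len(podsByZone))
	spreadVal := int32(math.Ceil(float64(vreplicas) / float64(numZones)))

	placements := make([]placement, 0)
	remaining := vreplicas
	for zone, pods := range podsByZone {
		zoneBudget := min32(spreadVal, remaining) // at most spreadVal vreplicas per zone
		for _, pod := range pods {
			if zoneBudget == 0 {
				break
			}
			alloc := min32(zoneBudget, capacity) // never exceed pod capacity
			placements = append(placements, placement{PodName: pod, ZoneName: zone, VReplicas: alloc})
			zoneBudget -= alloc
			remaining -= alloc
		}
	}
	return placements
}

func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Hypothetical topology: one statefulset pod per zone, pod capacity 10.
	podsByZone := map[string][]string{
		"zone0": {"statefulset-name-0"},
		"zone1": {"statefulset-name-1"},
		"zone2": {"statefulset-name-2"},
	}
	// With 15 vreplicas and 3 zones, spreadVal = ceil(15/3) = 5, so each pod
	// receives 5 vreplicas, matching the "three replicas, 15 vreplicas" test case.
	for _, p := range spreadAcrossZones(15, podsByZone, 10) {
		fmt.Printf("%s (%s): %d vreplicas\n", p.PodName, p.ZoneName, p.VReplicas)
	}
}
```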