diff --git a/config/source/multi/600-config-scheduler.yaml b/config/source/multi/600-config-scheduler.yaml new file mode 100644 index 0000000000..84ebe6394a --- /dev/null +++ b/config/source/multi/600-config-scheduler.yaml @@ -0,0 +1,36 @@ +# Copyright 2021 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-scheduler + namespace: knative-eventing + labels: + kafka.eventing.knative.dev/release: devel +data: + predicates: |+ + [ + {"name" : "PodFitsResources"}, + {"name" : "NoMaxResourceCount"}, + {"name" : "EvenPodSpread"} + ] + priorities: |+ + [ + {"name" : "AvailabilityZonePriority", "weight" : 1}, + {"name" : "AvailabilityNodePriority", "weight" : 1}, + {"name" : "LowestOrdinalPriority", "weight" : 1} + ] + + \ No newline at end of file diff --git a/config/source/multi/deployments/controller.yaml b/config/source/multi/deployments/controller.yaml index fe5417d877..a405604d66 100644 --- a/config/source/multi/deployments/controller.yaml +++ b/config/source/multi/deployments/controller.yaml @@ -55,7 +55,10 @@ spec: # The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list) - name: SCHEDULER_POLICY_TYPE - value: 'EVENSPREAD_BYNODE' + value: '' + + - name: CONFIG_SCHEDULER + value: config-scheduler resources: requests: diff --git a/pkg/common/scheduler/factory/registry.go b/pkg/common/scheduler/factory/registry.go new file mode 100644 index 0000000000..5e00d456a5 --- /dev/null +++ b/pkg/common/scheduler/factory/registry.go @@ -0,0 +1,88 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "fmt" + + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" +) + +// RegistryFP is a collection of all available filter plugins. +type RegistryFP map[string]state.FilterPlugin + +// RegistrySP is a collection of all available scoring plugins. +type RegistrySP map[string]state.ScorePlugin + +var ( + FilterRegistry = make(RegistryFP) + ScoreRegistry = make(RegistrySP) +) + +// Register adds a new plugin to the registry. If a plugin with the same name +// exists, it returns an error. +func RegisterFP(name string, factory state.FilterPlugin) error { + if _, ok := FilterRegistry[name]; ok { + return fmt.Errorf("a plugin named %v already exists", name) + } + FilterRegistry[name] = factory + return nil +} + +// Unregister removes an existing plugin from the registry. If no plugin with +// the provided name exists, it returns an error. 
+func UnregisterFP(name string) error { + if _, ok := FilterRegistry[name]; !ok { + return fmt.Errorf("no plugin named %v exists", name) + } + delete(FilterRegistry, name) + return nil +} + +func GetFilterPlugin(name string) (state.FilterPlugin, error) { + if f, exist := FilterRegistry[name]; exist { + return f, nil + } + return nil, fmt.Errorf("no plugin named %v exists", name) +} + +// Register adds a new plugin to the registry. If a plugin with the same name +// exists, it returns an error. +func RegisterSP(name string, factory state.ScorePlugin) error { + if _, ok := ScoreRegistry[name]; ok { + return fmt.Errorf("a plugin named %v already exists", name) + } + ScoreRegistry[name] = factory + return nil +} + +// Unregister removes an existing plugin from the registry. If no plugin with +// the provided name exists, it returns an error. +func UnregisterSP(name string) error { + if _, ok := ScoreRegistry[name]; !ok { + return fmt.Errorf("no plugin named %v exists", name) + } + delete(ScoreRegistry, name) + return nil +} + +func GetScorePlugin(name string) (state.ScorePlugin, error) { + if f, exist := ScoreRegistry[name]; exist { + return f, nil + } + return nil, fmt.Errorf("no plugin named %v exists", name) +} diff --git a/pkg/common/scheduler/plugins/core/availabilitynodepriority/availability_node_priority.go b/pkg/common/scheduler/plugins/core/availabilitynodepriority/availability_node_priority.go new file mode 100644 index 0000000000..bd610abb2c --- /dev/null +++ b/pkg/common/scheduler/plugins/core/availabilitynodepriority/availability_node_priority.go @@ -0,0 +1,71 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package availabilitynodepriority + +import ( + "context" + "fmt" + + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" + "knative.dev/pkg/logging" +) + +// AvailabilityNodePriority is a score plugin that favors pods that create an even spread of resources across nodes for HA +type AvailabilityNodePriority struct { +} + +// Verify AvailabilityNodePriority Implements ScorePlugin Interface +var _ state.ScorePlugin = &AvailabilityNodePriority{} + +// Name of the plugin +const Name = state.AvailabilityNodePriority + +func init() { + factory.RegisterSP(Name, &AvailabilityNodePriority{}) //TODO: Not working + fmt.Println("AvailabilityNodePriority has been registered") +} + +// Name returns name of the plugin +func (pl *AvailabilityNodePriority) Name() string { + return Name +} + +// Score invoked at the score extension point. +func (pl *AvailabilityNodePriority) Score(ctx context.Context, states *state.State, podID int32) (int64, *state.Status) { + logger := logging.FromContext(ctx).With("Score", pl.Name()) + + score := calculatePriority() + + logger.Infof("Pod %q scored by %q priority successfully", podID, pl.Name()) + return score, state.NewStatus(state.Success) +} + +// ScoreExtensions of the Score plugin. 
+func (pl *AvailabilityNodePriority) ScoreExtensions() state.ScoreExtensions { + return pl +} + +// NormalizeScore invoked after scoring all pods. +func (pl *AvailabilityNodePriority) NormalizeScore(ctx context.Context, state *state.State, scores state.PodScoreList) *state.Status { + return nil +} + +// calculatePriority returns the priority of a pod. Given the ... +func calculatePriority() int64 { + return 1 +} diff --git a/pkg/common/scheduler/plugins/core/availabilitynodepriority/availability_node_priority_test.go b/pkg/common/scheduler/plugins/core/availabilitynodepriority/availability_node_priority_test.go new file mode 100644 index 0000000000..e7bacfd06a --- /dev/null +++ b/pkg/common/scheduler/plugins/core/availabilitynodepriority/availability_node_priority_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package availabilitynodepriority diff --git a/pkg/common/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority.go b/pkg/common/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority.go new file mode 100644 index 0000000000..40a3ab7981 --- /dev/null +++ b/pkg/common/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority.go @@ -0,0 +1,70 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package availabilityzonepriority + +import ( + "context" + + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" + "knative.dev/pkg/logging" ) + +// AvailabilityZonePriority is a score plugin that favors pods that create an even spread of resources across zones for HA +type AvailabilityZonePriority struct { +} + +// Verify AvailabilityZonePriority Implements ScorePlugin Interface +var _ state.ScorePlugin = &AvailabilityZonePriority{} + +// Name of the plugin +const Name = state.AvailabilityZonePriority + +func init() { + factory.RegisterSP(Name, &AvailabilityZonePriority{}) + //fmt.Println("AvailabilityZonePriority plugin has been registered") +} + +// Name returns name of the plugin +func (pl *AvailabilityZonePriority) Name() string { + return Name +} + +// Score invoked at the score extension point.
+func (pl *AvailabilityZonePriority) Score(ctx context.Context, states *state.State, podID int32) (int64, *state.Status) { + logger := logging.FromContext(ctx).With("Score", pl.Name()) + + score := calculatePriority() + + logger.Infof("Pod %q scored by %q priority successfully", podID, pl.Name()) + return score, state.NewStatus(state.Success) +} + +// ScoreExtensions of the Score plugin. +func (pl *AvailabilityZonePriority) ScoreExtensions() state.ScoreExtensions { + return pl +} + +// NormalizeScore invoked after scoring all pods. +func (pl *AvailabilityZonePriority) NormalizeScore(ctx context.Context, state *state.State, scores state.PodScoreList) *state.Status { + return nil +} + +// calculatePriority returns the priority of a pod. Given the ... +func calculatePriority() int64 { + return 1 +} diff --git a/pkg/common/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority_test.go b/pkg/common/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority_test.go new file mode 100644 index 0000000000..e8a6e90b79 --- /dev/null +++ b/pkg/common/scheduler/plugins/core/availabilityzonepriority/availability_zone_priority_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package availabilityzonepriority diff --git a/pkg/common/scheduler/plugins/core/evenpodspread/even_pod_spread.go b/pkg/common/scheduler/plugins/core/evenpodspread/even_pod_spread.go new file mode 100644 index 0000000000..1ef8a3d83c --- /dev/null +++ b/pkg/common/scheduler/plugins/core/evenpodspread/even_pod_spread.go @@ -0,0 +1,53 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package evenpodspread + +import ( + "context" + + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" + "knative.dev/pkg/logging" +) + +// EvenPodSpread is a filter plugin that eliminates pods that do not create an equal spread of resources across pods +type EvenPodSpread struct { +} + +// Verify EvenPodSpread Implements FilterPlugin Interface +var _ state.FilterPlugin = &EvenPodSpread{} + +// Name of the plugin +const Name = state.EvenPodSpread + +func init() { + factory.RegisterFP(Name, &EvenPodSpread{}) + //fmt.Println("EvenPodSpread plugin has been registered") +} + +// Name returns name of the plugin +func (pl *EvenPodSpread) Name() string { + return Name +} + +// Filter invoked at the filter extension point. 
+func (pl *EvenPodSpread) Filter(ctx context.Context, states *state.State, podID int32) *state.Status { + logger := logging.FromContext(ctx).With("Filter", pl.Name()) + + logger.Infof("Pod %q passed %q predicate successfully", podID, pl.Name()) + return state.NewStatus(state.Success) +} diff --git a/pkg/common/scheduler/plugins/core/evenpodspread/even_pod_spread_test.go b/pkg/common/scheduler/plugins/core/evenpodspread/even_pod_spread_test.go new file mode 100644 index 0000000000..013065b25b --- /dev/null +++ b/pkg/common/scheduler/plugins/core/evenpodspread/even_pod_spread_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package evenpodspread diff --git a/pkg/common/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority.go b/pkg/common/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority.go new file mode 100644 index 0000000000..6403f211f7 --- /dev/null +++ b/pkg/common/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority.go @@ -0,0 +1,66 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lowestordinalpriority + +import ( + "context" + + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" + "knative.dev/pkg/logging" +) + +// LowestOrdinalPriority is a score plugin that favors pods with lower ordinals, so vreplicas are packed onto the lowest-ordinal pods first +type LowestOrdinalPriority struct { +} + +// Verify LowestOrdinalPriority Implements ScorePlugin Interface +var _ state.ScorePlugin = &LowestOrdinalPriority{} + +// Name of the plugin +const Name = state.LowestOrdinalPriority + +func init() { + factory.RegisterSP(Name, &LowestOrdinalPriority{}) + //fmt.Println("LowestOrdinalPriority plugin has been registered") +} + +// Name returns name of the plugin +func (pl *LowestOrdinalPriority) Name() string { + return Name } + +// Score invoked at the score extension point. +func (pl *LowestOrdinalPriority) Score(ctx context.Context, states *state.State, podID int32) (int64, *state.Status) { + logger := logging.FromContext(ctx).With("Score", pl.Name()) + + score := int64((states.LastOrdinal * 100) - podID) //lower ordinals get higher score + + logger.Infof("Pod %q scored by %q priority successfully", podID, pl.Name()) + return score, state.NewStatus(state.Success) +} + +// ScoreExtensions of the Score plugin.
+func (pl *LowestOrdinalPriority) ScoreExtensions() state.ScoreExtensions { + return pl +} + +// NormalizeScore invoked after scoring all pods. +func (pl *LowestOrdinalPriority) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status { + //no normalizing to perform for this plugin + return state.NewStatus(state.Success) +} diff --git a/pkg/common/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority_test.go b/pkg/common/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority_test.go new file mode 100644 index 0000000000..7840a8dd27 --- /dev/null +++ b/pkg/common/scheduler/plugins/core/lowestordinalpriority/lowest_ordinal_priority_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lowestordinalpriority diff --git a/pkg/common/scheduler/plugins/core/podfitsresources/pod_fits_resources.go b/pkg/common/scheduler/plugins/core/podfitsresources/pod_fits_resources.go new file mode 100644 index 0000000000..4da01d9a52 --- /dev/null +++ b/pkg/common/scheduler/plugins/core/podfitsresources/pod_fits_resources.go @@ -0,0 +1,62 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podfitsresources + +import ( + "context" + + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" + "knative.dev/pkg/logging" +) + +// PodFitsResources plugin filters pods that do not have sufficient free capacity +// for a vreplica to be placed on it +type PodFitsResources struct { +} + +// Verify PodFitsResources Implements FilterPlugin Interface +var _ state.FilterPlugin = &PodFitsResources{} + +// Name of the plugin +const Name = state.PodFitsResources + +const ( + ErrReasonUnschedulable = "pod at full capacity" +) + +func init() { + factory.RegisterFP(Name, &PodFitsResources{}) + //fmt.Println("PodFitsResources plugin has been registered") +} + +// Name returns name of the plugin +func (pl *PodFitsResources) Name() string { + return Name +} + +// Filter invoked at the filter extension point. 
+func (pl *PodFitsResources) Filter(ctx context.Context, states *state.State, podID int32) *state.Status { + logger := logging.FromContext(ctx).With("Filter", pl.Name()) + + if len(states.FreeCap) == 0 || states.Free(podID) > 0 { //vpods with no placements or pods with positive free cap + logger.Infof("Pod %d passed %s predicate successfully", podID, pl.Name()) + return state.NewStatus(state.Success) + } + logger.Infof("Pod %d has no free capacity %v", podID, states.FreeCap) + return state.NewStatus(state.Unschedulable, ErrReasonUnschedulable) +} diff --git a/pkg/common/scheduler/plugins/core/podfitsresources/pod_fits_resources_test.go b/pkg/common/scheduler/plugins/core/podfitsresources/pod_fits_resources_test.go new file mode 100644 index 0000000000..4a4e79bd23 --- /dev/null +++ b/pkg/common/scheduler/plugins/core/podfitsresources/pod_fits_resources_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podfitsresources diff --git a/pkg/common/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count.go b/pkg/common/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count.go new file mode 100644 index 0000000000..a5a0eab813 --- /dev/null +++ b/pkg/common/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count.go @@ -0,0 +1,58 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nomaxresourcecount + +import ( + "context" + + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + state "knative.dev/eventing-kafka/pkg/common/scheduler/state" + "knative.dev/pkg/logging" +) + +// NoMaxResourceCount plugin filters pods that cause total pods with placements +// to exceed total partitioncount +type NoMaxResourceCount struct { +} + +// Verify NoMaxResourceCount Implements FilterPlugin Interface +var _ state.FilterPlugin = &NoMaxResourceCount{} + +// Name of the plugin +const Name = state.NoMaxResourceCount + +const ( + ErrReasonUnschedulable = "pod(s) were unschedulable" +) + +func init() { + factory.RegisterFP(Name, &NoMaxResourceCount{}) + //fmt.Println("NoMaxResourceCount plugin has been registered") +} + +// Name returns name of the plugin +func (pl *NoMaxResourceCount) Name() string { + return Name +} + +// Filter invoked at the filter extension point. 
+func (pl *NoMaxResourceCount) Filter(ctx context.Context, states *state.State, podID int32) *state.Status { + logger := logging.FromContext(ctx).With("Filter", pl.Name()) + + logger.Infof("Pod %q passed %q predicate successfully", podID, pl.Name()) + return state.NewStatus(state.Success) +} diff --git a/pkg/common/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count_test.go b/pkg/common/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count_test.go new file mode 100644 index 0000000000..a1e714f340 --- /dev/null +++ b/pkg/common/scheduler/plugins/kafka/nomaxresourcecount/no_max_resource_count_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nomaxresourcecount diff --git a/pkg/common/scheduler/scheduler.go b/pkg/common/scheduler/scheduler.go index f4234f90da..a7328cab43 100644 --- a/pkg/common/scheduler/scheduler.go +++ b/pkg/common/scheduler/scheduler.go @@ -28,6 +28,21 @@ var ( ErrNotEnoughReplicas = errors.New("scheduling failed (not enough pod replicas)") ) +type SchedulerPolicyType string + +const ( + // MAXFILLUP policy type adds vreplicas to existing pods to fill them up before adding to new pods + MAXFILLUP SchedulerPolicyType = "MAXFILLUP" + // EVENSPREAD policy type spreads vreplicas uniformly across zones to reduce impact of failure + EVENSPREAD = "EVENSPREAD" + // EVENSPREAD_BYNODE policy type spreads vreplicas uniformly across nodes to reduce impact of failure + EVENSPREAD_BYNODE = "EVENSPREAD_BYNODE" +) + +const ( + ZoneLabel = "topology.kubernetes.io/zone" +) + // VPodLister is the function signature for returning a list of VPods type VPodLister func() ([]VPod, error) diff --git a/pkg/common/scheduler/statefulset/helpers.go b/pkg/common/scheduler/state/helpers.go similarity index 86% rename from pkg/common/scheduler/statefulset/helpers.go rename to pkg/common/scheduler/state/helpers.go index 2d545f1a4d..91ad35944f 100644 --- a/pkg/common/scheduler/statefulset/helpers.go +++ b/pkg/common/scheduler/state/helpers.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ -package statefulset +package state import ( "math" @@ -21,11 +21,11 @@ import ( "strings" ) -func podNameFromOrdinal(name string, ordinal int32) string { +func PodNameFromOrdinal(name string, ordinal int32) string { return name + "-" + strconv.Itoa(int(ordinal)) } -func ordinalFromPodName(podName string) int32 { +func OrdinalFromPodName(podName string) int32 { ordinal, err := strconv.ParseInt(podName[strings.LastIndex(podName, "-")+1:], 10, 32) if err != nil { return math.MaxInt32 } diff --git a/pkg/common/scheduler/state/interface.go b/pkg/common/scheduler/state/interface.go new file mode 100644 index 0000000000..ebcfa28e4a --- /dev/null +++ b/pkg/common/scheduler/state/interface.go @@ -0,0 +1,178 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "context" + "errors" + "strings" +) + +const ( + PodFitsResources = "PodFitsResources" + NoMaxResourceCount = "NoMaxResourceCount" + EvenPodSpread = "EvenPodSpread" + AvailabilityNodePriority = "AvailabilityNodePriority" + AvailabilityZonePriority = "AvailabilityZonePriority" + LowestOrdinalPriority = "LowestOrdinalPriority" +) + +// Plugin is the parent type for all the scheduling framework plugins. +type Plugin interface { + Name() string +} + +type FilterPlugin interface { + Plugin + // Filter is called by the scheduler. + // All FilterPlugins should return "Success" to declare that + // the given pod fits the vreplica. + Filter(ctx context.Context, state *State, podID int32) *Status +} + +// ScoreExtensions is an interface for Score extended functionality. +type ScoreExtensions interface { + // NormalizeScore is called for all pod scores produced by the same plugin's "Score" + // method. A successful run of NormalizeScore will update the scores list and return + // a success status. + NormalizeScore(ctx context.Context, state *State, scores PodScoreList) *Status +} + +type ScorePlugin interface { + Plugin + // Score is called by the scheduler. + // All ScorePlugins should return "Success" + Score(ctx context.Context, state *State, podID int32) (int64, *Status) + + // ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if it does not + ScoreExtensions() ScoreExtensions +} + +// Code is the Status code/type which is returned from plugins. +type Code int + +// These are predefined codes used in a Status. +const ( + // Success means that plugin ran correctly and found pod schedulable. + Success Code = iota + // Unschedulable is used when a plugin finds a pod unschedulable due to not satisfying the predicate. + Unschedulable + // Error is used for internal plugin errors, unexpected input, etc. + Error +) + +// Status indicates the result of running a plugin. +type Status struct { + code Code + reasons []string + err error +} + +// Code returns code of the Status. +func (s *Status) Code() Code { + if s == nil { + return Success + } + return s.code +} + +// Message returns a concatenated message on reasons of the Status.
+func (s *Status) Message() string { + if s == nil { + return "" + } + return strings.Join(s.reasons, ", ") +} + +// NewStatus makes a Status out of the given arguments and returns its pointer. +func NewStatus(code Code, reasons ...string) *Status { + s := &Status{ + code: code, + reasons: reasons, + } + if code == Error { + s.err = errors.New(s.Message()) + } + return s +} + +// AsStatus wraps an error in a Status. +func AsStatus(err error) *Status { + return &Status{ + code: Error, + reasons: []string{err.Error()}, + err: err, + } +} + +// AsError returns nil if the status is a success; otherwise returns an "error" object +// with a concatenated message on reasons of the Status. +func (s *Status) AsError() error { + if s.IsSuccess() { + return nil + } + if s.err != nil { + return s.err + } + return errors.New(s.Message()) +} + +// IsSuccess returns true if and only if "Status" is nil or Code is "Success". +func (s *Status) IsSuccess() bool { + return s.Code() == Success +} + +// IsUnschedulable returns true if "Status" is Unschedulable +func (s *Status) IsUnschedulable() bool { + return s.Code() == Unschedulable +} + +type PodScore struct { + ID int32 + Score int64 +} + +type PodScoreList []PodScore + +// PluginToPodScores declares a map from plugin name to its PodScoreList. +type PluginToPodScores map[string]PodScoreList + +// PluginToStatus maps plugin name to status. Currently used to identify which Filter plugin +// returned which status. +type PluginToStatus map[string]*Status + +// Merge merges the statuses in the map into one. The resulting status code has the following +// precedence: Error, Unschedulable, Success +func (p PluginToStatus) Merge() *Status { + if len(p) == 0 { + return nil + } + + finalStatus := NewStatus(Success) + for _, s := range p { + if s.Code() == Error { + finalStatus.err = s.AsError() + } + if s.Code() > finalStatus.code { + finalStatus.code = s.Code() + } + + finalStatus.reasons = append(finalStatus.reasons, s.reasons...) + } + + return finalStatus +} diff --git a/pkg/common/scheduler/statefulset/state.go b/pkg/common/scheduler/state/state.go similarity index 76% rename from pkg/common/scheduler/statefulset/state.go rename to pkg/common/scheduler/state/state.go index a6cb57317c..b5e3af452f 100644 --- a/pkg/common/scheduler/statefulset/state.go +++ b/pkg/common/scheduler/state/state.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package statefulset +package state import ( "context" @@ -28,59 +28,59 @@ import ( "knative.dev/pkg/logging" ) -type stateAccessor interface { +type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. - State(reserved map[types.NamespacedName]map[string]int32) (*state, error) + State(reserved map[types.NamespacedName]map[string]int32) (*State, error) } // state provides information about the current scheduling of all vpods // It is used by the scheduler and the autoscaler -type state struct { +type State struct { // free tracks the free capacity of each pod. - free []int32 + FreeCap []int32 - // lastOrdinal is the ordinal index corresponding to the last statefulset replica + // LastOrdinal is the ordinal index corresponding to the last statefulset replica // with placed vpods. - lastOrdinal int32 + LastOrdinal int32 // Pod capacity.
- capacity int32 + Capacity int32 // Number of zones in cluster - numZones int32 + NumZones int32 // Number of available nodes in cluster - numNodes int32 + NumNodes int32 // Scheduling policy type for placing vreplicas on pods - schedulerPolicy SchedulerPolicyType + SchedulerPolicy scheduler.SchedulerPolicyType // Mapping node names of nodes currently in cluster to their zone info - nodeToZoneMap map[string]string + NodeToZoneMap map[string]string } // Free safely returns the free capacity at the given ordinal -func (s *state) Free(ordinal int32) int32 { - if int32(len(s.free)) <= ordinal { - return s.capacity +func (s *State) Free(ordinal int32) int32 { + if int32(len(s.FreeCap)) <= ordinal { + return s.Capacity } - return s.free[ordinal] + return s.FreeCap[ordinal] } // SetFree safely sets the free capacity at the given ordinal -func (s *state) SetFree(ordinal int32, value int32) { - s.free = grow(s.free, ordinal, s.capacity) - s.free[int(ordinal)] = value +func (s *State) SetFree(ordinal int32, value int32) { + s.FreeCap = grow(s.FreeCap, ordinal, s.Capacity) + s.FreeCap[int(ordinal)] = value } // freeCapacity returns the number of vreplicas that can be used, // up to the last ordinal -func (s *state) freeCapacity() int32 { +func (s *State) FreeCapacity() int32 { t := int32(0) - for i := int32(0); i <= s.lastOrdinal; i++ { - t += s.free[i] + for i := int32(0); i <= s.LastOrdinal; i++ { + t += s.FreeCap[i] } return t } @@ -91,12 +91,12 @@ type stateBuilder struct { logger *zap.SugaredLogger vpodLister scheduler.VPodLister capacity int32 - schedulerPolicy SchedulerPolicyType + schedulerPolicy scheduler.SchedulerPolicyType nodeLister corev1.NodeLister } -// newStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func newStateBuilder(ctx context.Context, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy SchedulerPolicyType, nodeLister corev1.NodeLister) stateAccessor { +// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested +func NewStateBuilder(ctx context.Context, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, nodeLister corev1.NodeLister) StateAccessor { return &stateBuilder{ ctx: ctx, logger: logging.FromContext(ctx), @@ -107,7 +107,7 @@ func newStateBuilder(ctx context.Context, lister scheduler.VPodLister, podCapaci } } -func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*state, error) { +func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err @@ -151,7 +151,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } - if s.schedulerPolicy == EVENSPREAD || s.schedulerPolicy == EVENSPREAD_BYNODE { + if s.schedulerPolicy == scheduler.EVENSPREAD || s.schedulerPolicy == scheduler.EVENSPREAD_BYNODE { //TODO: need a node watch to see if # nodes/ # zones have gone up or down nodes, err := s.nodeLister.List(labels.Everything()) if err != nil { @@ -165,7 +165,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) if node.Spec.Unschedulable { continue //ignore node that is currently unschedulable } - zoneName, ok := node.GetLabels()[ZoneLabel] + zoneName, ok := node.GetLabels()[scheduler.ZoneLabel] if !ok { continue //ignore node that doesn't have zone info (maybe a test setup or control node) } @@ -174,15 +174,15 @@ func (s *stateBuilder) 
State(reserved map[types.NamespacedName]map[string]int32) zoneMap[zoneName] = struct{}{} } - s.logger.Infow("cluster state info", zap.String("numZones", fmt.Sprint(len(zoneMap))), zap.String("numNodes", fmt.Sprint(len(nodeToZoneMap)))) - return &state{free: free, lastOrdinal: last, capacity: s.capacity, numZones: int32(len(zoneMap)), numNodes: int32(len(nodeToZoneMap)), schedulerPolicy: s.schedulerPolicy, nodeToZoneMap: nodeToZoneMap}, nil + s.logger.Infow("cluster state info", zap.String("NumZones", fmt.Sprint(len(zoneMap))), zap.String("NumNodes", fmt.Sprint(len(nodeToZoneMap)))) + return &State{FreeCap: free, LastOrdinal: last, Capacity: s.capacity, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)), SchedulerPolicy: s.schedulerPolicy, NodeToZoneMap: nodeToZoneMap}, nil } - return &state{free: free, lastOrdinal: last, capacity: s.capacity, schedulerPolicy: s.schedulerPolicy}, nil + return &State{FreeCap: free, LastOrdinal: last, Capacity: s.capacity, SchedulerPolicy: s.schedulerPolicy}, nil } func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { - ordinal := ordinalFromPodName(podName) + ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) free[ordinal] -= vreplicas diff --git a/pkg/common/scheduler/statefulset/state_test.go b/pkg/common/scheduler/state/state_test.go similarity index 61% rename from pkg/common/scheduler/statefulset/state_test.go rename to pkg/common/scheduler/state/state_test.go index 18cbdf173f..f17053d1ac 100644 --- a/pkg/common/scheduler/statefulset/state_test.go +++ b/pkg/common/scheduler/state/state_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package statefulset +package state import ( "fmt" @@ -26,18 +26,23 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" + "knative.dev/eventing-kafka/pkg/common/scheduler" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" listers "knative.dev/eventing/pkg/reconciler/testing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" ) +const ( + testNs = "test-ns" +) + func TestStateBuilder(t *testing.T) { testCases := []struct { name string vpods [][]duckv1alpha1.Placement - expected state + expected State freec int32 - schedulerPolicy SchedulerPolicyType + schedulerPolicy scheduler.SchedulerPolicyType reserved map[types.NamespacedName]map[string]int32 nodes []*v1.Node err error @@ -45,16 +50,16 @@ func TestStateBuilder(t *testing.T) { { name: "no vpods", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(0), - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, }, { name: "one vpods", vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, - expected: state{capacity: 10, free: []int32{int32(9)}, lastOrdinal: 0, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, LastOrdinal: 0, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(9), - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, }, { name: "many vpods, no gaps", @@ -63,9 +68,9 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 2}}, {{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}}, }, - expected: state{capacity: 10, free: []int32{int32(8), int32(5), int32(5)}, lastOrdinal: 2, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, LastOrdinal: 2, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(18), - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, }, { name: "many vpods, with gaps", @@ -74,9 +79,9 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: state{capacity: 10, free: []int32{int32(9), int32(10), int32(5), int32(10)}, lastOrdinal: 2, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, LastOrdinal: 2, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(24), - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, }, { name: "many vpods, with gaps and reserved vreplicas", @@ -85,14 +90,14 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: state{capacity: 10, free: []int32{int32(4), int32(10), int32(5), int32(10)}, lastOrdinal: 2, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{int32(4), int32(10), int32(5), int32(10)}, LastOrdinal: 2, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(19), reserved: map[types.NamespacedName]map[string]int32{ {Name: "vpod-name-3", Namespace: testNs}: { "statefulset-name-0": 5, }, }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: 
scheduler.MAXFILLUP, }, { name: "many vpods, with gaps and reserved vreplicas on existing and new placements, fully committed", @@ -101,7 +106,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: state{capacity: 10, free: []int32{int32(4), int32(7), int32(5), int32(10), int32(5)}, lastOrdinal: 4, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{int32(4), int32(7), int32(5), int32(10), int32(5)}, LastOrdinal: 4, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(31), reserved: map[types.NamespacedName]map[string]int32{ {Name: "vpod-name-3", Namespace: "vpod-ns-3"}: { @@ -112,7 +117,7 @@ func TestStateBuilder(t *testing.T) { "statefulset-name-1": 3, }, }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, }, { name: "many vpods, with gaps and reserved vreplicas on existing and new placements, partially committed", @@ -121,7 +126,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: state{capacity: 10, free: []int32{int32(4), int32(7), int32(2), int32(10)}, lastOrdinal: 2, schedulerPolicy: MAXFILLUP}, + expected: State{Capacity: 10, FreeCap: []int32{int32(4), int32(7), int32(2), int32(10)}, LastOrdinal: 2, SchedulerPolicy: scheduler.MAXFILLUP}, freec: int32(13), reserved: map[types.NamespacedName]map[string]int32{ {Name: "vpod-name-0", Namespace: "vpod-ns-0"}: { @@ -132,45 +137,45 @@ func TestStateBuilder(t *testing.T) { "statefulset-name-1": 3, }, }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, }, { name: "no vpods, all nodes with zone labels", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 3, numNodes: 4, schedulerPolicy: EVENSPREAD, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, + expected: State{Capacity: 10, FreeCap: []int32{}, LastOrdinal: -1, NumZones: 3, NumNodes: 4, SchedulerPolicy: scheduler.EVENSPREAD, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, freec: int32(0), - schedulerPolicy: EVENSPREAD, - nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNode("node-1", "zone-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, + schedulerPolicy: scheduler.EVENSPREAD, + nodes: []*v1.Node{tscheduler.MakeNode("node-0", "zone-0"), tscheduler.MakeNode("node-1", "zone-1"), tscheduler.MakeNode("node-2", "zone-2"), tscheduler.MakeNode("node-3", "zone-2")}, }, { name: "no vpods, one node with no label", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 2, numNodes: 3, schedulerPolicy: EVENSPREAD, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, + expected: State{Capacity: 10, FreeCap: []int32{}, LastOrdinal: -1, NumZones: 2, NumNodes: 3, SchedulerPolicy: scheduler.EVENSPREAD, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, freec: int32(0), - schedulerPolicy: EVENSPREAD, - nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNodeNoLabel("node-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, + schedulerPolicy: scheduler.EVENSPREAD, + nodes: 
[]*v1.Node{tscheduler.MakeNode("node-0", "zone-0"), tscheduler.MakeNodeNoLabel("node-1"), tscheduler.MakeNode("node-2", "zone-2"), tscheduler.MakeNode("node-3", "zone-2")}, }, { name: "no vpods, all nodes with zone labels", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 3, numNodes: 4, schedulerPolicy: EVENSPREAD_BYNODE, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, + expected: State{Capacity: 10, FreeCap: []int32{}, LastOrdinal: -1, NumZones: 3, NumNodes: 4, SchedulerPolicy: scheduler.EVENSPREAD_BYNODE, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-2"}}, freec: int32(0), - schedulerPolicy: EVENSPREAD_BYNODE, - nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNode("node-1", "zone-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, + nodes: []*v1.Node{tscheduler.MakeNode("node-0", "zone-0"), tscheduler.MakeNode("node-1", "zone-1"), tscheduler.MakeNode("node-2", "zone-2"), tscheduler.MakeNode("node-3", "zone-2")}, }, { name: "no vpods, one node with no label", vpods: [][]duckv1alpha1.Placement{}, - expected: state{capacity: 10, free: []int32{}, lastOrdinal: -1, numZones: 2, numNodes: 3, schedulerPolicy: EVENSPREAD_BYNODE, nodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, + expected: State{Capacity: 10, FreeCap: []int32{}, LastOrdinal: -1, NumZones: 2, NumNodes: 3, SchedulerPolicy: scheduler.EVENSPREAD_BYNODE, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-2": "zone-2", "node-3": "zone-2"}}, freec: int32(0), - schedulerPolicy: EVENSPREAD_BYNODE, - nodes: []*v1.Node{makeNode("node-0", "zone-0"), makeNodeNoLabel("node-1"), makeNode("node-2", "zone-2"), makeNode("node-3", "zone-2")}, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, + nodes: []*v1.Node{tscheduler.MakeNode("node-0", "zone-0"), tscheduler.MakeNodeNoLabel("node-1"), tscheduler.MakeNode("node-2", "zone-2"), tscheduler.MakeNode("node-3", "zone-2")}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ctx, _ := setupFakeContext(t) + ctx, _ := tscheduler.SetupFakeContext(t) vpodClient := tscheduler.NewVPodClient() nodelist := make([]runtime.Object, 0, len(tc.nodes)) @@ -181,7 +186,7 @@ func TestStateBuilder(t *testing.T) { vpodClient.Create(vpodNamespace, vpodName, 1, placements) } - if tc.schedulerPolicy == EVENSPREAD || tc.schedulerPolicy == EVENSPREAD_BYNODE { + if tc.schedulerPolicy == scheduler.EVENSPREAD || tc.schedulerPolicy == scheduler.EVENSPREAD_BYNODE { for i := 0; i < len(tc.nodes); i++ { node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{}) if err != nil { @@ -192,7 +197,7 @@ func TestStateBuilder(t *testing.T) { } ls := listers.NewListers(nodelist) - stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, ls.GetNodeLister()) + stateBuilder := NewStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, ls.GetNodeLister()) state, err := stateBuilder.State(tc.reserved) if err != nil { t.Fatal("unexpected error", err) @@ -202,8 +207,8 @@ func TestStateBuilder(t *testing.T) { t.Errorf("unexpected state, got %v, want %v", state, tc.expected) } - if state.freeCapacity() != tc.freec { - t.Errorf("unexpected free capacity, got %d, want %d", state.freeCapacity(), tc.freec) + if state.FreeCapacity() != tc.freec { + 
t.Errorf("unexpected free capacity, got %d, want %d", state.FreeCapacity(), tc.freec) } }) } diff --git a/pkg/common/scheduler/statefulset/autoscaler.go b/pkg/common/scheduler/statefulset/autoscaler.go index 3a5f0885c4..1852e53109 100644 --- a/pkg/common/scheduler/statefulset/autoscaler.go +++ b/pkg/common/scheduler/statefulset/autoscaler.go @@ -27,6 +27,7 @@ import ( clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" "knative.dev/eventing-kafka/pkg/common/scheduler" + "knative.dev/eventing-kafka/pkg/common/scheduler/state" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" ) @@ -45,7 +46,7 @@ type autoscaler struct { statefulSetName string vpodLister scheduler.VPodLister logger *zap.SugaredLogger - stateAccessor stateAccessor + stateAccessor state.StateAccessor trigger chan int32 evictor scheduler.Evictor @@ -59,7 +60,7 @@ type autoscaler struct { func NewAutoscaler(ctx context.Context, namespace, name string, lister scheduler.VPodLister, - stateAccessor stateAccessor, + stateAccessor state.StateAccessor, evictor scheduler.Evictor, refreshPeriod time.Duration, capacity int32) Autoscaler { @@ -122,9 +123,9 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen a.logger.Infow("checking adapter capacity", zap.Int32("pending", pending), zap.Int32("replicas", scale.Spec.Replicas), - zap.Int32("last ordinal", state.lastOrdinal)) + zap.Int32("last ordinal", state.LastOrdinal)) - newreplicas := state.lastOrdinal + 1 // Ideal number + newreplicas := state.LastOrdinal + 1 // Ideal number // Take into account pending replicas if pending > 0 { @@ -133,8 +134,8 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen } // Make sure to never scale down past the last ordinal - if newreplicas <= state.lastOrdinal { - newreplicas = state.lastOrdinal + 1 + if newreplicas <= state.LastOrdinal { + newreplicas = state.LastOrdinal + 1 } // Only scale down if permitted @@ -160,17 +161,17 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen return nil } -func (a *autoscaler) mayCompact(s *state) { +func (a *autoscaler) mayCompact(s *state.State) { // when there is only one pod there is nothing to move! 
- if s.lastOrdinal < 1 { + if s.LastOrdinal < 1 { return } - if s.schedulerPolicy == MAXFILLUP { + if s.SchedulerPolicy == scheduler.MAXFILLUP { // Determine if there is enough free capacity to // move all vreplicas placed in the last pod to pods with a lower ordinal - freeCapacity := s.freeCapacity() - s.Free(s.lastOrdinal) - usedInLastPod := s.capacity - s.Free(s.lastOrdinal) + freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal) + usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) if freeCapacity >= usedInLastPod { err := a.compact(s) @@ -184,7 +185,7 @@ func (a *autoscaler) mayCompact(s *state) { } } -func (a *autoscaler) compact(s *state) error { +func (a *autoscaler) compact(s *state.State) error { a.logger.Info("compacting vreplicas") vpods, err := a.vpodLister() if err != nil { @@ -194,9 +195,9 @@ func (a *autoscaler) compact(s *state) error { for _, vpod := range vpods { placements := vpod.GetPlacements() for _, placement := range placements { - ordinal := ordinalFromPodName(placement.PodName) + ordinal := state.OrdinalFromPodName(placement.PodName) - if ordinal == s.lastOrdinal { + if ordinal == s.LastOrdinal { a.logger.Infow("evicting vreplica(s)", zap.String("name", vpod.GetKey().Name), zap.String("namespace", vpod.GetKey().Namespace), diff --git a/pkg/common/scheduler/statefulset/autoscaler_test.go b/pkg/common/scheduler/statefulset/autoscaler_test.go index 7626211d5d..d15a570eb4 100644 --- a/pkg/common/scheduler/statefulset/autoscaler_test.go +++ b/pkg/common/scheduler/statefulset/autoscaler_test.go @@ -31,6 +31,7 @@ import ( duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" "knative.dev/eventing-kafka/pkg/common/scheduler" + "knative.dev/eventing-kafka/pkg/common/scheduler/state" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" ) @@ -46,7 +47,7 @@ func TestAutoscaler(t *testing.T) { pendings int32 scaleDown bool wantReplicas int32 - schedulerPolicy SchedulerPolicyType + schedulerPolicy scheduler.SchedulerPolicyType }{ { name: "no replicas, no placements, no pending", @@ -193,7 +194,7 @@ func TestAutoscaler(t *testing.T) { }, pendings: int32(3), wantReplicas: int32(3), - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "with replicas, with placements, with pending, enough capacity", @@ -205,7 +206,7 @@ func TestAutoscaler(t *testing.T) { }, pendings: int32(3), wantReplicas: int32(3), - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "no replicas, with placements, with pending, enough capacity", @@ -217,7 +218,7 @@ func TestAutoscaler(t *testing.T) { }, pendings: int32(3), wantReplicas: int32(3), - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "with replicas, with placements, with pending, enough capacity", @@ -229,20 +230,20 @@ func TestAutoscaler(t *testing.T) { }, pendings: int32(3), wantReplicas: int32(3), - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ctx, _ := setupFakeContext(t) + ctx, _ := tscheduler.SetupFakeContext(t) vpodClient := tscheduler.NewVPodClient() ls := listers.NewListers(nil) - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister()) + stateAccessor := state.NewStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister()) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) - _, err := sfsClient.Create(ctx, 
makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) + _, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -275,7 +276,7 @@ func TestAutoscaler(t *testing.T) { } func TestAutoscalerScaleDownToZero(t *testing.T) { - ctx, cancel := setupFakeContext(t) + ctx, cancel := tscheduler.SetupFakeContext(t) afterUpdate := make(chan bool) kubeclient.Get(ctx).PrependReactor("update", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { @@ -287,10 +288,10 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { vpodClient := tscheduler.NewVPodClient() ls := listers.NewListers(nil) - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, ls.GetNodeLister()) + stateAccessor := state.NewStateBuilder(ctx, vpodClient.List, 10, scheduler.MAXFILLUP, ls.GetNodeLister()) sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs) - _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) + _, err := sfsClient.Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -336,7 +337,7 @@ func TestCompactor(t *testing.T) { name string replicas int32 vpods []scheduler.VPod - schedulerPolicy SchedulerPolicyType + schedulerPolicy scheduler.SchedulerPolicyType wantEvictions map[types.NamespacedName]duckv1alpha1.Placement }{ { @@ -345,7 +346,7 @@ func TestCompactor(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 0, nil), }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, wantEvictions: nil, }, { @@ -356,7 +357,7 @@ func TestCompactor(t *testing.T) { {PodName: "pod-0", VReplicas: int32(8)}, {PodName: "pod-1", VReplicas: int32(7)}}), }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, wantEvictions: nil, }, { @@ -367,7 +368,7 @@ func TestCompactor(t *testing.T) { {PodName: "pod-0", VReplicas: int32(8)}, {PodName: "pod-1", VReplicas: int32(3)}}), }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, wantEvictions: nil, }, { @@ -378,7 +379,7 @@ func TestCompactor(t *testing.T) { {PodName: "pod-0", VReplicas: int32(8)}, {PodName: "pod-1", VReplicas: int32(2)}}), }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, wantEvictions: map[types.NamespacedName]duckv1alpha1.Placement{ {Name: "vpod-1", Namespace: testNs}: {PodName: "pod-1", VReplicas: int32(2)}, }, @@ -395,7 +396,7 @@ func TestCompactor(t *testing.T) { {PodName: "pod-0", VReplicas: int32(2)}, {PodName: "pod-2", VReplicas: int32(7)}}), }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, wantEvictions: nil, }, { @@ -410,7 +411,7 @@ func TestCompactor(t *testing.T) { {PodName: "pod-0", VReplicas: int32(2)}, {PodName: "pod-2", VReplicas: int32(7)}}), }, - schedulerPolicy: MAXFILLUP, + schedulerPolicy: scheduler.MAXFILLUP, wantEvictions: map[types.NamespacedName]duckv1alpha1.Placement{ {Name: "vpod-2", Namespace: testNs}: {PodName: "pod-2", VReplicas: int32(7)}, }, @@ -419,11 +420,11 @@ func TestCompactor(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ctx, _ := setupFakeContext(t) + ctx, _ := tscheduler.SetupFakeContext(t) vpodClient := tscheduler.NewVPodClient() ls := listers.NewListers(nil) - stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister()) + 
stateAccessor := state.NewStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister()) evictions := make(map[types.NamespacedName]duckv1alpha1.Placement) recordEviction := func(vpod scheduler.VPod, from *duckv1alpha1.Placement) error { diff --git a/pkg/common/scheduler/statefulset/scheduler.go b/pkg/common/scheduler/statefulset/scheduler.go index 3661fea6f5..7cc5461833 100644 --- a/pkg/common/scheduler/statefulset/scheduler.go +++ b/pkg/common/scheduler/statefulset/scheduler.go @@ -19,7 +19,9 @@ package statefulset import ( "context" "errors" + "fmt" "math" + "math/rand" "sort" "sync" "time" @@ -27,6 +29,7 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -39,22 +42,16 @@ import ( duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" "knative.dev/eventing-kafka/pkg/common/scheduler" + "knative.dev/eventing-kafka/pkg/common/scheduler/factory" + "knative.dev/eventing-kafka/pkg/common/scheduler/state" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" -) - -type SchedulerPolicyType string - -const ( - // MAXFILLUP policy type adds vreplicas to existing pods to fill them up before adding to new pods - MAXFILLUP SchedulerPolicyType = "MAXFILLUP" - // EVENSPREAD policy type spreads vreplicas uniformly across zones to reduce impact of failure - EVENSPREAD = "EVENSPREAD" - // EVENSPREAD_BYNODE policy type spreads vreplicas uniformly across nodes to reduce impact of failure - EVENSPREAD_BYNODE = "EVENSPREAD_BYNODE" -) -const ( - ZoneLabel = "topology.kubernetes.io/zone" + _ "knative.dev/eventing-kafka/pkg/common/scheduler/plugins/core/availabilitynodepriority" + _ "knative.dev/eventing-kafka/pkg/common/scheduler/plugins/core/availabilityzonepriority" + _ "knative.dev/eventing-kafka/pkg/common/scheduler/plugins/core/evenpodspread" + _ "knative.dev/eventing-kafka/pkg/common/scheduler/plugins/core/lowestordinalpriority" + _ "knative.dev/eventing-kafka/pkg/common/scheduler/plugins/core/podfitsresources" + _ "knative.dev/eventing-kafka/pkg/common/scheduler/plugins/kafka/nomaxresourcecount" ) // NewScheduler creates a new scheduler with pod autoscaling enabled. 
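The blank imports added above pull in the individual predicate and priority plugin packages purely for their side effects, so that each plugin can add itself to the factory registry at package-initialization time. Below is a minimal sketch of what such a self-registering filter plugin is assumed to look like; it presumes a factory.RegisterFP(name, plugin) helper and that a filter plugin only needs Name() and Filter() (the two methods used by RunFilterPlugins later in this file), so the real plugin packages may differ in detail.

    // Sketch only: a self-registering filter plugin, assuming factory.RegisterFP
    // exists with the signature (name string, plugin state.FilterPlugin) error.
    package podfitsresources

    import (
        "context"

        "knative.dev/eventing-kafka/pkg/common/scheduler/factory"
        state "knative.dev/eventing-kafka/pkg/common/scheduler/state"
    )

    // PodFitsResources filters out pods that cannot take an additional vreplica.
    type PodFitsResources struct{}

    var _ state.FilterPlugin = &PodFitsResources{}

    func init() {
        // Importing this package for side effects (the blank imports above) runs this registration.
        if err := factory.RegisterFP(state.PodFitsResources, &PodFitsResources{}); err != nil {
            panic(err)
        }
    }

    // Name returns the name the scheduler policy refers to this plugin by.
    func (p *PodFitsResources) Name() string { return state.PodFitsResources }

    // Filter marks every pod as schedulable; a real implementation would reject
    // pods whose free capacity is exhausted.
    func (p *PodFitsResources) Filter(ctx context.Context, s *state.State, podID int32) *state.Status {
        return state.NewStatus(state.Success)
    }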
@@ -63,29 +60,31 @@ func NewScheduler(ctx context.Context, lister scheduler.VPodLister, refreshPeriod time.Duration, capacity int32, - schedulerPolicy SchedulerPolicyType, + schedulerPolicy scheduler.SchedulerPolicyType, nodeLister corev1listers.NodeLister, - evictor scheduler.Evictor) scheduler.Scheduler { + evictor scheduler.Evictor, + policy *SchedulerPolicy) scheduler.Scheduler { - stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister) + stateAccessor := state.NewStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister) autoscaler := NewAutoscaler(ctx, namespace, name, lister, stateAccessor, evictor, refreshPeriod, capacity) podInformer := podinformer.Get(ctx) podLister := podInformer.Lister().Pods(namespace) go autoscaler.Start(ctx) - return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister) + return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister, policy) } // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { + ctx context.Context logger *zap.SugaredLogger statefulSetName string statefulSetClient clientappsv1.StatefulSetInterface podLister corev1listers.PodNamespaceLister vpodLister scheduler.VPodLister lock sync.Locker - stateAccessor stateAccessor + stateAccessor state.StateAccessor autoscaler Autoscaler // replicas is the (cached) number of statefulset replicas. @@ -99,15 +98,37 @@ type StatefulSetScheduler struct { // reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been // committed yet (ie. not appearing in vpodLister) reserved map[types.NamespacedName]map[string]int32 + + //Scheduler responsible for initializing and running scheduler plugins + policy *SchedulerPolicy + + // predicates that will always be configured. 
+ mandatoryPredicates sets.String + + // predicates and priorities that will be used if either was set to nil in the config map + defaultPredicates sets.String + defaultPriorities map[string]int64 +} + +func ValidatePolicy(policy *SchedulerPolicy) []error { + var validationErrors []error + + for _, priority := range policy.Priorities { + if priority.Weight <= 0 || priority.Weight >= MaxTotalWeight { + validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name)) + } + } + return validationErrors } func NewStatefulSetScheduler(ctx context.Context, namespace, name string, lister scheduler.VPodLister, - stateAccessor stateAccessor, - autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler { + stateAccessor state.StateAccessor, + autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister, policy *SchedulerPolicy) scheduler.Scheduler { scheduler := &StatefulSetScheduler{ + ctx: ctx, logger: logging.FromContext(ctx), statefulSetName: name, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), @@ -118,6 +139,20 @@ func NewStatefulSetScheduler(ctx context.Context, stateAccessor: stateAccessor, reserved: make(map[types.NamespacedName]map[string]int32), autoscaler: autoscaler, + policy: policy, + mandatoryPredicates: sets.NewString( + state.PodFitsResources, + ), + defaultPredicates: sets.NewString( + state.PodFitsResources, + state.NoMaxResourceCount, + state.EvenPodSpread, + ), + defaultPriorities: map[string]int64{ + state.AvailabilityNodePriority: 1, + state.AvailabilityZonePriority: 1, + state.LowestOrdinalPriority: 1, + }, } // Monitor our statefulset @@ -140,7 +175,7 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla } sort.SliceStable(placements, func(i int, j int) bool { - return ordinalFromPodName(placements[i].PodName) < ordinalFromPodName(placements[j].PodName) + return state.OrdinalFromPodName(placements[i].PodName) < state.OrdinalFromPodName(placements[j].PodName) }) // Reserve new placements until they are committed to the vpod. 
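Since NewStatefulSetScheduler now receives the policy explicitly, a caller-side sketch may help. The helper below is hypothetical (it is not taken from this change); it builds a SchedulerPolicy equivalent to the defaults initialized above, validates it, and passes it through. It assumes it lives in the statefulset package with the same imports already present in scheduler.go (context, fmt, scheduler, state, corev1listers).

    // newDefaultPolicyScheduler is a hypothetical helper mirroring defaultPredicates/defaultPriorities.
    func newDefaultPolicyScheduler(ctx context.Context, namespace, name string,
        lister scheduler.VPodLister, stateAccessor state.StateAccessor,
        autoscaler Autoscaler, podLister corev1listers.PodNamespaceLister) (scheduler.Scheduler, error) {

        policy := &SchedulerPolicy{
            Predicates: []PredicatePolicy{
                {Name: state.PodFitsResources},
                {Name: state.NoMaxResourceCount},
                {Name: state.EvenPodSpread},
            },
            Priorities: []PriorityPolicy{
                {Name: state.AvailabilityNodePriority, Weight: 1},
                {Name: state.AvailabilityZonePriority, Weight: 1},
                {Name: state.LowestOrdinalPriority, Weight: 1},
            },
        }

        // ValidatePolicy rejects weights that are not positive or that reach MaxTotalWeight (100).
        if errs := ValidatePolicy(policy); len(errs) != 0 {
            return nil, fmt.Errorf("invalid scheduler policy: %v", errs)
        }

        return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister, policy), nil
    }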
@@ -187,40 +222,57 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 return placements, nil } - // Need less => scale down - if tr > vpod.GetVReplicas() { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - if state.schedulerPolicy == EVENSPREAD || state.schedulerPolicy == EVENSPREAD_BYNODE { - //spreadVal is the minimum number of replicas to be left behind in each failure domain for high availability - if state.schedulerPolicy == EVENSPREAD { - spreadVal = int32(math.Floor(float64(vpod.GetVReplicas()) / float64(state.numZones))) - } else if state.schedulerPolicy == EVENSPREAD_BYNODE { - spreadVal = int32(math.Floor(float64(vpod.GetVReplicas()) / float64(state.numNodes))) + if state.SchedulerPolicy != "" { + // Need less => scale down + if tr > vpod.GetVReplicas() { + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + if state.SchedulerPolicy == scheduler.EVENSPREAD || state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { + //spreadVal is the minimum number of replicas to be left behind in each failure domain for high availability + if state.SchedulerPolicy == scheduler.EVENSPREAD { + spreadVal = int32(math.Floor(float64(vpod.GetVReplicas()) / float64(state.NumZones))) + } else if state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { + spreadVal = int32(math.Floor(float64(vpod.GetVReplicas()) / float64(state.NumNodes))) + } + logger.Infow("number of replicas per domain", zap.Int32("spreadVal", spreadVal)) + placements = s.removeReplicasEvenSpread(state, tr-vpod.GetVReplicas(), placements, spreadVal) + } else { + placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) + } + + // Do not trigger the autoscaler to avoid unnecessary churn + + return placements, nil + } + + // Need more => scale up + logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + if state.SchedulerPolicy == scheduler.EVENSPREAD || state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { + //spreadVal is the maximum number of replicas to be placed in each failure domain for high availability + if state.SchedulerPolicy == scheduler.EVENSPREAD { + spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.NumZones))) + } else if state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { + spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.NumNodes))) } logger.Infow("number of replicas per domain", zap.Int32("spreadVal", spreadVal)) - placements = s.removeReplicasEvenSpread(state, tr-vpod.GetVReplicas(), placements, spreadVal) + placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, spreadVal) } else { - placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) + placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } - // Do not trigger the autoscaler to avoid unnecessary churn - - return placements, nil - } + } else if s.policy != nil { //Predicates and priorities must be used for scheduling + if errs := ValidatePolicy(s.policy); errs != nil { + return nil, errors.New("validating policy failed") + } - // Need more => scale up - logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - if state.schedulerPolicy == EVENSPREAD || state.schedulerPolicy == EVENSPREAD_BYNODE { - //spreadVal is the maximum number of replicas to be placed in each failure domain for high availability - if 
state.schedulerPolicy == EVENSPREAD { - spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones))) - } else if state.schedulerPolicy == EVENSPREAD_BYNODE { - spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numNodes))) + // Need less => scale down + if tr > vpod.GetVReplicas() { + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + //TODO } - logger.Infow("number of replicas per domain", zap.Int32("spreadVal", spreadVal)) - placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, spreadVal) - } else { - placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) + + // Need more => scale up + logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + placements, left = s.addReplicasWithPolicy(vpod, state, vpod.GetVReplicas()-tr, placements) } if left > 0 { @@ -239,9 +291,275 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 logger.Infow("scheduling successful", zap.Any("placement", placements)) delete(s.pending, vpod.GetKey()) + return placements, nil } +func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, states *state.State, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + logger := s.logger.Named("add replicas with policy") + + numVreps := diff + for i := int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find the most suitable pod placement satisfying predicates with a high score) + selectedPodIDsToVreps := make(map[int32]int32) // map of podID to number of vreps placed on that pod + + // Get the current placements state + state, err := s.stateAccessor.State(s.reserved) + if err != nil { + logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) + return placements, diff + } + + if s.replicas == 0 { //no pods to filter + logger.Infow("no pods available in statefulset") + s.reservePlacements(vpod, placements) + diff = diff - i //for autoscaling up + break //end the iteration for all vreps since there are no pods + } + + feasiblePods, err := s.findFeasiblePods(s.ctx, state) + if err != nil { + logger.Info("error while filtering pods using predicates", zap.Error(err)) + s.reservePlacements(vpod, placements) + diff = diff - i //for autoscaling up + break + } + logger.Info("Computing predicates for a vreplica is done") + + if len(feasiblePods) == 0 { //no pods available to schedule this vreplica + logger.Info("no feasible pods available to schedule this vreplica") + s.reservePlacements(vpod, placements) + diff = diff - i //for autoscaling up + break + } + + if len(feasiblePods) == 1 { //nothing to score, place vrep on that pod + logger.Infof("Selected pod #%v for vreplica #%v ", feasiblePods[0], i) + selectedPodIDsToVreps[feasiblePods[0]]++ //increasing # of vreps for that pod ordinal + placements = s.addSelectionToPlacements(selectedPodIDsToVreps, placements) + s.reservePlacements(vpod, placements) + diff-- + continue + } + + priorityList, err := s.prioritizePods(s.ctx, state, feasiblePods) + if err != nil { + logger.Info("error while scoring pods using priorities", zap.Error(err)) + s.reservePlacements(vpod, placements) + diff = diff - i //for autoscaling up + break + } + + placementPodID, err := s.selectPod(priorityList) + if err != nil { + logger.Info("error while selecting the placement pod", zap.Error(err)) + s.reservePlacements(vpod, placements) + diff = diff - i 
//for autoscaling up + break + } + + logger.Infof("Selected pod #%v for vreplica #%v", placementPodID, i) + selectedPodIDsToVreps[placementPodID]++ //increasing # of vreps for that pod ordinal + placements = s.addSelectionToPlacements(selectedPodIDsToVreps, placements) + s.reservePlacements(vpod, placements) + diff-- + } + return placements, diff +} + +func (s *StatefulSetScheduler) addSelectionToPlacements(selectedPodIDsToVreps map[int32]int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { + logger := s.logger.Named("add selection to placements") + seen := false + + for podID, newreps := range selectedPodIDsToVreps { + for i := 0; i < len(placements); i++ { + ordinal := state.OrdinalFromPodName(placements[i].PodName) + if podID == ordinal { + logger.Infof("Increasing vreplicas for podID #%v by %v", podID, newreps) + seen = true + placements[i].VReplicas = placements[i].VReplicas + newreps + } + } + if !seen { + logger.Infof("Adding vreplicas for podID #%v by %v", podID, newreps) + placements = append(placements, duckv1alpha1.Placement{ + PodName: state.PodNameFromOrdinal(s.statefulSetName, podID), + VReplicas: newreps, + }) + } + } + return placements +} + +// findFeasiblePods finds the pods that fit the filter plugins +func (s *StatefulSetScheduler) findFeasiblePods(ctx context.Context, state *state.State) ([]int32, error) { + logger := s.logger.Named("find feasible pods") + + feasiblePods := make([]int32, 0) + for podId := int32(0); podId < s.replicas; podId++ { + statusMap := s.RunFilterPlugins(ctx, state, podId) + status := statusMap.Merge() + if status.IsSuccess() { + logger.Infof("SUCCESS! Adding Pod #%v to feasible list", podId) + feasiblePods = append(feasiblePods, podId) + } else { + logger.Infof("UNSCHEDULABLE! Cannot add Pod #%v to feasible list", podId) + } + } + + return feasiblePods, nil +} + +// prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. +// The scores from each plugin are added together to make the score for that pod. +func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *state.State, feasiblePods []int32) (state.PodScoreList, error) { + logger := s.logger.Named("prioritize all feasible pods") + + // If no priority configs are provided, then all pods will have a score of one + result := make(state.PodScoreList, 0, len(feasiblePods)) + if !s.HasScorePlugins(states) { + for _, podID := range feasiblePods { + result = append(result, state.PodScore{ + ID: podID, + Score: 1, + }) + } + return result, nil + } + + scoresMap, scoreStatus := s.RunScorePlugins(ctx, states, feasiblePods) + if !scoreStatus.IsSuccess() { + logger.Infof("FAILURE! Cannot score feasible pods due to plugin errors %v", scoreStatus.AsError()) + return nil, scoreStatus.AsError() + } + + // Summarize all scores. + for i := range feasiblePods { + result = append(result, state.PodScore{ID: feasiblePods[i], Score: 0}) + for j := range scoresMap { + result[i].Score += scoresMap[j][i].Score + } + } + + logger.Info("SUCCESS! 
Scoring all feasible pods completed") + return result, nil +} + +// selectPod takes a prioritized list of pods and then picks one +func (s *StatefulSetScheduler) selectPod(podScoreList state.PodScoreList) (int32, error) { + logger := s.logger.Named("select the winning pod for current vrep") + + if len(podScoreList) == 0 { + logger.Error("empty priority list") + return -1, fmt.Errorf("empty priority list") //no selected pod + } + + maxScore := podScoreList[0].Score + selected := podScoreList[0].ID + cntOfMaxScore := 1 + for _, ps := range podScoreList[1:] { + if ps.Score > maxScore { + maxScore = ps.Score + selected = ps.ID + cntOfMaxScore = 1 + } else if ps.Score == maxScore { //if equal scores, randomly picks one + cntOfMaxScore++ + if rand.Intn(cntOfMaxScore) == 0 { + selected = ps.ID + } + } + } + logger.Infof("Pod #%v has been selected", selected) + return selected, nil +} + +// RunFilterPlugins runs the set of configured Filter plugins for a vrep on the given pod. +// If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. +// Meanwhile, the failure message and status are set for the given pod. +func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *state.State, podID int32) state.PluginToStatus { + logger := s.logger.Named("run all filter plugins") + + statuses := make(state.PluginToStatus) + for _, plugin := range s.policy.Predicates { + pl, err := factory.GetFilterPlugin(plugin.Name) + if err != nil { + logger.Error("Could not find filter plugin in Registry: ", plugin.Name) + continue + } + + logger.Infof("Going to run filter plugin: %s using state: %v ", pl.Name(), states) + pluginStatus := s.runFilterPlugin(ctx, pl, states, podID) + if !pluginStatus.IsSuccess() { + if !pluginStatus.IsUnschedulable() { + logger.Errorf("filter plugin %q returned a bad status for pod %q: %v", pl.Name(), podID, pluginStatus.Message()) + errStatus := state.NewStatus(state.Error, fmt.Sprintf("running %q filter plugin for pod %q failed with: %v", pl.Name(), podID, pluginStatus.Message())) + return map[string]*state.Status{pl.Name(): errStatus} //TODO: if one plugin fails, then no more plugins are run + } + logger.Infof("filter plugin %q returned unschedulable for pod %q: %v", pl.Name(), podID, pluginStatus) + statuses[pl.Name()] = pluginStatus + return statuses //TODO: if one plugin fails (pod unschedulable), then no more plugins are run + } + } + + logger.Infof("all filter plugins ran successfully") + return statuses +} + +func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl state.FilterPlugin, states *state.State, podID int32) *state.Status { + status := pl.Filter(ctx, states, podID) + return status +} + +// RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). +// It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. 
+func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *state.State, feasiblePods []int32) (state.PluginToPodScores, *state.Status) { + logger := s.logger.Named("run all score plugins") + + pluginToPodScores := make(state.PluginToPodScores, len(s.policy.Priorities)) + for _, plugin := range s.policy.Priorities { + pl, err := factory.GetScorePlugin(plugin.Name) + if err != nil { + logger.Error("Could not find score plugin in registry: ", plugin.Name) + continue + } + + logger.Infof("Going to run score plugin: %s using state: %v ", pl.Name(), states) + pluginToPodScores[pl.Name()] = make(state.PodScoreList, len(feasiblePods)) + for index, podID := range feasiblePods { + score, pluginStatus := s.runScorePlugin(ctx, pl, states, podID) + if !pluginStatus.IsSuccess() { + logger.Errorf("scoring plugin %q returned error for pod %q: %v", pl.Name(), podID, pluginStatus.AsError()) + errStatus := state.NewStatus(state.Error, fmt.Sprintf("running %q scoring plugin for pod %q failed with: %v", pl.Name(), podID, pluginStatus.AsError())) + return pluginToPodScores, errStatus //TODO: if one plugin fails, then no more plugins are run + } + + score = score * plugin.Weight //WEIGHED SCORE VALUE + logger.Infof("scoring plugin %q produced score %v for pod %q: %v", pl.Name(), score, podID, pluginStatus) + pluginToPodScores[pl.Name()][index] = state.PodScore{ + ID: podID, + Score: score, + } + } + } + + logger.Info("all scoring plugins ran successfully") + return pluginToPodScores, state.NewStatus(state.Success) +} + +func (s *StatefulSetScheduler) runScorePlugin(ctx context.Context, pl state.ScorePlugin, states *state.State, podID int32) (int64, *state.Status) { + score, status := pl.Score(ctx, states, podID) + return score, status +} + +// HasFilterPlugins returns true if at least one filter plugin is defined. +func (s *StatefulSetScheduler) HasFilterPlugins(state *state.State) bool { + return len(s.policy.Predicates) > 0 +} + +// HasScorePlugins returns true if at least one score plugin is defined. 
+func (s *StatefulSetScheduler) HasScorePlugins(state *state.State) bool { + return len(s.policy.Priorities) > 0 +} + func (s *StatefulSetScheduler) removeReplicas(diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) for i := len(placements) - 1; i > -1; i-- { @@ -259,19 +577,19 @@ func (s *StatefulSetScheduler) removeReplicas(diff int32, placements []duckv1alp return newPlacements } -func (s *StatefulSetScheduler) removeReplicasEvenSpread(state *state, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) []duckv1alpha1.Placement { +func (s *StatefulSetScheduler) removeReplicasEvenSpread(state *state.State, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) []duckv1alpha1.Placement { logger := s.logger.Named("remove replicas") newPlacements := s.removeFromExistingReplicas(state, logger, diff, placements, evenSpread) return newPlacements } -func (s *StatefulSetScheduler) removeFromExistingReplicas(state *state, logger *zap.SugaredLogger, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) []duckv1alpha1.Placement { +func (s *StatefulSetScheduler) removeFromExistingReplicas(state *state.State, logger *zap.SugaredLogger, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) []duckv1alpha1.Placement { var domainNames []string var placementsByDomain map[string][]int32 newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) - if state.schedulerPolicy == EVENSPREAD_BYNODE { + if state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { placementsByDomain = s.getPlacementsByNodeKey(state, placements) } else { placementsByDomain = s.getPlacementsByZoneKey(state, placements) @@ -283,7 +601,7 @@ func (s *StatefulSetScheduler) removeFromExistingReplicas(state *state, logger * for i := 0; i < len(domainNames); i++ { //iterate through each domain var totalInDomain int32 - if state.schedulerPolicy == EVENSPREAD_BYNODE { + if state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { totalInDomain = s.getTotalVReplicasInNode(state, placements, domainNames[i]) } else { totalInDomain = s.getTotalVReplicasInZone(state, placements, domainNames[i]) @@ -327,17 +645,17 @@ func (s *StatefulSetScheduler) removeFromExistingReplicas(state *state, logger * return newPlacements } -func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) addReplicas(states *state.State, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { // Pod affinity algorithm: prefer adding replicas to existing pods before considering other replicas newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) // Add to existing for i := 0; i < len(placements); i++ { podName := placements[i].PodName - ordinal := ordinalFromPodName(podName) + ordinal := state.OrdinalFromPodName(podName) // Is there space in PodName? 
- f := state.Free(ordinal) + f := states.Free(ordinal) if diff >= 0 && f > 0 { allocation := integer.Int32Min(f, diff) newPlacements = append(newPlacements, duckv1alpha1.Placement{ @@ -346,7 +664,7 @@ func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements }) diff -= allocation - state.SetFree(ordinal, f-allocation) + states.SetFree(ordinal, f-allocation) } else { newPlacements = append(newPlacements, placements[i]) } @@ -355,16 +673,16 @@ func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements if diff > 0 { // Needs to allocate replicas to additional pods for ordinal := int32(0); ordinal < s.replicas; ordinal++ { - f := state.Free(ordinal) + f := states.Free(ordinal) if f > 0 { allocation := integer.Int32Min(f, diff) newPlacements = append(newPlacements, duckv1alpha1.Placement{ - PodName: podNameFromOrdinal(s.statefulSetName, ordinal), + PodName: state.PodNameFromOrdinal(s.statefulSetName, ordinal), VReplicas: allocation, }) diff -= allocation - state.SetFree(ordinal, f-allocation) + states.SetFree(ordinal, f-allocation) } if diff == 0 { @@ -376,7 +694,7 @@ func (s *StatefulSetScheduler) addReplicas(state *state, diff int32, placements return newPlacements, diff } -func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state.State, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { // Pod affinity MAXFILLUP algorithm prefer adding replicas to existing pods to fill them up before adding to new pods // Pod affinity EVENSPREAD and EVENSPREAD_BYNODE algorithm spread replicas across pods in different regions (zone or node) for HA logger := s.logger.Named("add replicas") @@ -389,12 +707,12 @@ func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state, diff int32, p return newPlacements, diff } -func (s *StatefulSetScheduler) addToExistingReplicas(state *state, logger *zap.SugaredLogger, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) addToExistingReplicas(state *state.State, logger *zap.SugaredLogger, diff int32, placements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { var domainNames []string var placementsByDomain map[string][]int32 newPlacements := make([]duckv1alpha1.Placement, 0, len(placements)) - if state.schedulerPolicy == EVENSPREAD_BYNODE { + if state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { placementsByDomain = s.getPlacementsByNodeKey(state, placements) } else { placementsByDomain = s.getPlacementsByZoneKey(state, placements) @@ -406,7 +724,7 @@ func (s *StatefulSetScheduler) addToExistingReplicas(state *state, logger *zap.S for i := 0; i < len(domainNames); i++ { //iterate through each domain var totalInDomain int32 - if state.schedulerPolicy == EVENSPREAD_BYNODE { + if state.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { totalInDomain = s.getTotalVReplicasInNode(state, placements, domainNames[i]) } else { totalInDomain = s.getTotalVReplicasInZone(state, placements, domainNames[i]) @@ -440,22 +758,22 @@ func (s *StatefulSetScheduler) addToExistingReplicas(state *state, logger *zap.S return newPlacements, diff } -func (s *StatefulSetScheduler) addToNewReplicas(state *state, logger *zap.SugaredLogger, diff int32, newPlacements []duckv1alpha1.Placement, evenSpread int32) 
([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) addToNewReplicas(states *state.State, logger *zap.SugaredLogger, diff int32, newPlacements []duckv1alpha1.Placement, evenSpread int32) ([]duckv1alpha1.Placement, int32) { for ordinal := int32(0); ordinal < s.replicas; ordinal++ { - f := state.Free(ordinal) + f := states.Free(ordinal) if f > 0 { //here it is possible to hit pods that are in existing placements - podName := podNameFromOrdinal(s.statefulSetName, ordinal) - zoneName, nodeName, err := s.getPodInfo(state, podName) + podName := state.PodNameFromOrdinal(s.statefulSetName, ordinal) + zoneName, nodeName, err := s.getPodInfo(states, podName) if err != nil { logger.Errorw("Error getting zone and node info from pod", zap.Error(err)) continue //TODO: not continue? } var totalInDomain int32 - if state.schedulerPolicy == EVENSPREAD_BYNODE { - totalInDomain = s.getTotalVReplicasInNode(state, newPlacements, nodeName) + if states.SchedulerPolicy == scheduler.EVENSPREAD_BYNODE { + totalInDomain = s.getTotalVReplicasInNode(states, newPlacements, nodeName) } else { - totalInDomain = s.getTotalVReplicasInZone(state, newPlacements, zoneName) + totalInDomain = s.getTotalVReplicasInZone(states, newPlacements, zoneName) } if totalInDomain >= evenSpread { continue //since current zone that pod belongs to is already at max spread @@ -471,7 +789,7 @@ func (s *StatefulSetScheduler) addToNewReplicas(state *state, logger *zap.Sugare }) diff -= allocation - state.SetFree(ordinal, f-allocation) + states.SetFree(ordinal, f-allocation) } if diff == 0 { @@ -480,39 +798,40 @@ func (s *StatefulSetScheduler) addToNewReplicas(state *state, logger *zap.Sugare } return newPlacements, diff } -func (s *StatefulSetScheduler) getPodInfo(state *state, podName string) (zoneName string, nodeName string, err error) { + +func (s *StatefulSetScheduler) getPodInfo(state *state.State, podName string) (zoneName string, nodeName string, err error) { pod, err := s.podLister.Get(podName) if err != nil { return zoneName, nodeName, err } nodeName = pod.Spec.NodeName - zoneName, ok := state.nodeToZoneMap[nodeName] + zoneName, ok := state.NodeToZoneMap[nodeName] if !ok { return zoneName, nodeName, errors.New("could not find zone") } return zoneName, nodeName, nil } -func (s *StatefulSetScheduler) getPlacementsByZoneKey(state *state, placements []duckv1alpha1.Placement) (placementsByZone map[string][]int32) { +func (s *StatefulSetScheduler) getPlacementsByZoneKey(states *state.State, placements []duckv1alpha1.Placement) (placementsByZone map[string][]int32) { placementsByZone = make(map[string][]int32) for i := 0; i < len(placements); i++ { - zoneName, _, _ := s.getPodInfo(state, placements[i].PodName) - placementsByZone[zoneName] = append(placementsByZone[zoneName], ordinalFromPodName(placements[i].PodName)) + zoneName, _, _ := s.getPodInfo(states, placements[i].PodName) + placementsByZone[zoneName] = append(placementsByZone[zoneName], state.OrdinalFromPodName(placements[i].PodName)) } return placementsByZone } -func (s *StatefulSetScheduler) getPlacementsByNodeKey(state *state, placements []duckv1alpha1.Placement) (placementsByNode map[string][]int32) { +func (s *StatefulSetScheduler) getPlacementsByNodeKey(states *state.State, placements []duckv1alpha1.Placement) (placementsByNode map[string][]int32) { placementsByNode = make(map[string][]int32) for i := 0; i < len(placements); i++ { - _, nodeName, _ := s.getPodInfo(state, placements[i].PodName) - placementsByNode[nodeName] = append(placementsByNode[nodeName], 
ordinalFromPodName(placements[i].PodName)) + _, nodeName, _ := s.getPodInfo(states, placements[i].PodName) + placementsByNode[nodeName] = append(placementsByNode[nodeName], state.OrdinalFromPodName(placements[i].PodName)) } return placementsByNode } -func (s *StatefulSetScheduler) getTotalVReplicasInZone(state *state, placements []duckv1alpha1.Placement, zoneName string) int32 { +func (s *StatefulSetScheduler) getTotalVReplicasInZone(state *state.State, placements []duckv1alpha1.Placement, zoneName string) int32 { var totalReplicasInZone int32 for i := 0; i < len(placements); i++ { pZone, _, _ := s.getPodInfo(state, placements[i].PodName) @@ -523,7 +842,7 @@ func (s *StatefulSetScheduler) getTotalVReplicasInZone(state *state, placements return totalReplicasInZone } -func (s *StatefulSetScheduler) getTotalVReplicasInNode(state *state, placements []duckv1alpha1.Placement, nodeName string) int32 { +func (s *StatefulSetScheduler) getTotalVReplicasInNode(state *state.State, placements []duckv1alpha1.Placement, nodeName string) int32 { var totalReplicasInNode int32 for i := 0; i < len(placements); i++ { _, pNode, _ := s.getPodInfo(state, placements[i].PodName) @@ -536,7 +855,7 @@ func (s *StatefulSetScheduler) getTotalVReplicasInNode(state *state, placements func (s *StatefulSetScheduler) getPlacementFromPodOrdinal(placements []duckv1alpha1.Placement, ordinal int32) (placement duckv1alpha1.Placement) { for i := 0; i < len(placements); i++ { - if placements[i].PodName == podNameFromOrdinal(s.statefulSetName, ordinal) { + if placements[i].PodName == state.PodNameFromOrdinal(s.statefulSetName, ordinal) { return placements[i] } } diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go b/pkg/common/scheduler/statefulset/scheduler_test.go index 3bdf3202ef..fec692da67 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -17,27 +17,20 @@ limitations under the License. 
package statefulset import ( - "context" "fmt" "reflect" "testing" "time" - appsv1 "k8s.io/api/apps/v1" - autoscalingv1 "k8s.io/api/autoscaling/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - gtesting "k8s.io/client-go/testing" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" - "knative.dev/pkg/controller" - rectesting "knative.dev/pkg/reconciler/testing" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" "knative.dev/eventing-kafka/pkg/common/scheduler" + "knative.dev/eventing-kafka/pkg/common/scheduler/state" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" listers "knative.dev/eventing/pkg/reconciler/testing/v1" ) @@ -58,7 +51,7 @@ func TestStatefulsetScheduler(t *testing.T) { placements []duckv1alpha1.Placement expected []duckv1alpha1.Placement err error - schedulerPolicy SchedulerPolicyType + schedulerPolicy scheduler.SchedulerPolicyType }{ { name: "no replicas, no vreplicas", @@ -145,7 +138,7 @@ func TestStatefulsetScheduler(t *testing.T) { vreplicas: 0, replicas: int32(0), expected: nil, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "no replicas, 1 vreplicas, fail, HA scheduling", @@ -153,14 +146,14 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(0), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{}, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "one replica, one vreplicas, HA scheduling", vreplicas: 1, replicas: int32(1), expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", VReplicas: 1}}, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "one replica, 3 vreplicas, HA scheduling", @@ -168,7 +161,7 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(1), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", VReplicas: 1}}, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "one replica, 15 vreplicas, unschedulable, HA scheduling", @@ -176,7 +169,7 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(1), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", VReplicas: 5}}, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "two replicas, 15 vreplicas, scheduled, HA scheduling", @@ -187,7 +180,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: 5}, {PodName: "statefulset-name-1", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "two replicas, 15 vreplicas, already scheduled, HA scheduling", @@ -201,7 +194,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: 10}, {PodName: "statefulset-name-1", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "three replicas, 30 vreplicas, HA scheduling", @@ -217,7 +210,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 10}, {PodName: "statefulset-name-2", VReplicas: 10}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "three replicas, 15 vreplicas, too much scheduled (scale down), HA scheduling", @@ -233,7 +226,7 @@ 
func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 5}, {PodName: "statefulset-name-2", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "three replicas, 15 vreplicas, HA scheduling", @@ -244,7 +237,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 5}, {PodName: "statefulset-name-2", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "three replicas, 20 vreplicas, HA scheduling", @@ -255,7 +248,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 7}, {PodName: "statefulset-name-2", VReplicas: 6}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "three replicas, 2 vreplicas, too much scheduled (scale down), HA scheduling", @@ -270,7 +263,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 1}, {PodName: "statefulset-name-2", VReplicas: 1}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "three replicas, 3 vreplicas, too much scheduled (scale down), HA scheduling", @@ -286,7 +279,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 1}, {PodName: "statefulset-name-2", VReplicas: 1}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "four replicas, 7 vreplicas, too much scheduled (scale down), HA scheduling", @@ -303,14 +296,14 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 2}, {PodName: "statefulset-name-2", VReplicas: 3}, }, - schedulerPolicy: EVENSPREAD, + schedulerPolicy: scheduler.EVENSPREAD, }, { name: "no replicas, no vreplicas, HA scheduling by node", vreplicas: 0, replicas: int32(0), expected: nil, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "no replicas, 1 vreplicas, fail, HA scheduling by node", @@ -318,14 +311,14 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(0), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{}, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "one replica, one vreplicas, HA scheduling by node", vreplicas: 1, replicas: int32(1), expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", VReplicas: 1}}, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "one replica, 3 vreplicas, HA scheduling by node", @@ -333,7 +326,7 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(1), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", VReplicas: 1}}, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "one replica, 15 vreplicas, unschedulable, HA scheduling by node", @@ -341,7 +334,7 @@ func TestStatefulsetScheduler(t *testing.T) { replicas: int32(1), err: scheduler.ErrNotEnoughReplicas, expected: []duckv1alpha1.Placement{{PodName: "statefulset-name-0", VReplicas: 3}}, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "five replicas, 15 vreplicas, scheduled, HA scheduling by node", @@ -354,7 +347,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-3", VReplicas: 3}, {PodName: "statefulset-name-4", VReplicas: 3}, }, - schedulerPolicy: 
EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "two replicas, 15 vreplicas, already scheduled, HA scheduling by node", @@ -368,7 +361,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: 10}, {PodName: "statefulset-name-1", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "three replicas, 30 vreplicas, HA scheduling by node", @@ -385,7 +378,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 5}, {PodName: "statefulset-name-2", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "three replicas, 6 vreplicas, too much scheduled (scale down), HA scheduling by node", @@ -401,7 +394,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 1}, {PodName: "statefulset-name-2", VReplicas: 5}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "three replicas, 15 vreplicas, HA scheduling by node", @@ -413,7 +406,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-2", VReplicas: 3}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "five replicas, 20 vreplicas, HA scheduling by node", @@ -426,7 +419,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-3", VReplicas: 4}, {PodName: "statefulset-name-4", VReplicas: 4}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "three replicas, 2 vreplicas, too much scheduled (scale down), HA scheduling by node", @@ -441,7 +434,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 1}, {PodName: "statefulset-name-2", VReplicas: 1}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "four replicas, 6 vreplicas, too much scheduled (scale down), HA scheduling by node", @@ -459,7 +452,7 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-2", VReplicas: 2}, {PodName: "statefulset-name-3", VReplicas: 2}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, { name: "three replicas, 7 vreplicas, too much scheduled (scale down), HA scheduling by node", @@ -475,23 +468,23 @@ func TestStatefulsetScheduler(t *testing.T) { {PodName: "statefulset-name-1", VReplicas: 2}, {PodName: "statefulset-name-2", VReplicas: 4}, }, - schedulerPolicy: EVENSPREAD_BYNODE, + schedulerPolicy: scheduler.EVENSPREAD_BYNODE, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ctx, _ := setupFakeContext(t) + ctx, _ := tscheduler.SetupFakeContext(t) nodelist := make([]runtime.Object, 0, numZones) podlist := make([]runtime.Object, 0, tc.replicas) vpodClient := tscheduler.NewVPodClient() - if tc.schedulerPolicy == EVENSPREAD || tc.schedulerPolicy == EVENSPREAD_BYNODE { + if tc.schedulerPolicy == scheduler.EVENSPREAD || tc.schedulerPolicy == scheduler.EVENSPREAD_BYNODE { for i := int32(0); i < numZones; i++ { for j := int32(0); j < numNodes/numZones; j++ { nodeName := "node" + fmt.Sprint((j*((numNodes/numZones)+1))+i) zoneName := "zone" + fmt.Sprint(i) - node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{}) + node, err := 
kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tscheduler.MakeNode(nodeName, zoneName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -501,7 +494,7 @@ func TestStatefulsetScheduler(t *testing.T) { for i := int32(0); i < tc.replicas; i++ { nodeName := "node" + fmt.Sprint(i) podName := sfsName + "-" + fmt.Sprint(i) - pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{}) + pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, tscheduler.MakePod(testNs, podName, nodeName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } @@ -509,14 +502,14 @@ func TestStatefulsetScheduler(t *testing.T) { } } - _, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) + _, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, tscheduler.MakeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } lsn := listers.NewListers(nodelist) - sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, lsn.GetNodeLister()) + sa := state.NewStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, lsn.GetNodeLister()) lsp := listers.NewListers(podlist) - s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler) + s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs), &SchedulerPolicy{}).(*StatefulSetScheduler) // Give some time for the informer to notify the scheduler and set the number of replicas time.Sleep(200 * time.Millisecond) @@ -547,130 +540,3 @@ func TestStatefulsetScheduler(t *testing.T) { }) } } - -func makeStatefulset(ns, name string, replicas int32) *appsv1.StatefulSet { - obj := &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: ns, - }, - Spec: appsv1.StatefulSetSpec{ - Replicas: &replicas, - }, - Status: appsv1.StatefulSetStatus{ - Replicas: replicas, - }, - } - - return obj -} - -func makeNode(name, zonename string) *corev1.Node { - obj := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{ - ZoneLabel: zonename, - }, - }, - } - return obj -} - -func makeNodeNoLabel(name string) *corev1.Node { - obj := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - } - return obj -} - -func makePod(ns, name, nodename string) *corev1.Pod { - obj := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: ns, - }, - Spec: corev1.PodSpec{ - NodeName: nodename, - }, - } - return obj -} - -func setupFakeContext(t *testing.T) (context.Context, context.CancelFunc) { - ctx, cancel, informers := rectesting.SetupFakeContextWithCancel(t) - err := controller.StartInformers(ctx.Done(), informers...) 
- if err != nil { - t.Fatal("unexpected error", err) - } - - kc := kubeclient.Get(ctx) - kc.PrependReactor("create", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { - createAction := action.(gtesting.CreateActionImpl) - sfs := createAction.GetObject().(*appsv1.StatefulSet) - scale := &autoscalingv1.Scale{ - ObjectMeta: metav1.ObjectMeta{ - Name: sfs.Name, - Namespace: sfs.Namespace, - }, - Spec: autoscalingv1.ScaleSpec{ - Replicas: func() int32 { - if sfs.Spec.Replicas == nil { - return 1 - } - return *sfs.Spec.Replicas - }(), - }, - } - kc.Tracker().Add(scale) - return false, nil, nil - }) - - kc.PrependReactor("get", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { - getAction := action.(gtesting.GetAction) - if action.GetSubresource() == "scale" { - scale, err := kc.Tracker().Get(autoscalingv1.SchemeGroupVersion.WithResource("scales"), getAction.GetNamespace(), getAction.GetName()) - return true, scale, err - - } - return false, nil, nil - }) - - kc.PrependReactor("update", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { - updateAction := action.(gtesting.UpdateActionImpl) - if action.GetSubresource() == "scale" { - scale := updateAction.GetObject().(*autoscalingv1.Scale) - - err := kc.Tracker().Update(autoscalingv1.SchemeGroupVersion.WithResource("scales"), scale, scale.GetNamespace()) - if err != nil { - return true, nil, err - } - - meta, err := meta.Accessor(updateAction.GetObject()) - if err != nil { - return true, nil, err - } - - obj, err := kc.Tracker().Get(appsv1.SchemeGroupVersion.WithResource("statefulsets"), meta.GetNamespace(), meta.GetName()) - if err != nil { - return true, nil, err - } - - sfs := obj.(*appsv1.StatefulSet) - sfs.Spec.Replicas = &scale.Spec.Replicas - - err = kc.Tracker().Update(appsv1.SchemeGroupVersion.WithResource("statefulsets"), sfs, sfs.GetNamespace()) - if err != nil { - return true, nil, err - } - - return true, scale, nil - - } - return false, nil, nil - }) - - return ctx, cancel -} diff --git a/pkg/common/scheduler/statefulset/scheduler_types.go b/pkg/common/scheduler/statefulset/scheduler_types.go new file mode 100644 index 0000000000..8300d4ba6a --- /dev/null +++ b/pkg/common/scheduler/statefulset/scheduler_types.go @@ -0,0 +1,49 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statefulset + +const ( + // MaxTotalWeight is the maximum total weight. + MaxTotalWeight int64 = 100 +) + +// Policy describes a struct of a policy resource. +type SchedulerPolicy struct { + // Holds the information to configure the fit predicate functions. + Predicates []PredicatePolicy + // Holds the information to configure the priority functions. + Priorities []PriorityPolicy +} + +// PredicatePolicy describes a struct of a predicate policy. 
+type PredicatePolicy struct { + // Identifier of the predicate policy + Name string + // Holds the parameters to configure the given predicate + //Argument *PredicateArgument +} + +// PriorityPolicy describes a struct of a priority policy. +type PriorityPolicy struct { + // Identifier of the priority policy + Name string + // The numeric multiplier for the pod scores that the priority function generates + // The weight should be a positive integer + Weight int64 + // Holds the parameters to configure the given priority function + //Argument *PriorityArgument +} diff --git a/pkg/common/scheduler/testing/vpod.go b/pkg/common/scheduler/testing/vpod.go index c8a33fa8c1..bc2c65590c 100644 --- a/pkg/common/scheduler/testing/vpod.go +++ b/pkg/common/scheduler/testing/vpod.go @@ -17,8 +17,25 @@ limitations under the License. package testing import ( + "context" + "testing" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" + "knative.dev/eventing-kafka/pkg/common/scheduler" + "knative.dev/pkg/controller" + + appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + gtesting "k8s.io/client-go/testing" + + kubeclient "knative.dev/pkg/client/injection/kube/client/fake" + _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" + rectesting "knative.dev/pkg/reconciler/testing" ) type sampleVPod struct { @@ -49,3 +66,130 @@ func (d *sampleVPod) GetVReplicas() int32 { func (d *sampleVPod) GetPlacements() []duckv1alpha1.Placement { return d.placements } + +func MakeNode(name, zonename string) *v1.Node { + obj := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + scheduler.ZoneLabel: zonename, + }, + }, + } + return obj +} + +func MakeNodeNoLabel(name string) *v1.Node { + obj := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + return obj +} + +func MakeStatefulset(ns, name string, replicas int32) *appsv1.StatefulSet { + obj := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + Replicas: replicas, + }, + } + + return obj +} + +func MakePod(ns, name, nodename string) *v1.Pod { + obj := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: v1.PodSpec{ + NodeName: nodename, + }, + } + return obj +} + +func SetupFakeContext(t *testing.T) (context.Context, context.CancelFunc) { + ctx, cancel, informers := rectesting.SetupFakeContextWithCancel(t) + err := controller.StartInformers(ctx.Done(), informers...) 
+ if err != nil { + t.Fatal("unexpected error", err) + } + + kc := kubeclient.Get(ctx) + kc.PrependReactor("create", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { + createAction := action.(gtesting.CreateActionImpl) + sfs := createAction.GetObject().(*appsv1.StatefulSet) + scale := &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: sfs.Name, + Namespace: sfs.Namespace, + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: func() int32 { + if sfs.Spec.Replicas == nil { + return 1 + } + return *sfs.Spec.Replicas + }(), + }, + } + kc.Tracker().Add(scale) + return false, nil, nil + }) + + kc.PrependReactor("get", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { + getAction := action.(gtesting.GetAction) + if action.GetSubresource() == "scale" { + scale, err := kc.Tracker().Get(autoscalingv1.SchemeGroupVersion.WithResource("scales"), getAction.GetNamespace(), getAction.GetName()) + return true, scale, err + + } + return false, nil, nil + }) + + kc.PrependReactor("update", "statefulsets", func(action gtesting.Action) (handled bool, ret runtime.Object, err error) { + updateAction := action.(gtesting.UpdateActionImpl) + if action.GetSubresource() == "scale" { + scale := updateAction.GetObject().(*autoscalingv1.Scale) + + err := kc.Tracker().Update(autoscalingv1.SchemeGroupVersion.WithResource("scales"), scale, scale.GetNamespace()) + if err != nil { + return true, nil, err + } + + meta, err := meta.Accessor(updateAction.GetObject()) + if err != nil { + return true, nil, err + } + + obj, err := kc.Tracker().Get(appsv1.SchemeGroupVersion.WithResource("statefulsets"), meta.GetNamespace(), meta.GetName()) + if err != nil { + return true, nil, err + } + + sfs := obj.(*appsv1.StatefulSet) + sfs.Spec.Replicas = &scale.Spec.Replicas + + err = kc.Tracker().Update(appsv1.SchemeGroupVersion.WithResource("statefulsets"), sfs, sfs.GetNamespace()) + if err != nil { + return true, nil, err + } + + return true, scale, nil + + } + return false, nil, nil + }) + + return ctx, cancel +} diff --git a/pkg/source/reconciler/mtsource/controller.go b/pkg/source/reconciler/mtsource/controller.go index 68f1705654..25ffe52d01 100644 --- a/pkg/source/reconciler/mtsource/controller.go +++ b/pkg/source/reconciler/mtsource/controller.go @@ -47,9 +47,10 @@ import ( ) type envConfig struct { - SchedulerRefreshPeriod int64 `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"` - PodCapacity int32 `envconfig:"POD_CAPACITY" required:"true"` - SchedulerPolicy stsscheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"` + SchedulerRefreshPeriod int64 `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"` + PodCapacity int32 `envconfig:"POD_CAPACITY" required:"true"` + SchedulerPolicyType scheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"` + SchedulerPolicyConfigMap string `envconfig:"CONFIG_SCHEDULER" required:"true"` } func NewController( @@ -119,15 +120,19 @@ func NewController( patched, err := kafkaclient.Get(ctx).SourcesV1beta1().KafkaSources(key.Namespace).Patch(ctx, key.Name, types.JSONPatchType, patch, metav1.PatchOptions{}, "status") if err != nil { - return fmt.Errorf("Failed patching: %w", err) + return fmt.Errorf("failed patching: %w", err) } logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched)) return nil } - c.scheduler = stsscheduler.NewScheduler(ctx, - system.Namespace(), mtadapterName, c.vpodLister, rp, 
env.PodCapacity, env.SchedulerPolicy, - nodeInformer.Lister(), evictor) + policy := &stsscheduler.SchedulerPolicy{} + if err := initPolicyFromConfigMap(ctx, env.SchedulerPolicyConfigMap, policy); err != nil { + return nil + } + + logging.FromContext(ctx).Debugw("Scheduler Policy Config Map read", zap.Any("policy", policy)) + c.scheduler = stsscheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicyType, nodeInformer.Lister(), evictor, policy) logging.FromContext(ctx).Info("Setting up kafka event handlers") kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) diff --git a/pkg/source/reconciler/mtsource/kafkasource.go b/pkg/source/reconciler/mtsource/kafkasource.go index f80ff386c7..3f3ae01172 100644 --- a/pkg/source/reconciler/mtsource/kafkasource.go +++ b/pkg/source/reconciler/mtsource/kafkasource.go @@ -18,25 +18,30 @@ package mtsource import ( "context" + "encoding/json" "errors" "fmt" "strings" "github.com/Shopify/sarama" "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "knative.dev/eventing/pkg/reconciler/source" duckv1 "knative.dev/pkg/apis/duck/v1" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" + "knative.dev/pkg/system" "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1" "knative.dev/eventing-kafka/pkg/client/clientset/versioned" reconcilerkafkasource "knative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource" listers "knative.dev/eventing-kafka/pkg/client/listers/sources/v1beta1" "knative.dev/eventing-kafka/pkg/common/scheduler" + sts "knative.dev/eventing-kafka/pkg/common/scheduler/statefulset" "knative.dev/eventing-kafka/pkg/source/client" ) @@ -183,3 +188,31 @@ func (r *Reconciler) createCloudEventAttributes(src *v1beta1.KafkaSource) []duck } return ceAttributes } + +// initPolicyFromConfigMap reads predicates and priorities data from configMap +func initPolicyFromConfigMap(ctx context.Context, configMapName string, policy *sts.SchedulerPolicy) error { + policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(context.TODO(), configMapName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err) + } + + preds, found := policyConfigMap.Data["predicates"] + if !found { + return fmt.Errorf("missing policy config map value at key predicates") + } + if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil { + return fmt.Errorf("invalid policy: %v", err) + } + logging.FromContext(ctx).Infof("Predicates to be registered: %v", policy.Predicates) + + priors, found := policyConfigMap.Data["priorities"] + if !found { + return fmt.Errorf("missing policy config map value at key priorities") + } + if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil { + return fmt.Errorf("invalid policy: %v", err) + } + logging.FromContext(ctx).Infof("Priorities to be registered: %v", policy.Priorities) + + return nil +}
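For reference, the predicates and priorities values that initPolicyFromConfigMap reads are plain JSON arrays that decode directly into the PredicatePolicy and PriorityPolicy slices; because those structs carry no JSON tags, encoding/json's case-insensitive field matching maps "name" and "weight" onto Name and Weight. The standalone sketch below illustrates the decoding with made-up values; the import path is assumed from this repository's layout.

    package main

    import (
        "encoding/json"
        "fmt"
        "strings"

        sts "knative.dev/eventing-kafka/pkg/common/scheduler/statefulset"
    )

    func main() {
        // Example values for the "predicates" and "priorities" ConfigMap keys.
        predicates := `[{"name": "PodFitsResources"}, {"name": "EvenPodSpread"}]`
        priorities := `[{"name": "LowestOrdinalPriority", "weight": 1}]`

        policy := &sts.SchedulerPolicy{}
        if err := json.NewDecoder(strings.NewReader(predicates)).Decode(&policy.Predicates); err != nil {
            panic(err)
        }
        if err := json.NewDecoder(strings.NewReader(priorities)).Decode(&policy.Priorities); err != nil {
            panic(err)
        }

        // Prints the decoded predicate and priority policies.
        fmt.Printf("%+v\n", *policy)
    }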