Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Next generation MT Scheduler and Descheduler with pluggable policies #768

Merged
merged 11 commits into from
Sep 8, 2021
61 changes: 61 additions & 0 deletions config/source/common/configmaps/config-descheduler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2021 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-source-descheduler
namespace: knative-eventing
labels:
kafka.eventing.knative.dev/release: devel
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################

# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
#
# These predicate plugins are used to filter out pods from
# which a vreplica cannot be removed because they do not
# satisfy the predicate condition. For each pod, the descheduler
# will call filter plugins in their configured order.
# predicates: |+
[]
#
# These priority plugins are used to score each pod that
# has passed the filtering phase. The scheduler will then
# select the pod with the highest weighted score sum to
# remove a vreplica from. If two pods have an equal score,
# a pod is chosen randomly.
# priorities: |+
[
{"Name": "RemoveWithEvenPodSpreadPriority",
"Weight": 10,
"Args": "{\"MaxSkew\": 2}"},
{"Name": "RemoveWithAvailabilityZonePriority",
"Weight": 10,
"Args": "{\"MaxSkew\": 2}"},
{"Name": "RemoveWithHighestOrdinalPriority",
"Weight": 2}
]
63 changes: 63 additions & 0 deletions config/source/common/configmaps/config-scheduler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2021 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-source-scheduler
namespace: knative-eventing
labels:
kafka.eventing.knative.dev/release: devel
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################

# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
#
# These predicate plugins are used to filter out pods on
# which a vreplica cannot be placed because they do not
# satisfy the predicate condition. For each pod, the scheduler
# will call filter plugins in their configured order.
# predicates: |+
[
{"Name": "PodFitsResources"},
{"Name": "NoMaxResourceCount",
"Args": "{\"NumPartitions\": 100}"},
{"Name": "EvenPodSpread",
"Args": "{\"MaxSkew\": 2}"}
]
#
# These priority plugins are used to score each pod that
# has passed the filtering phase. The scheduler will then
# select the pod with the highest weighted score sum. If
# two pods have an equal score, a pod is chosen randomly.
# priorities: |+
[
{"Name": "AvailabilityZonePriority",
"Weight": 10,
"Args": "{\"MaxSkew\": 2}"},
{"Name": "LowestOrdinalPriority",
"Weight": 2}
]
1 change: 1 addition & 0 deletions config/source/multi/400-config-descheduler.yaml
1 change: 1 addition & 0 deletions config/source/multi/400-config-scheduler.yaml
4 changes: 2 additions & 2 deletions config/source/multi/deployments/adapter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ metadata:
labels:
kafka.eventing.knative.dev/release: devel
spec:
replicas: 1
replicas: 0
selector:
matchLabels: &labels
control-plane: kafkasource-mt-adapter
Expand Down Expand Up @@ -78,7 +78,7 @@ spec:
- name: profiling
containerPort: 8008

terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 10 # set to 0 to enable forceful termination and restart for recovery
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
Expand Down
7 changes: 7 additions & 0 deletions config/source/multi/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,17 @@ spec:

- name: VREPLICA_LIMITS_MPS
value: '250'

# The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list)
- name: SCHEDULER_POLICY_TYPE
value: 'MAXFILLUP'

- name: SCHEDULER_CONFIG
value: ''

- name: DESCHEDULER_CONFIG
value: ''

resources:
requests:
cpu: 20m
Expand Down
3 changes: 0 additions & 3 deletions pkg/apis/duck/v1alpha1/placement_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type Placement struct {
// PodName is the name of the pod where the resource is placed
PodName string `json:"podName,omitempty"`

// ZoneName is the name of the zone where the pod is located
ZoneName string `json:"zoneName,omitempty"`

// VReplicas is the number of virtual replicas assigned to in the pod
VReplicas int32 `json:"vreplicas,omitempty"`
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/sources/v1beta1/kafka_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (k *KafkaSource) GetPlacements() []v1alpha1.Placement {
}
return k.Status.Placeable.Placement
}

// GetResourceVersion returns the resource version of the KafkaSource object,
// as recorded in its ObjectMeta. It implements the scheduler's schedulable
// accessor alongside GetPlacements and GetVReplicas in this file.
func (k *KafkaSource) GetResourceVersion() string {
	return k.ObjectMeta.ResourceVersion
}
27 changes: 17 additions & 10 deletions pkg/apis/sources/v1beta1/kafka_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,25 @@ import (

func TestScheduling(t *testing.T) {
testCases := map[string]struct {
source KafkaSource
key types.NamespacedName
vreplicas int32
placements []v1alpha1.Placement
source KafkaSource
key types.NamespacedName
vreplicas int32
placements []v1alpha1.Placement
rsrcversion string
}{
"all empty": {
source: KafkaSource{},
key: types.NamespacedName{},
vreplicas: int32(1),
placements: nil,
source: KafkaSource{},
key: types.NamespacedName{},
vreplicas: int32(1),
placements: nil,
rsrcversion: "",
},
"all full": {
source: KafkaSource{
ObjectMeta: metav1.ObjectMeta{
Name: "asource",
Namespace: "anamespace",
Name: "asource",
Namespace: "anamespace",
ResourceVersion: "12345",
},
Spec: KafkaSourceSpec{
Consumers: pointer.Int32Ptr(4),
Expand All @@ -62,6 +65,7 @@ func TestScheduling(t *testing.T) {
placements: []v1alpha1.Placement{
{PodName: "apod", VReplicas: 4},
},
rsrcversion: "12345",
},
}

Expand All @@ -76,6 +80,9 @@ func TestScheduling(t *testing.T) {
if tc.source.GetVReplicas() != tc.vreplicas {
t.Errorf("unexpected vreplicas (want %d, got %d)", tc.vreplicas, tc.source.GetVReplicas())
}
if tc.source.GetResourceVersion() != tc.rsrcversion {
t.Errorf("unexpected resource version (want %v, got %v)", tc.rsrcversion, tc.source.GetResourceVersion())
}
if !reflect.DeepEqual(tc.source.GetPlacements(), tc.placements) {
t.Errorf("unexpected placements (want %v, got %v)", tc.placements, tc.source.GetPlacements())
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,8 @@ const (

// KafkaTopicConfigRetentionMs is the key in the Sarama TopicDetail ConfigEntries map for retention time (in ms)
KafkaTopicConfigRetentionMs = "retention.ms"

// PodAnnotationKey is an annotation used by the scheduler to be informed of pods
// being evicted, so that it does not use them for placing vreplicas
PodAnnotationKey = "eventing.knative.dev/unschedulable"
)
88 changes: 88 additions & 0 deletions pkg/common/scheduler/factory/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2021 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package factory

import (
"fmt"

state "knative.dev/eventing-kafka/pkg/common/scheduler/state"
)

// RegistryFP is a collection of all available filter plugins.
type RegistryFP map[string]state.FilterPlugin

// RegistrySP is a collection of all available scoring plugins.
type RegistrySP map[string]state.ScorePlugin

var (
	// FilterRegistry holds the registered filter plugins, keyed by plugin name.
	FilterRegistry = make(RegistryFP)
	// ScoreRegistry holds the registered score plugins, keyed by plugin name.
	ScoreRegistry = make(RegistrySP)
)

// RegisterFP adds a new filter plugin to the registry. It returns an error
// when a plugin with the same name has already been registered.
func RegisterFP(name string, factory state.FilterPlugin) error {
	if _, found := FilterRegistry[name]; found {
		return fmt.Errorf("a filter plugin named %v already exists", name)
	}
	FilterRegistry[name] = factory
	return nil
}

// UnregisterFP removes an existing filter plugin from the registry. It
// returns an error when no plugin with the provided name exists.
func UnregisterFP(name string) error {
	if _, found := FilterRegistry[name]; !found {
		return fmt.Errorf("no filter plugin named %v exists", name)
	}
	delete(FilterRegistry, name)
	return nil
}

// GetFilterPlugin returns the filter plugin registered under the given name,
// or an error when no such plugin exists.
func GetFilterPlugin(name string) (state.FilterPlugin, error) {
	if f, exist := FilterRegistry[name]; exist {
		return f, nil
	}
	// Fixed typo in the error message: "fitler" -> "filter".
	return nil, fmt.Errorf("no filter plugin named %v exists", name)
}

// RegisterSP adds a new score plugin to the registry. It returns an error
// when a plugin with the same name has already been registered.
func RegisterSP(name string, factory state.ScorePlugin) error {
	if _, found := ScoreRegistry[name]; found {
		return fmt.Errorf("a score plugin named %v already exists", name)
	}
	ScoreRegistry[name] = factory
	return nil
}

// UnregisterSP removes an existing score plugin from the registry. It
// returns an error when no plugin with the provided name exists.
func UnregisterSP(name string) error {
	if _, found := ScoreRegistry[name]; !found {
		return fmt.Errorf("no score plugin named %v exists", name)
	}
	delete(ScoreRegistry, name)
	return nil
}

// GetScorePlugin returns the score plugin registered under the given name,
// or an error when no such plugin exists.
func GetScorePlugin(name string) (state.ScorePlugin, error) {
	sp, ok := ScoreRegistry[name]
	if !ok {
		return nil, fmt.Errorf("no score plugin named %v exists", name)
	}
	return sp, nil
}
Loading