This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

[scheduler, mtsource] Initial HA support for MT Kafka source #587

Merged: 6 commits, May 12, 2021
1 change: 0 additions & 1 deletion config/source/multi/500-controller-service.yaml

This file was deleted.

15 changes: 15 additions & 0 deletions config/source/multi/deployments/adapter.yaml
@@ -80,3 +80,18 @@ spec:
containerPort: 8008

terminationGracePeriodSeconds: 10
affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - podAffinityTerm:
          labelSelector:
            matchLabels:
              control-plane: kafkasource-mt-adapter
          topologyKey: kubernetes.io/hostname
        weight: 50
      - podAffinityTerm:
          labelSelector:
            matchLabels:
              control-plane: kafkasource-mt-adapter
          topologyKey: topology.kubernetes.io/zone
        weight: 50
6 changes: 5 additions & 1 deletion config/source/multi/deployments/controller.yaml
@@ -47,12 +47,16 @@ spec:

# How often (in seconds) the autoscaler tries to scale down the statefulset.
- name: AUTOSCALER_REFRESH_PERIOD
- value: '10'
+ value: '100'

# The number of virtual replicas this pod can handle.
- name: POD_CAPACITY
value: '100'

# The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list)
- name: SCHEDULER_POLICY_TYPE
value: 'MAXFILLUP'

resources:
requests:
cpu: 20m
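The new SCHEDULER_POLICY_TYPE setting selects how vreplicas are placed onto adapter pods. As a rough sketch of how the value could be resolved (the env-var lookup below is an assumption; only the MAXFILLUP and EVENSPREAD names come from this PR):

// Illustrative only: mapping SCHEDULER_POLICY_TYPE onto the policy values
// referenced elsewhere in this PR. The lookup function and the default are
// assumptions, not the controller's actual wiring.
package main

import (
	"fmt"
	"os"
)

type SchedulerPolicyType string

const (
	MAXFILLUP  SchedulerPolicyType = "MAXFILLUP"  // existing fill-up (bin-packing) placement
	EVENSPREAD SchedulerPolicyType = "EVENSPREAD" // HA-oriented spreading introduced by this PR
)

func policyFromEnv() SchedulerPolicyType {
	if v := os.Getenv("SCHEDULER_POLICY_TYPE"); v == string(EVENSPREAD) {
		return EVENSPREAD
	}
	// Assume the existing fill-up behaviour when unset or unrecognized.
	return MAXFILLUP
}

func main() {
	fmt.Println("scheduler policy:", policyFromEnv())
}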
1 change: 1 addition & 0 deletions config/source/multi/roles/clusterrole.yaml
@@ -93,6 +93,7 @@ rules:
- events
- configmaps
- secrets
- nodes
verbs: *everything

# let the webhook label the appropriate namespace
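The added nodes rule lets the controller read Node objects, presumably so the scheduler can map a pod's node onto its zone via the well-known topology label (the same topology.kubernetes.io/zone key used in the adapter's anti-affinity above). A minimal sketch of such a lookup, with an assumed helper name:

// zoneFor is a hypothetical helper: given a node lister (the tests below wire
// one into newStateBuilder) and a node name, return the node's zone label.
package scheduler

import (
	corev1listers "k8s.io/client-go/listers/core/v1"
)

func zoneFor(nodes corev1listers.NodeLister, nodeName string) (string, error) {
	node, err := nodes.Get(nodeName)
	if err != nil {
		return "", err
	}
	return node.Labels["topology.kubernetes.io/zone"], nil
}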
3 changes: 3 additions & 0 deletions pkg/apis/duck/v1alpha1/placement_types.go
@@ -50,6 +50,9 @@ type Placement struct {
// PodName is the name of the pod where the resource is placed
PodName string `json:"podName,omitempty"`

// ZoneName is the name of the zone where the pod is located
ZoneName string `json:"zoneName,omitempty"`

// VReplicas is the number of virtual replicas assigned to the pod
VReplicas int32 `json:"vreplicas,omitempty"`
}
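For illustration, a placement list carrying the new field might look like the following; the pod and zone names are made up, only the Placement struct itself comes from the file above:

package main

import (
	"encoding/json"
	"fmt"

	duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
)

func main() {
	// 15 vreplicas spread evenly across three pods in three (hypothetical) zones.
	placements := []duckv1alpha1.Placement{
		{PodName: "kafkasource-mt-adapter-0", ZoneName: "zone-a", VReplicas: 5},
		{PodName: "kafkasource-mt-adapter-1", ZoneName: "zone-b", VReplicas: 5},
		{PodName: "kafkasource-mt-adapter-2", ZoneName: "zone-c", VReplicas: 5},
	}

	out, _ := json.MarshalIndent(placements, "", "  ")
	fmt.Println(string(out)) // fields marshal as podName, zoneName, vreplicas
}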
6 changes: 4 additions & 2 deletions pkg/common/scheduler/statefulset/autoscaler.go
@@ -124,8 +124,10 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen
// The number of replicas may be lower than the last ordinal, for instance
// when the statefulset is manually scaled down. In that case, replicas above
// scale.Spec.Replicas have not been considered when scheduling vreplicas.
- // Adjust accordingly
- pending -= state.freeCapacity()
+ // Adjust accordingly (applicable only for MAXFILLUP scheduling policy and not for HA)
+ if state.schedulerPolicy != EVENSPREAD {
+ 	pending -= state.freeCapacity()
+ }

// Still need more?
if pending > 0 {
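To see why the free-capacity subtraction is skipped for EVENSPREAD, take the numbers used by the new test cases below: two pods holding 8 and 7 vreplicas, a capacity of 10 per pod, and 3 pending vreplicas. A simplified sketch of the adjustment (freeCapacity here is a stand-in for state.freeCapacity(), not the real implementation):

package main

import "fmt"

const podCapacity = 10 // matches POD_CAPACITY in controller.yaml above

// freeCapacity sums the unused vreplica slots on pods that already hold
// placements; a simplified stand-in for state.freeCapacity().
func freeCapacity(placed ...int32) int32 {
	var free int32
	for _, p := range placed {
		free += podCapacity - p
	}
	return free
}

func main() {
	pending := int32(3)        // vreplicas not yet placed
	free := freeCapacity(8, 7) // pod-0 has 2 free slots, pod-1 has 3 -> 5 total

	// MAXFILLUP: the 5 free slots absorb the 3 pending vreplicas, so the
	// adjusted pending drops below zero and no scale-up is triggered.
	fmt.Println("MAXFILLUP adjusted pending:", pending-free) // -2

	// EVENSPREAD (HA): pending stays at 3, so the autoscaler can still add
	// pods and spread vreplicas across more nodes and zones.
	fmt.Println("EVENSPREAD adjusted pending:", pending) // 3
}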
49 changes: 38 additions & 11 deletions pkg/common/scheduler/statefulset/autoscaler_test.go
@@ -24,9 +24,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
gtesting "k8s.io/client-go/testing"

+ listers "knative.dev/eventing/pkg/reconciler/testing/v1"
  kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
  _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
- "knative.dev/pkg/logging"

duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
"knative.dev/eventing-kafka/pkg/common/scheduler"
@@ -39,12 +39,13 @@ const (

func TestAutoscaler(t *testing.T) {
testCases := []struct {
- name         string
- replicas     int32
- vpods        []scheduler.VPod
- pendings     int32
- scaleDown    bool
- wantReplicas int32
+ name            string
+ replicas        int32
+ vpods           []scheduler.VPod
+ pendings        int32
+ scaleDown       bool
+ wantReplicas    int32
+ schedulerPolicy SchedulerPolicyType
}{
{
name: "no replicas, no placements, no pending",
@@ -181,17 +182,42 @@ func TestAutoscaler(t *testing.T) {
pendings: int32(8),
wantReplicas: int32(3),
},
{
name: "no replicas, with placements, with pending, enough capacity",
replicas: int32(0),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{
{PodName: "pod-0", VReplicas: int32(8)},
{PodName: "pod-1", VReplicas: int32(7)}}),
},
pendings: int32(3),
wantReplicas: int32(3),
schedulerPolicy: EVENSPREAD,
},
{
name: "with replicas, with placements, with pending, enough capacity",
replicas: int32(2),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{
{PodName: "pod-0", VReplicas: int32(8)},
{PodName: "pod-1", VReplicas: int32(7)}}),
},
pendings: int32(3),
wantReplicas: int32(3),
schedulerPolicy: EVENSPREAD,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := setupFakeContext(t)

vpodClient := tscheduler.NewVPodClient()
- stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10)
+ ls := listers.NewListers(nil)
+ stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister())

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
- _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{})
+ _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
@@ -231,10 +257,11 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
})

vpodClient := tscheduler.NewVPodClient()
- stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10)
+ ls := listers.NewListers(nil)
+ stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, ls.GetNodeLister())

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
- _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, 10), metav1.CreateOptions{})
+ _, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}