This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Even spread for HA implementation
Signed-off-by: Ansu Varghese <avarghese@us.ibm.com>
aavarghese committed Apr 29, 2021
1 parent 77c511a commit 2acb50a
Showing 12 changed files with 323 additions and 68 deletions.
22 changes: 21 additions & 1 deletion config/source/multi/deployments/adapter.yaml
@@ -20,7 +20,7 @@ metadata:
   labels:
     kafka.eventing.knative.dev/release: devel
 spec:
-  replicas: 1
+  replicas: 3
   selector:
     matchLabels: &labels
       control-plane: kafkasource-mt-adapter
@@ -42,6 +42,11 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: metadata.name
+          - name: NODE_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: spec.nodeName
+
           # The maximum number of messages per second, per vreplica
           - name: VREPLICA_LIMITS_MPS
@@ -80,3 +85,18 @@ spec:
               containerPort: 8008

       terminationGracePeriodSeconds: 10
+      affinity:
+        podAntiAffinity:
+          preferredDuringSchedulingIgnoredDuringExecution:
+            - podAffinityTerm:
+                labelSelector:
+                  matchLabels:
+                    control-plane: kafkasource-mt-adapter
+                topologyKey: kubernetes.io/hostname
+              weight: 50
+            - podAffinityTerm:
+                labelSelector:
+                  matchLabels:
+                    control-plane: kafkasource-mt-adapter
+                topologyKey: topology.kubernetes.io/zone
+              weight: 50
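
Aside: a minimal sketch (not code from this commit) of how the adapter could resolve the zone of the node it runs on, assuming the NODE_NAME downward-API variable added above and the nodes read access granted in clusterrole.yaml below. The package and helper names are hypothetical.

package adapter // hypothetical package name

import (
    "context"
    "fmt"
    "os"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

const zoneLabel = "topology.kubernetes.io/zone"

// zoneFromNode returns the zone of the node this pod is scheduled on.
// NODE_NAME is injected via the downward API (spec.nodeName) as in the
// diff above; reading the Node object needs the new "nodes" RBAC rule.
func zoneFromNode(ctx context.Context, kc kubernetes.Interface) (string, error) {
    nodeName := os.Getenv("NODE_NAME")
    if nodeName == "" {
        return "", fmt.Errorf("NODE_NAME environment variable is not set")
    }
    node, err := kc.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
    if err != nil {
        return "", fmt.Errorf("failed to get node %s: %w", nodeName, err)
    }
    return node.Labels[zoneLabel], nil
}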
10 changes: 9 additions & 1 deletion config/source/multi/deployments/controller.yaml
@@ -47,12 +47,20 @@ spec:

           # How often (in seconds) the autoscaler tries to scale down the statefulset.
           - name: AUTOSCALER_REFRESH_PERIOD
-            value: '10'
+            value: '100'

           # The number of virtual replicas this pod can handle.
           - name: POD_CAPACITY
             value: '100'

+          # The number of zones in a multi-zone cluster, for spreading vreplica placement with HA
+          - name: SCHEDULE_SPREAD_ZONES
+            value: '3'
+
+          # The scheduling policy type for placing vreplicas on pods (a boolean value temporarily set to true for even spread across zones)
+          - name: SCHEDULE_POLICY_TYPE
+            value: 'true'
+
           resources:
             requests:
               cpu: 20m
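
Aside: the controller code that consumes these two variables is not part of this excerpt. A hedged sketch of how they might be parsed and handed to the scheduler, whose state builder gains a policy flag per the test changes further down; the names and defaults here are illustrative.

package scheduler // hypothetical package name

import (
    "log"
    "os"
    "strconv"
)

// schedulerEnv holds illustrative parsed values of the two new variables.
type schedulerEnv struct {
    spreadZones  int32 // SCHEDULE_SPREAD_ZONES: zones available for spreading vreplicas
    evenSpreadHA bool  // SCHEDULE_POLICY_TYPE: "true" selects even spread across zones
}

func loadSchedulerEnv() schedulerEnv {
    env := schedulerEnv{spreadZones: 1, evenSpreadHA: false}
    if v := os.Getenv("SCHEDULE_SPREAD_ZONES"); v != "" {
        if n, err := strconv.ParseInt(v, 10, 32); err == nil {
            env.spreadZones = int32(n)
        } else {
            log.Printf("ignoring invalid SCHEDULE_SPREAD_ZONES %q: %v", v, err)
        }
    }
    if v := os.Getenv("SCHEDULE_POLICY_TYPE"); v != "" {
        if b, err := strconv.ParseBool(v); err == nil {
            env.evenSpreadHA = b
        } else {
            log.Printf("ignoring invalid SCHEDULE_POLICY_TYPE %q: %v", v, err)
        }
    }
    return env
}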
1 change: 1 addition & 0 deletions config/source/multi/roles/clusterrole.yaml
@@ -93,6 +93,7 @@ rules:
       - events
       - configmaps
       - secrets
+      - nodes
     verbs: *everything

   # let the webhook label the appropriate namespace
3 changes: 3 additions & 0 deletions pkg/apis/duck/v1alpha1/placement_types.go
@@ -50,6 +50,9 @@ type Placement struct {
    // PodName is the name of the pod where the resource is placed
    PodName string `json:"podName,omitempty"`

+   // ZoneName is the name of the zone where the pod is located
+   ZoneName string `json:"zoneName,omitempty"`
+
    // VReplicas is the number of virtual replicas assigned to in the pod
    VReplicas int32 `json:"vreplicas,omitempty"`
 }
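
Aside: the new ZoneName field lets placements be accounted per zone as well as per pod. A minimal sketch (not code from this commit) of the kind of tally an even-spread policy needs; the helper and its local mirror of the Placement shape are illustrative.

package scheduler // hypothetical package name

// placement mirrors the duck-typed Placement shown above.
type placement struct {
    PodName   string
    ZoneName  string
    VReplicas int32
}

// leastLoadedZone tallies vreplicas per zone and returns the zone with the
// fewest, so the next vreplica can be placed there for an even spread.
func leastLoadedZone(placements []placement, zones []string) string {
    counts := make(map[string]int32, len(zones))
    for _, z := range zones {
        counts[z] = 0
    }
    for _, p := range placements {
        counts[p.ZoneName] += p.VReplicas
    }
    best := ""
    bestCount := int32(-1)
    for _, z := range zones {
        if bestCount < 0 || counts[z] < bestCount {
            best, bestCount = z, counts[z]
        }
    }
    return best
}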
6 changes: 4 additions & 2 deletions pkg/common/scheduler/statefulset/autoscaler.go
@@ -124,8 +124,10 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen
    // The number of replicas may be lower than the last ordinal, for instance
    // when the statefulset is manually scaled down. In that case, replicas above
    // scale.Spec.Replicas have not been considered when scheduling vreplicas.
-   // Adjust accordingly
-   pending -= state.freeCapacity()
+   // Adjust accordingly (applicable only for maxFillUp scheduling policy and not for HA)
+   if !state.schedulePolicy {
+       pending -= state.freeCapacity()
+   }

    // Still need more?
    if pending > 0 {
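
Aside: an illustrative calculation of what the guard above implies (the rest of doautoscale is not in this excerpt): under the default max-fill-up policy, free capacity on existing pods offsets pending vreplicas before scaling up, while the even-spread HA policy skips that offset so the statefulset keeps scaling out and replicas stay spread.

package scheduler // hypothetical package name; a sketch, not the commit's doautoscale

// desiredReplicas shows the arithmetic implied by the guarded adjustment:
// each pod absorbs at most podCapacity vreplicas, and free capacity is only
// subtracted when the even-spread HA policy is disabled.
func desiredReplicas(current, pending, freeCapacity, podCapacity int32, evenSpreadHA bool) int32 {
    if !evenSpreadHA {
        pending -= freeCapacity
    }
    if pending <= 0 {
        return current
    }
    // Round up to whole pods.
    return current + (pending+podCapacity-1)/podCapacity
}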
4 changes: 2 additions & 2 deletions pkg/common/scheduler/statefulset/autoscaler_test.go
@@ -188,7 +188,7 @@ func TestAutoscaler(t *testing.T) {
            ctx, _ := setupFakeContext(t)

            vpodClient := tscheduler.NewVPodClient()
-           stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10)
+           stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10, false)

            sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
            _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{})
@@ -231,7 +231,7 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
    })

    vpodClient := tscheduler.NewVPodClient()
-   stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10)
+   stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10, false)

    sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
    _, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, 10), metav1.CreateOptions{})
