Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topologySpreadConstraints configuration to pod spec. #2530

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def compare_config():

pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)

Expand All @@ -577,7 +577,7 @@ def compare_config():

self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
"The replication slot cannot be updated", 10, 5)

# make sure slot from Patroni didn't get deleted
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
"The replication slot from Patroni gets deleted", 10, 5)
Expand Down Expand Up @@ -933,7 +933,7 @@ def test_ignored_annotations(self):
},
}
}

old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
Expand Down Expand Up @@ -1370,7 +1370,7 @@ def test_persistent_volume_claim_retention_policy(self):
}
k8s.update_config(patch_scaled_policy_retain)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# decrease the number of instances
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', pg_patch_scale_down_instances)
Expand Down Expand Up @@ -1647,7 +1647,6 @@ def test_node_readiness_label(self):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)


@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_overwrite_pooler_deployment(self):
pooler_name = 'acid-minimal-cluster-pooler'
Expand Down Expand Up @@ -1796,7 +1795,7 @@ def test_password_rotation(self):
},
}
k8s.api.core_v1.patch_namespaced_secret(
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do",
namespace="default",
body=secret_fake_rotation)

Expand All @@ -1812,7 +1811,7 @@ def test_password_rotation(self):
"data": {
"enable_password_rotation": "true",
"password_rotation_interval": "30",
"password_rotation_user_retention": "30", # should be set to 60
"password_rotation_user_retention": "30", # should be set to 60
},
}
k8s.update_config(enable_password_rotation)
Expand Down Expand Up @@ -1865,7 +1864,7 @@ def test_password_rotation(self):
"Unexpected username in secret of test.db_user: expected {}, got {}".format("test.db_user", secret_username))

# disable password rotation for all other users (foo_user)
# and pick smaller intervals to see if the third fake rotation user is dropped
# and pick smaller intervals to see if the third fake rotation user is dropped
enable_password_rotation = {
"data": {
"enable_password_rotation": "false",
Expand Down Expand Up @@ -2363,6 +2362,56 @@ def test_taint_based_eviction(self):
# toggle pod anti affinity to move replica away from master node
self.assert_distributed_pods(master_nodes)

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_topology_spread_constraints(self):
'''
Enable topologySpreadConstraints for pods
'''
k8s = self.k8s
cluster_labels = "application=spilo,cluster-name=acid-minimal-cluster"

# Verify we are in good state from potential previous tests
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")

master_nodes, replica_nodes = k8s.get_cluster_nodes()
self.assertNotEqual(master_nodes, [])
self.assertNotEqual(replica_nodes, [])

# Patch label to nodes for topologySpreadConstraints
patch_node_label = {
"metadata": {
"labels": {
"topology.kubernetes.io/zone": "zalando"
}
}
}
k8s.api.core_v1.patch_node(master_nodes[0], patch_node_label)
k8s.api.core_v1.patch_node(replica_nodes[0], patch_node_label)

# Scale-out postgresql pods
k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
{"spec": {"numberOfInstances": 6}})
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_labels), 6, "Postgresql StatefulSet are scale to 6")
self.eventuallyEqual(lambda: k8s.count_running_pods(), 6, "All pods are running")

worker_node_1 = 0
worker_node_2 = 0
pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_labels)
for pod in pods.items:
if pod.spec.node_name == 'postgres-operator-e2e-tests-worker':
worker_node_1 += 1
elif pod.spec.node_name == 'postgres-operator-e2e-tests-worker2':
worker_node_2 += 1

self.assertEqual(worker_node_1, worker_node_2)
self.assertEqual(worker_node_1, 3)
self.assertEqual(worker_node_2, 3)

# Scale-it postgresql pods to previous replicas
k8s.api.custom_objects_api.patch_namespaced_custom_object("acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster",
{"spec": {"numberOfInstances": 2}})

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_zz_cluster_deletion(self):
'''
Expand Down Expand Up @@ -2438,7 +2487,7 @@ def test_zz_cluster_deletion(self):
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 6, "PVCs were deleted although disabled in config")

except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
Expand Down Expand Up @@ -2480,7 +2529,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci

# if nodes are different we can quit here
if master_nodes[0] not in replica_nodes:
return True
return True

# enable pod anti affintiy in config map which should trigger movement of replica
patch_enable_antiaffinity = {
Expand All @@ -2504,7 +2553,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
}
k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

k8s.wait_for_pod_start('spilo-role=replica,' + cluster_labels)
k8s.wait_for_running_pods(cluster_labels, 2)

Expand All @@ -2515,7 +2564,7 @@ def assert_distributed_pods(self, target_nodes, cluster_labels='cluster-name=aci
# if nodes are different we can quit here
for target_node in target_nodes:
if (target_node not in master_nodes or target_node not in replica_nodes) and master_nodes[0] in replica_nodes:
print('Pods run on the same node')
print('Pods run on the same node')
return False

except timeout_decorator.TimeoutError:
Expand Down
6 changes: 6 additions & 0 deletions manifests/postgresql.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ spec:
- PreferNoSchedule
tolerationSeconds:
type: integer
topologySpreadConstraints:
type: array
nullable: true
items:
type: object
x-kubernetes-preserve-unknown-fields: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not add all the fields of a topologySpreadConstraint like with what we have for nodeAffinity. I feel, it's too lazy and unsafe to allow arbitrary fields with x-kubernetes-preserve-unknown-fields: true

useLoadBalancer:
type: boolean
description: deprecated
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/acid.zalan.do/v1/crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,16 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
},
},
},
"topologySpreadConstraints": {
Type: "array",
Nullable: true,
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
XPreserveUnknownFields: util.True(),
},
},
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here with the XPreserveUnknownFields. Yes there are other fields where are doing it like this, but lets get it right for new additions. I know, it's tedious to reflect the full schema because we don use a framework like kubeBulder. But it should be the trade-off for contributors when they go the "easy way" with allowing full specs in our manifest over custom stripped-down designs better suitable for end users.

"useLoadBalancer": {
Type: "boolean",
Description: "deprecated",
Expand Down
37 changes: 19 additions & 18 deletions pkg/apis/acid.zalan.do/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,25 @@ type PostgresSpec struct {
UsersWithSecretRotation []string `json:"usersWithSecretRotation,omitempty"`
UsersWithInPlaceSecretRotation []string `json:"usersWithInPlaceSecretRotation,omitempty"`

NumberOfInstances int32 `json:"numberOfInstances"`
MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"`
Clone *CloneDescription `json:"clone,omitempty"`
Databases map[string]string `json:"databases,omitempty"`
PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
SchedulerName *string `json:"schedulerName,omitempty"`
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Sidecars []Sidecar `json:"sidecars,omitempty"`
InitContainers []v1.Container `json:"initContainers,omitempty"`
PodPriorityClassName string `json:"podPriorityClassName,omitempty"`
ShmVolume *bool `json:"enableShmVolume,omitempty"`
EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"`
LogicalBackupRetention string `json:"logicalBackupRetention,omitempty"`
LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"`
StandbyCluster *StandbyDescription `json:"standby,omitempty"`
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
NumberOfInstances int32 `json:"numberOfInstances"`
MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"`
Clone *CloneDescription `json:"clone,omitempty"`
Databases map[string]string `json:"databases,omitempty"`
PreparedDatabases map[string]PreparedDatabase `json:"preparedDatabases,omitempty"`
SchedulerName *string `json:"schedulerName,omitempty"`
NodeAffinity *v1.NodeAffinity `json:"nodeAffinity,omitempty"`
TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Sidecars []Sidecar `json:"sidecars,omitempty"`
InitContainers []v1.Container `json:"initContainers,omitempty"`
PodPriorityClassName string `json:"podPriorityClassName,omitempty"`
ShmVolume *bool `json:"enableShmVolume,omitempty"`
EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"`
LogicalBackupRetention string `json:"logicalBackupRetention,omitempty"`
LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"`
StandbyCluster *StandbyDescription `json:"standby,omitempty"`
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
// MasterServiceAnnotations takes precedence over ServiceAnnotations for master role if not empty
MasterServiceAnnotations map[string]string `json:"masterServiceAnnotations,omitempty"`
// ReplicaServiceAnnotations takes precedence over ServiceAnnotations for replica role if not empty
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod affinity does not match the current one")
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.TopologySpreadConstraints, statefulSet.Spec.Template.Spec.TopologySpreadConstraints) {
needsReplace = true
needsRollUpdate = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this really need to trigger a rolling update of pods executed by operator? Will not K8s take care of it then once the statefulset is replaced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm good point. Maybe we can leave as is for now. With rolling update we make sure pods immediately adhere the new constraints.

reasons = append(reasons, "new statefulset's pod topologySpreadConstraints does not match the current one")
}
if len(c.Statefulset.Spec.Template.Spec.Tolerations) != len(statefulSet.Spec.Template.Spec.Tolerations) {
needsReplace = true
needsRollUpdate = true
Expand Down
14 changes: 14 additions & 0 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,13 @@ func generatePodAntiAffinity(podAffinityTerm v1.PodAffinityTerm, preferredDuring
return podAntiAffinity
}

func generateTopologySpreadConstraints(labels labels.Set, topologySpreadConstraints []v1.TopologySpreadConstraint) []v1.TopologySpreadConstraint {
for _, topologySpreadConstraint := range topologySpreadConstraints {
topologySpreadConstraint.LabelSelector = &metav1.LabelSelector{MatchLabels: labels}
}
return topologySpreadConstraints
}

func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]string) []v1.Toleration {
// allow to override tolerations by postgresql manifest
if len(*tolerationsSpec) > 0 {
Expand Down Expand Up @@ -804,6 +811,7 @@ func (c *Cluster) generatePodTemplate(
initContainers []v1.Container,
sidecarContainers []v1.Container,
sharePgSocketWithSidecars *bool,
topologySpreadConstraintsSpec []v1.TopologySpreadConstraint,
tolerationsSpec *[]v1.Toleration,
spiloRunAsUser *int64,
spiloRunAsGroup *int64,
Expand Down Expand Up @@ -873,6 +881,10 @@ func (c *Cluster) generatePodTemplate(
podSpec.PriorityClassName = priorityClassName
}

if len(topologySpreadConstraintsSpec) > 0 {
podSpec.TopologySpreadConstraints = generateTopologySpreadConstraints(labels, topologySpreadConstraintsSpec)
}

if sharePgSocketWithSidecars != nil && *sharePgSocketWithSidecars {
addVarRunVolume(&podSpec)
}
Expand Down Expand Up @@ -1460,6 +1472,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
initContainers,
sidecarContainers,
c.OpConfig.SharePgSocketWithSidecars,
spec.TopologySpreadConstraints,
&tolerationSpec,
effectiveRunAsUser,
effectiveRunAsGroup,
Expand Down Expand Up @@ -2318,6 +2331,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {
[]v1.Container{},
[]v1.Container{},
util.False(),
[]v1.TopologySpreadConstraint{},
&tolerationsSpec,
nil,
nil,
Expand Down
53 changes: 53 additions & 0 deletions pkg/cluster/k8sres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3886,3 +3886,56 @@ func TestGenerateCapabilities(t *testing.T) {
}
}
}

func TestTopologySpreadConstraints(t *testing.T) {
clusterName := "acid-test-cluster"
namespace := "default"
labelSelector := &metav1.LabelSelector{
MatchLabels: cluster.labelsSet(true),
}

pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
Spec: acidv1.PostgresSpec{
NumberOfInstances: 1,
Resources: &acidv1.Resources{
ResourceRequests: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
ResourceLimits: acidv1.ResourceDescription{CPU: k8sutil.StringToPointer("1"), Memory: k8sutil.StringToPointer("10")},
},
Volume: acidv1.Volume{
Size: "1G",
},
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: labelSelector,
},
},
},
}

cluster := New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Name = clusterName
cluster.Namespace = namespace
cluster.labelsSet(true)

s, err := cluster.generateStatefulSet(&pg.Spec)
assert.NoError(t, err)
assert.Contains(t, s.Spec.Template.Spec.TopologySpreadConstraints, v1.TopologySpreadConstraint{
MaxSkew: int32(1),
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: labelSelector,
},
)
}
Loading