Skip to content

Commit

Permalink
Syncer: also mutate StatefulSets and ReplicaSets
Browse files Browse the repository at this point in the history
Signed-off-by: David Festal <dfestal@redhat.com>
  • Loading branch information
davidfestal committed Feb 23, 2023
1 parent 771cfc8 commit e26c87b
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/kcp-dev/logicalcluster/v3"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -38,7 +37,7 @@ import (

type ListSecretFunc func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error)

type DeploymentMutator struct {
type PodSpecableMutator struct {
upstreamURL *url.URL
listSecrets ListSecretFunc
serviceLister listerscorev1.ServiceLister
Expand All @@ -49,18 +48,30 @@ type DeploymentMutator struct {
upsyncPods bool
}

func (dm *DeploymentMutator) GVR() schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
func (dm *PodSpecableMutator) GVRs() []schema.GroupVersionResource {
return []schema.GroupVersionResource{
{
Group: "apps",
Version: "v1",
Resource: "deployments",
},
{
Group: "apps",
Version: "v1",
Resource: "statefulsets",
},
{
Group: "apps",
Version: "v1",
Resource: "replicasets",
},
}
}

func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
func NewPodspecableMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
syncTargetClusterName logicalcluster.Name,
syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *DeploymentMutator {
return &DeploymentMutator{
syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *PodSpecableMutator {
return &PodSpecableMutator{
upstreamURL: upstreamURL,
listSecrets: secretLister,
serviceLister: serviceLister,
Expand All @@ -73,24 +84,30 @@ func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, ser
}

// Mutate applies the mutator changes to the object.
func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
var deployment appsv1.Deployment
err := runtime.DefaultUnstructuredConverter.FromUnstructured(
obj.UnstructuredContent(),
&deployment)
func (dm *PodSpecableMutator) Mutate(obj *unstructured.Unstructured) error {
podTemplateUnstr, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template")
if err != nil {
return err
}
upstreamLogicalName := logicalcluster.From(obj)
if !ok {
return fmt.Errorf("object should have a PodTemplate.Spec 'spec.template', but doesn't: %v", obj)
}

templateSpec := &deployment.Spec.Template.Spec
podTemplate := &corev1.PodTemplateSpec{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(
podTemplateUnstr,
&podTemplate)
if err != nil {
return err
}
upstreamLogicalName := logicalcluster.From(obj)

desiredServiceAccountName := "default"
if templateSpec.ServiceAccountName != "" && templateSpec.ServiceAccountName != "default" {
desiredServiceAccountName = templateSpec.ServiceAccountName
if podTemplate.Spec.ServiceAccountName != "" && podTemplate.Spec.ServiceAccountName != "default" {
desiredServiceAccountName = podTemplate.Spec.ServiceAccountName
}

rawSecretList, err := dm.listSecrets(upstreamLogicalName, deployment.Namespace)
rawSecretList, err := dm.listSecrets(upstreamLogicalName, obj.GetNamespace())
if err != nil {
return fmt.Errorf("error listing secrets for workspace %s: %w", upstreamLogicalName.String(), err)
}
Expand Down Expand Up @@ -123,14 +140,14 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
}

if desiredSecretName == "" {
return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, deployment.Namespace, upstreamLogicalName.String())
return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, obj.GetNamespace(), upstreamLogicalName.String())
}

// Setting AutomountServiceAccountToken to false allow us to control the ServiceAccount
// VolumeMount and Volume definitions.
templateSpec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
podTemplate.Spec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
// Set to empty the serviceAccountName on podTemplate as we are not syncing the serviceAccount down to the workload cluster.
templateSpec.ServiceAccountName = ""
podTemplate.Spec.ServiceAccountName = ""

kcpExternalHost := dm.upstreamURL.Hostname()
kcpExternalPort := dm.upstreamURL.Port()
Expand Down Expand Up @@ -193,42 +210,42 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
}

// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the containers
for i := range deployment.Spec.Template.Spec.Containers {
for i := range podTemplate.Spec.Containers {
for _, overrideEnv := range overrideEnvs {
templateSpec.Containers[i].Env = updateEnv(templateSpec.Containers[i].Env, overrideEnv)
podTemplate.Spec.Containers[i].Env = updateEnv(podTemplate.Spec.Containers[i].Env, overrideEnv)
}
templateSpec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.Containers[i].Env, deployment)
templateSpec.Containers[i].VolumeMounts = updateVolumeMount(templateSpec.Containers[i].VolumeMounts, serviceAccountMount)
podTemplate.Spec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.Containers[i].Env, obj)
podTemplate.Spec.Containers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.Containers[i].VolumeMounts, serviceAccountMount)
}

// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Init containers
for i := range templateSpec.InitContainers {
for i := range podTemplate.Spec.InitContainers {
for _, overrideEnv := range overrideEnvs {
templateSpec.InitContainers[i].Env = updateEnv(templateSpec.InitContainers[i].Env, overrideEnv)
podTemplate.Spec.InitContainers[i].Env = updateEnv(podTemplate.Spec.InitContainers[i].Env, overrideEnv)
}
templateSpec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.InitContainers[i].Env, deployment)
templateSpec.InitContainers[i].VolumeMounts = updateVolumeMount(templateSpec.InitContainers[i].VolumeMounts, serviceAccountMount)
podTemplate.Spec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.InitContainers[i].Env, obj)
podTemplate.Spec.InitContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.InitContainers[i].VolumeMounts, serviceAccountMount)
}

// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Ephemeral containers
for i := range templateSpec.EphemeralContainers {
for i := range podTemplate.Spec.EphemeralContainers {
for _, overrideEnv := range overrideEnvs {
templateSpec.EphemeralContainers[i].Env = updateEnv(templateSpec.EphemeralContainers[i].Env, overrideEnv)
podTemplate.Spec.EphemeralContainers[i].Env = updateEnv(podTemplate.Spec.EphemeralContainers[i].Env, overrideEnv)
}
templateSpec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.EphemeralContainers[i].Env, deployment)
templateSpec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(templateSpec.EphemeralContainers[i].VolumeMounts, serviceAccountMount)
podTemplate.Spec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.EphemeralContainers[i].Env, obj)
podTemplate.Spec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.EphemeralContainers[i].VolumeMounts, serviceAccountMount)
}

// Add the ServiceAccount volume with our overrides.
found := false
for i := range templateSpec.Volumes {
if templateSpec.Volumes[i].Name == "kcp-api-access" {
templateSpec.Volumes[i] = serviceAccountVolume
for i := range podTemplate.Spec.Volumes {
if podTemplate.Spec.Volumes[i].Name == "kcp-api-access" {
podTemplate.Spec.Volumes[i] = serviceAccountVolume
found = true
}
}
if !found {
templateSpec.Volumes = append(templateSpec.Volumes, serviceAccountVolume)
podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, serviceAccountVolume)
}

// Overrides DNS to point to the workspace DNS
Expand All @@ -238,8 +255,8 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
return err // retry
}

deployment.Spec.Template.Spec.DNSPolicy = corev1.DNSNone
deployment.Spec.Template.Spec.DNSConfig = &corev1.PodDNSConfig{
podTemplate.Spec.DNSPolicy = corev1.DNSNone
podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{
Nameservers: []string{dnsIP},
Searches: []string{ // TODO(LV): from /etc/resolv.conf
obj.GetNamespace() + ".svc.cluster.local",
Expand All @@ -256,31 +273,29 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {

if dm.upsyncPods {
syncTargetKey := workloadv1alpha1.ToSyncTargetKey(dm.syncTargetClusterName, dm.syncTargetName)
labels := deployment.Spec.Template.Labels
labels := podTemplate.Labels
if labels == nil {
labels = map[string]string{}
}
labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] = string(workloadv1alpha1.ResourceStateUpsync)
labels[workloadv1alpha1.InternalDownstreamClusterLabel] = syncTargetKey
deployment.Spec.Template.Labels = labels
podTemplate.Labels = labels

// TODO(davidfestal): In the future we could add a diff annotation to transform the resource while upsyncing:
// - remove unnecessary fields we don't want leaking to upstream
// - add an owner reference to the upstream deployment
}

unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deployment)
newPodTemplateUnstr, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplate)
if err != nil {
return err
}

// Set the changes back into the obj.
obj.SetUnstructuredContent(unstructured)

return nil
return unstructured.SetNestedMap(obj.Object, newPodTemplateUnstr, "spec", "template")
}

func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
func (dm *PodSpecableMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
// Retrieve the DNS IP associated to the workspace
dnsServiceName := shared.GetDNSID(workspace, dm.syncTargetUID, dm.syncTargetName)

Expand All @@ -299,13 +314,13 @@ func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name)
}

// resolveDownwardAPIFieldRefEnv replaces the downwardAPI FieldRef EnvVars with the value from the deployment, right now it only replaces the metadata.namespace.
func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, deployment appsv1.Deployment) []corev1.EnvVar {
func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, podspecable *unstructured.Unstructured) []corev1.EnvVar {
var result []corev1.EnvVar
for _, env := range envs {
if env.ValueFrom != nil && env.ValueFrom.FieldRef != nil && env.ValueFrom.FieldRef.FieldPath == "metadata.namespace" {
result = append(result, corev1.EnvVar{
Name: env.Name,
Value: deployment.Namespace,
Value: podspecable.GetNamespace(),
})
} else {
result = append(result, env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ func TestDeploymentMutate(t *testing.T) {
require.NoError(t, err, "Service Add() = %v", err)
svcLister := listerscorev1.NewServiceLister(serviceIndexer)

dm := NewDeploymentMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)
dm := NewPodspecableMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)

unstrOriginalDeployment, err := toUnstructured(c.originalDeployment)
require.NoError(t, err, "toUnstructured() = %v", err)
Expand Down
12 changes: 7 additions & 5 deletions pkg/syncer/spec/mutators/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
type SecretMutator struct {
}

func (sm *SecretMutator) GVR() schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "secrets",
func (sm *SecretMutator) GVRs() []schema.GroupVersionResource {
return []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "secrets",
},
}
}

Expand Down
38 changes: 15 additions & 23 deletions pkg/syncer/spec/spec_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package spec
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"time"
Expand All @@ -33,8 +32,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -53,7 +50,6 @@ import (
syncerindexers "github.com/kcp-dev/kcp/pkg/syncer/indexers"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
specmutators "github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
)

Expand All @@ -63,10 +59,15 @@ const (

var namespaceGVR schema.GroupVersionResource = corev1.SchemeGroupVersion.WithResource("namespaces")

type Mutator interface {
GVRs() []schema.GroupVersionResource
Mutate(obj *unstructured.Unstructured) error
}

type Controller struct {
queue workqueue.RateLimitingInterface

mutators mutatorGvrMap
mutators map[schema.GroupVersionResource]Mutator
dnsProcessor *dns.DNSProcessor

upstreamClient kcpdynamic.ClusterInterface
Expand Down Expand Up @@ -94,7 +95,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
dnsNamespace string,
syncerNamespaceInformerFactory informers.SharedInformerFactory,
dnsImage string,
upsyncPods bool) (*Controller, error) {
mutators ...Mutator) (*Controller, error) {
c := Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),

Expand All @@ -118,9 +119,9 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
informer, ok := informers[gvr]
if !ok {
if shared.ContainsGVR(notSynced, gvr) {
return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr)
return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", gvr)
}
return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", gvr)
}
return informer.Lister(), nil
},
Expand All @@ -137,6 +138,8 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
syncTargetUID: syncTargetUID,
syncTargetKey: syncTargetKey,
advancedSchedulingEnabled: advancedSchedulingEnabled,

mutators: make(map[schema.GroupVersionResource]Mutator, 2),
}

logger := logging.WithReconciler(syncerLogger, controllerName)
Expand Down Expand Up @@ -254,29 +257,18 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
},
})

secretMutator := specmutators.NewSecretMutator()

dnsServiceLister := syncerNamespaceInformerFactory.Core().V1().Services().Lister()

deploymentMutator := specmutators.NewDeploymentMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
secretLister, err := c.getUpstreamLister(corev1.SchemeGroupVersion.WithResource("secrets"))
if err != nil {
return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory")
for _, mutator := range mutators {
for _, gvr := range mutator.GVRs() {
c.mutators[gvr] = mutator
}
return secretLister.ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
}, dnsServiceLister, syncTargetClusterName, syncTargetUID, syncTargetName, dnsNamespace, upsyncPods)

c.mutators = mutatorGvrMap{
deploymentMutator.GVR(): deploymentMutator.Mutate,
secretMutator.GVR(): secretMutator.Mutate,
}

c.dnsProcessor = dns.NewDNSProcessor(downstreamKubeClient,
syncerNamespaceInformerFactory.Core().V1().ServiceAccounts().Lister(),
syncerNamespaceInformerFactory.Rbac().V1().Roles().Lister(),
syncerNamespaceInformerFactory.Rbac().V1().RoleBindings().Lister(),
syncerNamespaceInformerFactory.Apps().V1().Deployments().Lister(),
dnsServiceLister,
syncerNamespaceInformerFactory.Core().V1().Services().Lister(),
syncerNamespaceInformerFactory.Core().V1().Endpoints().Lister(),
syncerNamespaceInformerFactory.Networking().V1().NetworkPolicies().Lister(),
syncTargetUID, syncTargetName, dnsNamespace, dnsImage)
Expand Down
4 changes: 1 addition & 3 deletions pkg/syncer/spec/spec_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ const (
syncerApplyManager = "syncer"
)

type mutatorGvrMap map[schema.GroupVersionResource]func(obj *unstructured.Unstructured) error

func deepEqualApartFromStatus(logger logr.Logger, oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
// TODO(jmprusi): Remove this after switching to virtual workspaces.
// remove status annotation from oldObj and newObj before comparing
Expand Down Expand Up @@ -443,7 +441,7 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers

// Run any transformations on the object before we apply it to the downstream cluster.
if mutator, ok := c.mutators[gvr]; ok {
if err := mutator(downstreamObj); err != nil {
if err := mutator.Mutate(downstreamObj); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit e26c87b

Please sign in to comment.