Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit objects cached by DevWorkspace controller to reduce memory usage #652

Merged
merged 14 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 21 additions & 29 deletions controllers/controller/devworkspacerouting/sync_ingresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,15 @@ import (
"fmt"

"github.com/devfile/devworkspace-operator/pkg/constants"
"github.com/devfile/devworkspace-operator/pkg/provision/sync"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

controllerv1alpha1 "github.com/devfile/devworkspace-operator/apis/controller/v1alpha1"
)

var ingressDiffOpts = cmp.Options{
cmpopts.IgnoreFields(networkingv1.Ingress{}, "TypeMeta", "ObjectMeta", "Status"),
cmpopts.IgnoreFields(networkingv1.HTTPIngressPath{}, "PathType"),
}

func (r *DevWorkspaceRoutingReconciler) syncIngresses(routing *controllerv1alpha1.DevWorkspaceRouting, specIngresses []networkingv1.Ingress) (ok bool, clusterIngresses []networkingv1.Ingress, err error) {
ingressesInSync := true

Expand All @@ -53,32 +46,31 @@ func (r *DevWorkspaceRoutingReconciler) syncIngresses(routing *controllerv1alpha
ingressesInSync = false
}

clusterAPI := sync.ClusterAPI{
Client: r.Client,
Scheme: r.Scheme,
Logger: r.Log.WithValues("Request.Namespace", routing.Namespace, "Request.Name", routing.Name),
Ctx: context.TODO(),
}

var updatedClusterIngresses []networkingv1.Ingress
for _, specIngress := range specIngresses {
if contains, idx := listContainsIngressByName(specIngress, clusterIngresses); contains {
clusterIngress := clusterIngresses[idx]
if !cmp.Equal(specIngress, clusterIngress, ingressDiffOpts) {
r.Log.Info(fmt.Sprintf("Updating ingress: %s", clusterIngress.Name))
if r.DebugLogging {
r.Log.Info(fmt.Sprintf("Diff: %s", cmp.Diff(specIngress, clusterIngress, ingressDiffOpts)))
}
// Update ingress's spec
clusterIngress.Spec = specIngress.Spec
err := r.Update(context.TODO(), &clusterIngress)
if err != nil && !errors.IsConflict(err) {
return false, nil, err
}
ingressesInSync = false
}
} else {
err := r.Create(context.TODO(), &specIngress)
if err != nil {
return false, nil, err
}
clusterObj, err := sync.SyncObjectWithCluster(&specIngress, clusterAPI)
switch t := err.(type) {
case nil:
break
case *sync.NotInSyncError:
ingressesInSync = false
continue
case *sync.UnrecoverableSyncError:
return false, nil, t.Cause
default:
return false, nil, err
}
updatedClusterIngresses = append(updatedClusterIngresses, *clusterObj.(*networkingv1.Ingress))
}

return ingressesInSync, clusterIngresses, nil
return ingressesInSync, updatedClusterIngresses, nil
}

func (r *DevWorkspaceRoutingReconciler) getClusterIngresses(routing *controllerv1alpha1.DevWorkspaceRouting) ([]networkingv1.Ingress, error) {
Expand Down
53 changes: 21 additions & 32 deletions controllers/controller/devworkspacerouting/sync_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,14 @@ import (
"fmt"

"github.com/devfile/devworkspace-operator/pkg/constants"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/devfile/devworkspace-operator/pkg/provision/sync"
routeV1 "github.com/openshift/api/route/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

controllerv1alpha1 "github.com/devfile/devworkspace-operator/apis/controller/v1alpha1"
)

var routeDiffOpts = cmp.Options{
cmpopts.IgnoreFields(routeV1.Route{}, "TypeMeta", "ObjectMeta", "Status"),
cmpopts.IgnoreFields(routeV1.RouteSpec{}, "WildcardPolicy", "Host"),
cmpopts.IgnoreFields(routeV1.RouteTargetReference{}, "Weight"),
}

func (r *DevWorkspaceRoutingReconciler) syncRoutes(routing *controllerv1alpha1.DevWorkspaceRouting, specRoutes []routeV1.Route) (ok bool, clusterRoutes []routeV1.Route, err error) {
routesInSync := true

Expand All @@ -54,33 +45,31 @@ func (r *DevWorkspaceRoutingReconciler) syncRoutes(routing *controllerv1alpha1.D
routesInSync = false
}

for _, specRoute := range specRoutes {
if contains, idx := listContainsRouteByName(specRoute, clusterRoutes); contains {
clusterRoute := clusterRoutes[idx]
if !cmp.Equal(specRoute, clusterRoute, routeDiffOpts) {
r.Log.Info(fmt.Sprintf("Updating route: %s", clusterRoute.Name))
if r.DebugLogging {
r.Log.Info(fmt.Sprintf("Diff: %s", cmp.Diff(specRoute, clusterRoute, routeDiffOpts)))
}
// Update route's spec
clusterRoute.Spec = specRoute.Spec
err := r.Update(context.TODO(), &clusterRoute)
if err != nil && !errors.IsConflict(err) {
return false, nil, err
}
clusterAPI := sync.ClusterAPI{
Client: r.Client,
Scheme: r.Scheme,
Logger: r.Log.WithValues("Request.Namespace", routing.Namespace, "Request.Name", routing.Name),
Ctx: context.TODO(),
}

routesInSync = false
}
} else {
err := r.Create(context.TODO(), &specRoute)
if err != nil {
return false, nil, err
}
var updatedClusterRoutes []routeV1.Route
for _, specIngress := range specRoutes {
clusterObj, err := sync.SyncObjectWithCluster(&specIngress, clusterAPI)
switch t := err.(type) {
case nil:
break
case *sync.NotInSyncError:
routesInSync = false
continue
case *sync.UnrecoverableSyncError:
return false, nil, t.Cause
default:
return false, nil, err
}
updatedClusterRoutes = append(updatedClusterRoutes, *clusterObj.(*routeV1.Route))
}

return routesInSync, clusterRoutes, nil
return routesInSync, updatedClusterRoutes, nil
}

func (r *DevWorkspaceRoutingReconciler) getClusterRoutes(routing *controllerv1alpha1.DevWorkspaceRouting) ([]routeV1.Route, error) {
Expand Down
76 changes: 22 additions & 54 deletions controllers/controller/devworkspacerouting/sync_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,16 @@ package devworkspacerouting
import (
"context"
"fmt"
"sort"
"strings"

"github.com/devfile/devworkspace-operator/pkg/constants"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/devfile/devworkspace-operator/pkg/provision/sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

controllerv1alpha1 "github.com/devfile/devworkspace-operator/apis/controller/v1alpha1"
)

var serviceDiffOpts = cmp.Options{
cmpopts.IgnoreFields(corev1.Service{}, "TypeMeta", "ObjectMeta", "Status"),
cmp.Comparer(func(x, y corev1.ServiceSpec) bool {
xCopy := x.DeepCopy()
yCopy := y.DeepCopy()
if !cmp.Equal(xCopy.Selector, yCopy.Selector) {
return false
}
// Function that takes a slice of servicePorts and returns the appropriate comparison
// function to pass to sort.Slice() for that slice of servicePorts.
servicePortSorter := func(servicePorts []corev1.ServicePort) func(i, j int) bool {
return func(i, j int) bool {
return strings.Compare(servicePorts[i].Name, servicePorts[j].Name) > 0
}
}
sort.Slice(xCopy.Ports, servicePortSorter(xCopy.Ports))
sort.Slice(yCopy.Ports, servicePortSorter(yCopy.Ports))
if !cmp.Equal(xCopy.Ports, yCopy.Ports) {
return false
}
return xCopy.Type == yCopy.Type
}),
}

func (r *DevWorkspaceRoutingReconciler) syncServices(routing *controllerv1alpha1.DevWorkspaceRouting, specServices []corev1.Service) (ok bool, clusterServices []corev1.Service, err error) {
servicesInSync := true

Expand All @@ -74,34 +45,31 @@ func (r *DevWorkspaceRoutingReconciler) syncServices(routing *controllerv1alpha1
servicesInSync = false
}

for _, specService := range specServices {
if contains, idx := listContainsByName(specService, clusterServices); contains {
clusterService := clusterServices[idx]
if !cmp.Equal(specService, clusterService, serviceDiffOpts) {
r.Log.Info(fmt.Sprintf("Updating service: %s", clusterService.Name))
if r.DebugLogging {
r.Log.Info(fmt.Sprintf("Diff: %s", cmp.Diff(specService, clusterService, serviceDiffOpts)))
}
// Cannot naively copy spec, as clusterIP is unmodifiable
clusterIP := clusterService.Spec.ClusterIP
clusterService.Spec = specService.Spec
clusterService.Spec.ClusterIP = clusterIP
err := r.Update(context.TODO(), &clusterService)
if err != nil && !errors.IsConflict(err) {
return false, nil, err
}
servicesInSync = false
}
} else {
err := r.Create(context.TODO(), &specService)
if err != nil {
return false, nil, err
}
clusterAPI := sync.ClusterAPI{
Client: r.Client,
Scheme: r.Scheme,
Logger: r.Log.WithValues("Request.Namespace", routing.Namespace, "Request.Name", routing.Name),
Ctx: context.TODO(),
}

var updatedClusterServices []corev1.Service
for _, specIngress := range specServices {
clusterObj, err := sync.SyncObjectWithCluster(&specIngress, clusterAPI)
switch t := err.(type) {
case nil:
break
case *sync.NotInSyncError:
servicesInSync = false
continue
case *sync.UnrecoverableSyncError:
return false, nil, t.Cause
default:
return false, nil, err
}
updatedClusterServices = append(updatedClusterServices, *clusterObj.(*corev1.Service))
}

return servicesInSync, clusterServices, nil
return servicesInSync, updatedClusterServices, nil
}

func (r *DevWorkspaceRoutingReconciler) getClusterServices(routing *controllerv1alpha1.DevWorkspaceRouting) ([]corev1.Service, error) {
Expand Down
7 changes: 4 additions & 3 deletions controllers/workspace/devworkspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

devfilevalidation "github.com/devfile/api/v2/pkg/validation"
"github.com/devfile/devworkspace-operator/pkg/provision/sync"

"github.com/devfile/devworkspace-operator/apis/controller/v1alpha1"
controllerv1alpha1 "github.com/devfile/devworkspace-operator/apis/controller/v1alpha1"
Expand Down Expand Up @@ -95,7 +96,7 @@ type DevWorkspaceReconciler struct {

func (r *DevWorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (reconcileResult ctrl.Result, err error) {
reqLogger := r.Log.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)
clusterAPI := wsprovision.ClusterAPI{
clusterAPI := sync.ClusterAPI{
Client: r.Client,
Scheme: r.Scheme,
Logger: reqLogger,
Expand Down Expand Up @@ -287,7 +288,7 @@ func (r *DevWorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request

timing.SetTime(timingInfo, timing.ComponentsReady)

rbacStatus := wsprovision.SyncRBAC(workspace, r.Client, reqLogger)
rbacStatus := wsprovision.SyncRBAC(workspace, clusterAPI)
if rbacStatus.Err != nil || !rbacStatus.Continue {
return reconcile.Result{Requeue: true}, rbacStatus.Err
}
Expand Down Expand Up @@ -325,7 +326,7 @@ func (r *DevWorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request
annotate.AddURLAttributesToEndpoints(&workspace.Spec.Template, routingStatus.ExposedEndpoints)

// Step three: provision a configmap on the cluster to mount the flattened devfile in deployment containers
err = metadata.ProvisionWorkspaceMetadata(devfilePodAdditions, clusterWorkspace, workspace, &clusterAPI)
err = metadata.ProvisionWorkspaceMetadata(devfilePodAdditions, clusterWorkspace, workspace, clusterAPI)
if err != nil {
switch provisionErr := err.(type) {
case *metadata.NotReadyError:
Expand Down
3 changes: 2 additions & 1 deletion controllers/workspace/finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"

dw "github.com/devfile/api/v2/pkg/apis/workspaces/v1alpha2"
"github.com/devfile/devworkspace-operator/pkg/provision/sync"

"github.com/go-logr/logr"
coputil "github.com/redhat-cop/operator-utils/pkg/util"
Expand Down Expand Up @@ -72,7 +73,7 @@ func (r *DevWorkspaceReconciler) finalize(ctx context.Context, log logr.Logger,
failedStatus.setConditionTrue(dw.DevWorkspaceError, err.Error())
return r.updateWorkspaceStatus(workspace, r.Log, &failedStatus, reconcile.Result{}, nil)
}
err = storageProvisioner.CleanupWorkspaceStorage(workspace, wsprovision.ClusterAPI{
err = storageProvisioner.CleanupWorkspaceStorage(workspace, sync.ClusterAPI{
Ctx: ctx,
Client: r.Client,
Scheme: r.Scheme,
Expand Down
4 changes: 2 additions & 2 deletions controllers/workspace/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

dw "github.com/devfile/api/v2/pkg/apis/workspaces/v1alpha2"
"github.com/devfile/devworkspace-operator/pkg/provision/sync"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/devfile/devworkspace-operator/controllers/workspace/metrics"
"github.com/devfile/devworkspace-operator/pkg/conditions"
"github.com/devfile/devworkspace-operator/pkg/config"
wsprovision "github.com/devfile/devworkspace-operator/pkg/provision/workspace"
)

const (
Expand Down Expand Up @@ -145,7 +145,7 @@ func syncConditions(workspaceStatus *dw.DevWorkspaceStatus, currentStatus *curre
})
}

func syncWorkspaceMainURL(workspace *dw.DevWorkspace, exposedEndpoints map[string]v1alpha1.ExposedEndpointList, clusterAPI wsprovision.ClusterAPI) (ok bool, err error) {
func syncWorkspaceMainURL(workspace *dw.DevWorkspace, exposedEndpoints map[string]v1alpha1.ExposedEndpointList, clusterAPI sync.ClusterAPI) (ok bool, err error) {
mainUrl := getMainUrl(exposedEndpoints)

if workspace.Status.MainUrl == mainUrl {
Expand Down
14 changes: 10 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@ import (
"os"
"runtime"

"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"

"github.com/devfile/devworkspace-operator/controllers/controller/devworkspacerouting"
"github.com/devfile/devworkspace-operator/controllers/controller/devworkspacerouting/solvers"
"github.com/devfile/devworkspace-operator/pkg/cache"
"github.com/devfile/devworkspace-operator/pkg/config"
"github.com/devfile/devworkspace-operator/pkg/infrastructure"
"github.com/devfile/devworkspace-operator/pkg/webhook"
Expand All @@ -42,11 +39,13 @@ import (
oauthv1 "github.com/openshift/api/oauth/v1"
routev1 "github.com/openshift/api/route/v1"
templatev1 "github.com/openshift/api/template/v1"
corev1 "k8s.io/api/core/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -99,13 +98,20 @@ func main() {
setupLog.Info(fmt.Sprintf("Commit: %s", version.Commit))
setupLog.Info(fmt.Sprintf("BuildTime: %s", version.BuildTime))

cacheFunc, err := cache.GetCacheFunc()
if err != nil {
setupLog.Error(err, "failed to set up objects cache")
os.Exit(1)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: ":6789",
LeaderElection: enableLeaderElection,
LeaderElectionID: "8d217f93.devfile.io",
NewCache: cacheFunc,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
Expand Down
Loading