Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add finalizer to ensure integration children are cleaned up #480

Merged
merged 3 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/apis/camel/v1alpha1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ const (
IntegrationPhaseError IntegrationPhase = "Error"
// IntegrationPhaseBuildFailureRecovery --
IntegrationPhaseBuildFailureRecovery IntegrationPhase = "Building Failure Recovery"
// IntegrationPhaseDeleting --
IntegrationPhaseDeleting IntegrationPhase = "Deleting"
)

func init() {
Expand Down
12 changes: 12 additions & 0 deletions pkg/cmd/completion_bash.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ __kamel_runtimes() {
COMPREPLY=( $( compgen -W "${type_list}" -- "$cur") )
}

__kamel_deletion_policy() {
local type_list="owner label"
COMPREPLY=( $( compgen -W "${type_list}" -- "$cur") )
}

__kamel_kubectl_get_configmap() {
local template
local kubectl_out
Expand Down Expand Up @@ -244,6 +249,13 @@ func configureKnownBashCompletions(command *cobra.Command) {
cobra.BashCompCustom: {"__kamel_traits"},
},
)
configureBashAnnotationForFlag(
command,
"deletion-policy",
map[string][]string{
cobra.BashCompCustom: {"__kamel_deletion_policy"},
},
)
}

func configureBashAnnotationForFlag(command *cobra.Command, flagName string, annotations map[string][]string) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (o *installCmdOptions) waitForPlatformReady(platform *v1alpha1.IntegrationP
return watch.HandlePlatformStateChanges(o.Context, platform, handler)
}

func (o *installCmdOptions) validate(cmd *cobra.Command, args []string) error {
func (o *installCmdOptions) validate(_ *cobra.Command, _ []string) error {
var result error

// Let's register only our own APIs
Expand Down
13 changes: 12 additions & 1 deletion pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"strings"
"syscall"

"github.com/apache/camel-k/pkg/util/finalizer"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/gzip"
Expand Down Expand Up @@ -68,7 +70,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The integration name")
cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", nil, "The integration dependency")
cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the integration to be running")
cmd.Flags().StringVarP(&options.IntegrationContext, "context", "x", "", "The contex used to run the integration")
cmd.Flags().StringVarP(&options.IntegrationContext, "context", "x", "", "The context used to run the integration")
cmd.Flags().StringArrayVarP(&options.Properties, "property", "p", nil, "Add a camel property")
cmd.Flags().StringSliceVar(&options.ConfigMaps, "configmap", nil, "Add a ConfigMap")
cmd.Flags().StringSliceVar(&options.Secrets, "secret", nil, "Add a Secret")
Expand All @@ -84,6 +86,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
cmd.Flags().BoolVar(&options.Compression, "compression", false, "Enable store source as a compressed binary blob")
cmd.Flags().StringSliceVar(&options.Resources, "resource", nil, "Add a resource")
cmd.Flags().StringSliceVar(&options.OpenAPIs, "open-api", nil, "Add an OpenAPI v2 spec")
cmd.Flags().StringVar(&options.DeletionPolicy, "deletion-policy", "owner", "Policy used to cleanup child resources, default owner")

// completion support
configureKnownCompletions(&cmd)
Expand All @@ -98,6 +101,7 @@ type runCmdOptions struct {
Logs bool
Sync bool
Dev bool
DeletionPolicy string
IntegrationContext string
Runtime string
IntegrationName string
Expand Down Expand Up @@ -258,6 +262,7 @@ func (o *runCmdOptions) createIntegration(c client.Client, sources []string) (*v
return o.updateIntegrationCode(c, sources)
}

//nolint: gocyclo
func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string) (*v1alpha1.Integration, error) {
namespace := o.Namespace

Expand Down Expand Up @@ -338,6 +343,12 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string)
})
}

if o.DeletionPolicy == "label" {
integration.Finalizers = []string{
finalizer.CamelIntegrationFinalizer,
}
}

if o.Runtime != "" {
integration.Spec.AddDependency("runtime:" + o.Runtime)
}
Expand Down
143 changes: 143 additions & 0 deletions pkg/controller/integration/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"context"
"fmt"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/finalizer"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/log"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// NewDeleteAction creates a new monitoring action for an integration
func NewDeleteAction() Action {
return &deleteAction{}
}

type deleteAction struct {
baseAction
}

func (action *deleteAction) Name() string {
return "delete"
}

func (action *deleteAction) CanHandle(integration *v1alpha1.Integration) bool {
return integration.Status.Phase == v1alpha1.IntegrationPhaseDeleting
}

func (action *deleteAction) Handle(ctx context.Context, integration *v1alpha1.Integration) error {
l := log.Log.ForIntegration(integration)

ok, err := finalizer.Exists(integration, finalizer.CamelIntegrationFinalizer)
if err != nil {
return err
}
if !ok {
return nil
}

target := integration.DeepCopy()

// Select all resources created by this integration
selectors := []string{
fmt.Sprintf("camel.apache.org/integration=%s", integration.Name),
}

resources, err := kubernetes.LookUpResources(ctx, action.client, integration.Namespace, selectors)
if err != nil {
return err
}

// If the ForegroundDeletion deletion is not set remove the finalizer and
// delete child resources from a dedicated goroutine
foreground, err := finalizer.Exists(integration, finalizer.ForegroundDeletion)
if err != nil {
return err
}

if !foreground {
//
// Async
//
if err := action.removeFinalizer(ctx, target); err != nil {
return err
}

go func() {
if err := action.deleteChildResources(context.TODO(), target, resources); err != nil {
l.Error(err, "error deleting child resources")
}
}()
} else {
//
// Sync
//
if err := action.deleteChildResources(ctx, target, resources); err != nil {
return err
}
if err = action.removeFinalizer(ctx, target); err != nil {
return err
}
}

return nil
}

func (action *deleteAction) removeFinalizer(ctx context.Context, integration *v1alpha1.Integration) error {
_, err := finalizer.Remove(integration, finalizer.CamelIntegrationFinalizer)
if err != nil {
return err
}

return action.client.Update(ctx, integration)
}

func (action *deleteAction) deleteChildResources(ctx context.Context, integration *v1alpha1.Integration, resources []unstructured.Unstructured) error {
l := log.Log.ForIntegration(integration)

// And delete them
for _, resource := range resources {
// pin the resource
resource := resource

l.Infof("Deleting child resource: %s/%s", resource.GetKind(), resource.GetName())

err := action.client.Delete(ctx, &resource, k8sclient.PropagationPolicy(metav1.DeletePropagationOrphan))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
l.Errorf(err, "cannot delete child resource: %s/%s", resource.GetKind(), resource.GetName())
}
} else {
l.Infof("Child resource deleted: %s/%s", resource.GetKind(), resource.GetName())
}
}

return nil
}
12 changes: 12 additions & 0 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/apache/camel-k/pkg/client"

"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"

"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -115,6 +116,12 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R
NewDeployAction(),
NewErrorRecoveryAction(),
NewMonitorAction(),
NewDeleteAction(),
}

// Delete phase
if instance.GetDeletionTimestamp() != nil {
instance.Status.Phase = camelv1alpha1.IntegrationPhaseDeleting
}

ilog := rlog.ForIntegration(instance)
Expand All @@ -131,12 +138,17 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R

// Fetch the Integration again and check the state
if err = r.client.Get(ctx, request.NamespacedName, instance); err != nil {
if k8serrors.IsNotFound(err) && instance.Status.Phase == camelv1alpha1.IntegrationPhaseDeleting {
return reconcile.Result{}, nil
}

return reconcile.Result{}, err
}

if instance.Status.Phase == camelv1alpha1.IntegrationPhaseRunning {
return reconcile.Result{}, nil
}

// Requeue
return reconcile.Result{
RequeueAfter: 5 * time.Second,
Expand Down
85 changes: 9 additions & 76 deletions pkg/trait/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ import (
"context"
"fmt"
"strconv"
"strings"

"github.com/apache/camel-k/pkg/util/kubernetes"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
)

type garbageCollectorTrait struct {
Expand Down Expand Up @@ -76,8 +73,14 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
// Register a post action that deletes the existing resources that are labelled
// with the previous integration generations.
e.PostActions = append(e.PostActions, func(environment *Environment) error {
selectors := []string{
fmt.Sprintf("camel.apache.org/integration=%s", e.Integration.Name),
"camel.apache.org/generation",
fmt.Sprintf("camel.apache.org/generation notin (%d)", e.Integration.GetGeneration()),
}

// Retrieve older generation resources that may can enlisted for garbage collection
resources, err := getOldGenerationResources(e)
resources, err := kubernetes.LookUpResources(context.TODO(), e.Client, e.Integration.Namespace, selectors)
if err != nil {
return err
}
Expand All @@ -102,73 +105,3 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {

return nil
}

func getOldGenerationResources(e *Environment) ([]unstructured.Unstructured, error) {
// We rely on the discovery API to retrieve all the resources group and kind.
// That results in an unbounded collection that can be a bit slow (a couple of seconds).
// We may want to refine that step by white-listing or enlisting types to speed-up
// the collection duration.
types, err := getDiscoveryTypes(e.Client)
if err != nil {
return nil, err
}

selectors := []string{
fmt.Sprintf("camel.apache.org/integration=%s", e.Integration.Name),
"camel.apache.org/generation",
fmt.Sprintf("camel.apache.org/generation notin (%d)", e.Integration.GetGeneration()),
}

selector, err := labels.Parse(strings.Join(selectors, ","))
if err != nil {
return nil, err
}

res := make([]unstructured.Unstructured, 0)

for _, t := range types {
options := k8sclient.ListOptions{
Namespace: e.Integration.Namespace,
LabelSelector: selector,
Raw: &metav1.ListOptions{
TypeMeta: t,
},
}
list := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": t.APIVersion,
"kind": t.Kind,
},
}
if err := e.Client.List(context.TODO(), &options, &list); err != nil {
if k8serrors.IsNotFound(err) ||
k8serrors.IsForbidden(err) ||
k8serrors.IsMethodNotSupported(err) {
continue
}
return nil, err
}

res = append(res, list.Items...)
}
return res, nil
}

func getDiscoveryTypes(client client.Client) ([]metav1.TypeMeta, error) {
resources, err := client.Discovery().ServerPreferredNamespacedResources()
if err != nil {
return nil, err
}

types := make([]metav1.TypeMeta, 0)
for _, resource := range resources {
for _, r := range resource.APIResources {
types = append(types, metav1.TypeMeta{
Kind: r.Kind,
APIVersion: resource.GroupVersion,
})
}
}

return types, nil
}
Loading