From 02547601e1e165e6e7ed9f4db21dda0142a56c2c Mon Sep 17 00:00:00 2001 From: Nicola Ferraro Date: Mon, 27 Jan 2020 12:53:26 +0100 Subject: [PATCH] # This is a combination of 2 commits. # This is the 1st commit message: #1199: use kubernetes events during reconciliation # This is the commit message #2: #1199: use kubernetes events during reconciliation --- .gitignore | 3 + akamel-config.yaml | 3 + .../camel/v1/integration_types_support.go | 10 +- pkg/cmd/operator/operator.go | 21 +++- pkg/cmd/reset.go | 2 +- pkg/cmd/run.go | 2 +- .../integration/integration_controller.go | 19 ++-- pkg/events/manager.go | 95 +++++++++++++++++++ pkg/metadata/metadata.go | 2 +- pkg/trait/deployment.go | 4 +- pkg/trait/knative_service.go | 5 +- pkg/trait/rest-dsl.go | 2 +- pkg/{ => util}/gzip/compress.go | 0 pkg/{ => util}/gzip/compress_test.go | 0 14 files changed, 152 insertions(+), 16 deletions(-) create mode 100755 akamel-config.yaml create mode 100644 pkg/events/manager.go rename pkg/{ => util}/gzip/compress.go (100%) rename pkg/{ => util}/gzip/compress_test.go (100%) diff --git a/.gitignore b/.gitignore index d74659172d..3bc834d08d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,9 @@ /kamel /license-check +# Config files +/kamel-config.yaml + # Released Packages *.tar.gz diff --git a/akamel-config.yaml b/akamel-config.yaml new file mode 100755 index 0000000000..1f20f9b8a7 --- /dev/null +++ b/akamel-config.yaml @@ -0,0 +1,3 @@ +kamel: + install: + maven-repositories: '[https://repository.apache.org/content/repositories/orgapachecamel-1171]' diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go index 21b8c0e7fd..5516986e99 100644 --- a/pkg/apis/camel/v1/integration_types_support.go +++ b/pkg/apis/camel/v1/integration_types_support.go @@ -18,6 +18,7 @@ limitations under the License. package v1 import ( + "fmt" "strings" corev1 "k8s.io/api/core/v1" @@ -209,12 +210,17 @@ func (in *Integration) SetIntegrationPlatform(platform *IntegrationPlatform) { // SetIntegrationKit -- func (in *Integration) SetIntegrationKit(kit *IntegrationKit) { cs := corev1.ConditionTrue - + message := kit.Name if kit.Status.Phase != IntegrationKitPhaseReady { cs = corev1.ConditionFalse + if kit.Status.Phase == IntegrationKitPhaseNone { + message = fmt.Sprintf("creating a new integration kit") + } else { + message = fmt.Sprintf("integration kit %s is in state %q", kit.Name, kit.Status.Phase) + } } - in.Status.SetCondition(IntegrationConditionKitAvailable, cs, IntegrationConditionKitAvailableReason, kit.Name) + in.Status.SetCondition(IntegrationConditionKitAvailable, cs, IntegrationConditionKitAvailableReason, message) in.Status.Kit = kit.Name in.Status.Image = kit.Status.Image } diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index f5e6c09c14..92229df2da 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -26,10 +26,14 @@ import ( "runtime" "time" + "github.com/apache/camel-k/pkg/client" + camellog "github.com/apache/camel-k/pkg/util/log" "github.com/operator-framework/operator-sdk/pkg/k8sutil" "github.com/operator-framework/operator-sdk/pkg/leader" "github.com/operator-framework/operator-sdk/pkg/ready" sdkVersion "github.com/operator-framework/operator-sdk/version" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client/config" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -43,6 +47,8 @@ import ( ) var log = logf.Log.WithName("cmd") + +// GitCommit -- var GitCommit string func printVersion() { @@ -98,8 +104,21 @@ func Run() { } defer r.Unset() // nolint: errcheck + // Configure an event broadcaster + c, err := client.NewClient(false) + if err != nil { + log.Error(err, "cannot initialize client") + os.Exit(1) + } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(camellog.WithName("events").Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)}) + // Create a new Cmd to provide shared dependencies and start components - mgr, err := manager.New(cfg, manager.Options{Namespace: namespace}) + mgr, err := manager.New(cfg, manager.Options{ + Namespace: namespace, + EventBroadcaster: eventBroadcaster, + }) if err != nil { log.Error(err, "") os.Exit(1) diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go index 4f6a3810f7..e45f17432f 100644 --- a/pkg/cmd/reset.go +++ b/pkg/cmd/reset.go @@ -78,7 +78,7 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) { } if err = o.resetIntegrationPlatform(c); err != nil { - fmt.Print(err) + fmt.Println(err) return } diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 5214ba7b4d..b529955e1e 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -33,9 +33,9 @@ import ( v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/client" - "github.com/apache/camel-k/pkg/gzip" "github.com/apache/camel-k/pkg/trait" "github.com/apache/camel-k/pkg/util" + "github.com/apache/camel-k/pkg/util/gzip" "github.com/apache/camel-k/pkg/util/kubernetes" k8slog "github.com/apache/camel-k/pkg/util/kubernetes/log" "github.com/apache/camel-k/pkg/util/sync" diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 63cff5eb2c..80e19111dc 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -20,10 +20,12 @@ package integration import ( "context" + "github.com/apache/camel-k/pkg/events" appsv1 "k8s.io/api/apps/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -58,8 +60,9 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { return &ReconcileIntegration{ - client: c, - scheme: mgr.GetScheme(), + client: c, + scheme: mgr.GetScheme(), + recorder: mgr.GetEventRecorderFor("camel-k-integration-controller"), } } @@ -206,8 +209,9 @@ var _ reconcile.Reconciler = &ReconcileIntegration{} type ReconcileIntegration struct { // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the apiserver - client client.Client - scheme *runtime.Scheme + client client.Client + scheme *runtime.Scheme + recorder record.EventRecorder } // Reconcile reads that state of the cluster for a Integration object and makes changes based on the state read @@ -256,12 +260,14 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R newTarget, err := a.Handle(ctx, target) if err != nil { + events.NotifyIntegrationError(r.recorder, &instance, newTarget, err) return reconcile.Result{}, err } if newTarget != nil { - if r, err := r.update(ctx, &instance, newTarget); err != nil { - return r, err + if res, err := r.update(ctx, &instance, newTarget); err != nil { + events.NotifyIntegrationError(r.recorder, &instance, newTarget, err) + return res, err } if newTarget.Status.Phase != target.Status.Phase { @@ -275,6 +281,7 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R // handle one action at time so the resource // is always at its latest state + events.NotifyIntegrationUpdated(r.recorder, &instance, newTarget) break } } diff --git a/pkg/events/manager.go b/pkg/events/manager.go new file mode 100644 index 0000000000..5cd9a70bdd --- /dev/null +++ b/pkg/events/manager.go @@ -0,0 +1,95 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" +) + +const ( + // ReasonIntegrationPhaseUpdated -- + ReasonIntegrationPhaseUpdated = "IntegrationPhaseUpdated" + // ReasonIntegrationConditionChanged -- + ReasonIntegrationConditionChanged = "IntegrationConditionChanged" + // ReasonIntegrationError + ReasonIntegrationError = "IntegrationError" +) + +// NotifyIntegrationError automatically generates error events when the integration reconcile cycle phase has an error +func NotifyIntegrationError(recorder record.EventRecorder, old, new *v1.Integration, err error) { + it := old + if new != nil { + it = new + } + if it == nil { + return + } + recorder.Eventf(it, corev1.EventTypeWarning, ReasonIntegrationError, "Cannot reconcile integration %s: %v", it.Name, err) +} + +// NotifyIntegrationUpdated automatically generates events when the integration changes +func NotifyIntegrationUpdated(recorder record.EventRecorder, old, new *v1.Integration) { + if new == nil { + return + } + + // Update information about phase changes + if old == nil || old.Status.Phase != new.Status.Phase { + phase := new.Status.Phase + if phase == v1.IntegrationPhaseNone { + phase = "[none]" + } + recorder.Eventf(new, corev1.EventTypeNormal, ReasonIntegrationPhaseUpdated, "Integration %s in phase %s", new.Name, phase) + } + + // Update information about changes in conditions + if new.Status.Phase != v1.IntegrationPhaseNone { + for _, cond := range getChangedConditions(old, new) { + head := "" + if cond.Status == corev1.ConditionFalse { + head = "No " + } + tail := "" + if cond.Message != "" { + tail = fmt.Sprintf(": %s", cond.Message) + } + recorder.Eventf(new, corev1.EventTypeNormal, ReasonIntegrationConditionChanged, "%s%s for integration %s%s", head, cond.Type, new.Name, tail) + } + } + +} + +func getChangedConditions(old, new *v1.Integration) (res []v1.IntegrationCondition) { + if old == nil { + old = &v1.Integration{} + } + if new == nil { + new = &v1.Integration{} + } + for _, newCond := range new.Status.Conditions { + oldCond := old.Status.GetCondition(newCond.Type) + if oldCond == nil || oldCond.Status != newCond.Status || oldCond.Message != newCond.Message { + res = append(res, newCond) + } + } + return res +} diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go index 6a6e916b21..9f9b53ea55 100644 --- a/pkg/metadata/metadata.go +++ b/pkg/metadata/metadata.go @@ -21,8 +21,8 @@ import ( "github.com/scylladb/go-set/strset" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" - "github.com/apache/camel-k/pkg/gzip" "github.com/apache/camel-k/pkg/util/camel" + "github.com/apache/camel-k/pkg/util/gzip" "github.com/apache/camel-k/pkg/util/log" src "github.com/apache/camel-k/pkg/util/source" diff --git a/pkg/trait/deployment.go b/pkg/trait/deployment.go index 4abf44cdbf..088a721562 100644 --- a/pkg/trait/deployment.go +++ b/pkg/trait/deployment.go @@ -18,6 +18,8 @@ limitations under the License. package trait import ( + "fmt" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -119,7 +121,7 @@ func (t *deploymentTrait) Apply(e *Environment) error { v1.IntegrationConditionDeploymentAvailable, corev1.ConditionTrue, v1.IntegrationConditionDeploymentAvailableReason, - deployment.Name, + fmt.Sprintf("deployment name is %s", deployment.Name), ) if e.IntegrationInPhase(v1.IntegrationPhaseRunning) { diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go index 5f10270b67..37f18f2ddd 100644 --- a/pkg/trait/knative_service.go +++ b/pkg/trait/knative_service.go @@ -18,6 +18,7 @@ limitations under the License. package trait import ( + "fmt" "strconv" corev1 "k8s.io/api/core/v1" @@ -114,7 +115,7 @@ func (t *knativeServiceTrait) Configure(e *Environment) (bool, error) { v1.IntegrationConditionKnativeServiceAvailable, corev1.ConditionFalse, v1.IntegrationConditionKnativeServiceNotAvailableReason, - "controller strategy: "+string(ControllerStrategyDeployment), + fmt.Sprintf("different controller strategy chosen (%s)", string(ControllerStrategyDeployment)), ) // A controller is already present for the integration @@ -183,7 +184,7 @@ func (t *knativeServiceTrait) Apply(e *Environment) error { v1.IntegrationConditionKnativeServiceAvailable, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceAvailableReason, - ksvc.Name, + fmt.Sprintf("Knative service name is %s", ksvc.Name), ) if e.IntegrationInPhase(v1.IntegrationPhaseRunning) { diff --git a/pkg/trait/rest-dsl.go b/pkg/trait/rest-dsl.go index ec3400f33b..d3992b2184 100644 --- a/pkg/trait/rest-dsl.go +++ b/pkg/trait/rest-dsl.go @@ -31,8 +31,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" - "github.com/apache/camel-k/pkg/gzip" "github.com/apache/camel-k/pkg/util/defaults" + "github.com/apache/camel-k/pkg/util/gzip" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/maven" ) diff --git a/pkg/gzip/compress.go b/pkg/util/gzip/compress.go similarity index 100% rename from pkg/gzip/compress.go rename to pkg/util/gzip/compress.go diff --git a/pkg/gzip/compress_test.go b/pkg/util/gzip/compress_test.go similarity index 100% rename from pkg/gzip/compress_test.go rename to pkg/util/gzip/compress_test.go