diff --git a/deploy/operator-role-kubernetes.yaml b/deploy/operator-role-kubernetes.yaml index 4e9db4cdf0..bca4878163 100644 --- a/deploy/operator-role-kubernetes.yaml +++ b/deploy/operator-role-kubernetes.yaml @@ -66,6 +66,7 @@ rules: resources: - events verbs: + - create - get - list - watch diff --git a/deploy/operator-role-olm.yaml b/deploy/operator-role-olm.yaml index a213197f4f..18a6f9162b 100644 --- a/deploy/operator-role-olm.yaml +++ b/deploy/operator-role-olm.yaml @@ -66,6 +66,7 @@ rules: resources: - events verbs: + - create - get - list - watch diff --git a/deploy/operator-role-openshift.yaml b/deploy/operator-role-openshift.yaml index 9da513255e..46ea8c5409 100644 --- a/deploy/operator-role-openshift.yaml +++ b/deploy/operator-role-openshift.yaml @@ -66,6 +66,7 @@ rules: resources: - events verbs: + - create - get - list - watch diff --git a/e2e/dev_mode_test.go b/e2e/dev_mode_test.go index fd057e97de..a028917006 100644 --- a/e2e/dev_mode_test.go +++ b/e2e/dev_mode_test.go @@ -48,11 +48,12 @@ func TestRunDevMode(t *testing.T) { kamelRun := kamelWithContext(ctx, "run", "-n", ns, file, "--dev") kamelRun.SetOut(pipew) - logScanner := util.NewLogScanner(ctx, piper, "Magicstring!", "Magicjordan!") + logScanner := util.NewLogScanner(ctx, piper, "Integration yaml in phase Running", "Magicstring!", "Magicjordan!") go kamelRun.Execute() - Eventually(logScanner.IsFound("Magicstring!"), 5*time.Minute).Should(BeTrue()) + Eventually(logScanner.IsFound("Integration yaml in phase Running"), 5*time.Minute).Should(BeTrue()) + Eventually(logScanner.IsFound("Magicstring!"), 3*time.Minute).Should(BeTrue()) Expect(logScanner.IsFound("Magicjordan!")()).To(BeFalse()) util.ReplaceInFile(t, file, "string!", "jordan!") diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go index b0441dcb15..248d7d72bf 100644 --- a/pkg/cmd/install.go +++ b/pkg/cmd/install.go @@ -366,7 +366,7 @@ func (o *installCmdOptions) install(cobraCmd *cobra.Command, _ []string) error { if collection == nil { if o.Wait { - err = o.waitForPlatformReady(platform) + err = o.waitForPlatformReady(cobraCmd, platform) if err != nil { return err } @@ -418,25 +418,20 @@ func (o *installCmdOptions) printOutput(collection *kubernetes.Collection) error return nil } -func (o *installCmdOptions) waitForPlatformReady(platform *v1.IntegrationPlatform) error { +func (o *installCmdOptions) waitForPlatformReady(cmd *cobra.Command, platform *v1.IntegrationPlatform) error { handler := func(i *v1.IntegrationPlatform) bool { - if i.Status.Phase != "" { - fmt.Println("platform \""+platform.Name+"\" in phase", i.Status.Phase) - - if i.Status.Phase == v1.IntegrationPlatformPhaseReady { - // TODO display some error info when available in the status - return false - } - - if i.Status.Phase == v1.IntegrationPlatformPhaseError { - fmt.Println("platform installation failed") - return false - } + if i.Status.Phase == v1.IntegrationPlatformPhaseReady || i.Status.Phase == v1.IntegrationPlatformPhaseError { + return false } return true } + go watch.HandleIntegrationPlatformEvents(o.Context, platform, func(event *corev1.Event) bool { + fmt.Fprintln(cmd.OutOrStdout(), event.Message) + return true + }) + return watch.HandlePlatformStateChanges(o.Context, platform, handler) } diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index b529955e1e..bb7b6f3a9f 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -43,6 +43,7 @@ import ( "github.com/magiconair/properties" "github.com/pkg/errors" "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -245,7 +246,7 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error { } if o.Wait || o.Dev { for { - integrationPhase, err := o.waitForIntegrationReady(integration) + integrationPhase, err := o.waitForIntegrationReady(cmd, integration) if err != nil { return err } @@ -253,7 +254,6 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error { if integrationPhase == nil || *integrationPhase == v1.IntegrationPhaseError { return fmt.Errorf("integration \"%s\" deployment failed", integration.Name) } else if *integrationPhase == v1.IntegrationPhaseRunning { - fmt.Println("Running") break } @@ -297,27 +297,23 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error { return nil } -func (o *runCmdOptions) waitForIntegrationReady(integration *v1.Integration) (*v1.IntegrationPhase, error) { +func (o *runCmdOptions) waitForIntegrationReady(cmd *cobra.Command, integration *v1.Integration) (*v1.IntegrationPhase, error) { handler := func(i *v1.Integration) bool { // // TODO when we add health checks, we should Wait until they are passed // - if i.Status.Phase != "" { - fmt.Println("integration \""+integration.Name+"\" in phase", i.Status.Phase) - - if i.Status.Phase == v1.IntegrationPhaseRunning { - // TODO display some error info when available in the status - return false - } - - if i.Status.Phase == v1.IntegrationPhaseError { - return false - } + if i.Status.Phase == v1.IntegrationPhaseRunning || i.Status.Phase == v1.IntegrationPhaseError { + return false } return true } + go watch.HandleIntegrationEvents(o.Context, integration, func(event *corev1.Event) bool { + fmt.Fprintln(cmd.OutOrStdout(), event.Message) + return true + }) + return watch.HandleIntegrationStateChanges(o.Context, integration, handler) } diff --git a/pkg/controller/build/build_controller.go b/pkg/controller/build/build_controller.go index ecb2995244..092dc6819c 100644 --- a/pkg/controller/build/build_controller.go +++ b/pkg/controller/build/build_controller.go @@ -22,7 +22,7 @@ import ( "sync" "time" - "github.com/apache/camel-k/pkg/events" + camelevent "github.com/apache/camel-k/pkg/event" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -208,13 +208,13 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result, newTarget, err := a.Handle(ctx, target) if err != nil { - events.NotifyBuildError(ctx, r.client, r.recorder, &instance, newTarget, err) + camelevent.NotifyBuildError(ctx, r.client, r.recorder, &instance, newTarget, err) return reconcile.Result{}, err } if newTarget != nil { if res, err := r.update(ctx, &instance, newTarget); err != nil { - events.NotifyBuildError(ctx, r.client, r.recorder, &instance, newTarget, err) + camelevent.NotifyBuildError(ctx, r.client, r.recorder, &instance, newTarget, err) return res, err } @@ -231,7 +231,7 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result, // handle one action at time so the resource // is always at its latest state - events.NotifyBuildUpdated(ctx, r.client, r.recorder, &instance, newTarget) + camelevent.NotifyBuildUpdated(ctx, r.client, r.recorder, &instance, newTarget) break } } diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 205004be4c..1532c45da6 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -20,7 +20,7 @@ package integration import ( "context" - "github.com/apache/camel-k/pkg/events" + camelevent "github.com/apache/camel-k/pkg/event" appsv1 "k8s.io/api/apps/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -260,13 +260,13 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R newTarget, err := a.Handle(ctx, target) if err != nil { - events.NotifyIntegrationError(ctx, r.client, r.recorder, &instance, newTarget, err) + camelevent.NotifyIntegrationError(ctx, r.client, r.recorder, &instance, newTarget, err) return reconcile.Result{}, err } if newTarget != nil { if res, err := r.update(ctx, &instance, newTarget); err != nil { - events.NotifyIntegrationError(ctx, r.client, r.recorder, &instance, newTarget, err) + camelevent.NotifyIntegrationError(ctx, r.client, r.recorder, &instance, newTarget, err) return res, err } @@ -281,7 +281,7 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R // handle one action at time so the resource // is always at its latest state - events.NotifyIntegrationUpdated(ctx, r.client, r.recorder, &instance, newTarget) + camelevent.NotifyIntegrationUpdated(ctx, r.client, r.recorder, &instance, newTarget) break } } diff --git a/pkg/controller/integrationkit/integrationkit_controller.go b/pkg/controller/integrationkit/integrationkit_controller.go index c86947a2c2..dd4b271996 100644 --- a/pkg/controller/integrationkit/integrationkit_controller.go +++ b/pkg/controller/integrationkit/integrationkit_controller.go @@ -20,7 +20,7 @@ package integrationkit import ( "context" - "github.com/apache/camel-k/pkg/events" + camelevent "github.com/apache/camel-k/pkg/event" "github.com/apache/camel-k/pkg/platform" "k8s.io/client-go/tools/record" @@ -227,13 +227,13 @@ func (r *ReconcileIntegrationKit) Reconcile(request reconcile.Request) (reconcil newTarget, err := a.Handle(ctx, target) if err != nil { - events.NotifyIntegrationKitError(ctx, r.client, r.recorder, &instance, newTarget, err) + camelevent.NotifyIntegrationKitError(ctx, r.client, r.recorder, &instance, newTarget, err) return reconcile.Result{}, err } if newTarget != nil { if res, err := r.update(ctx, &instance, newTarget); err != nil { - events.NotifyIntegrationKitError(ctx, r.client, r.recorder, &instance, newTarget, err) + camelevent.NotifyIntegrationKitError(ctx, r.client, r.recorder, &instance, newTarget, err) return res, err } @@ -248,7 +248,7 @@ func (r *ReconcileIntegrationKit) Reconcile(request reconcile.Request) (reconcil // handle one action at time so the resource // is always at its latest state - events.NotifyIntegrationKitUpdated(ctx, r.client, r.recorder, &instance, newTarget) + camelevent.NotifyIntegrationKitUpdated(ctx, r.client, r.recorder, &instance, newTarget) break } } diff --git a/pkg/controller/integrationplatform/integrationplatform_controller.go b/pkg/controller/integrationplatform/integrationplatform_controller.go index 1ad9a60965..0f8668af0e 100644 --- a/pkg/controller/integrationplatform/integrationplatform_controller.go +++ b/pkg/controller/integrationplatform/integrationplatform_controller.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/camel-k/pkg/events" + camelevent "github.com/apache/camel-k/pkg/event" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" @@ -151,13 +151,13 @@ func (r *ReconcileIntegrationPlatform) Reconcile(request reconcile.Request) (rec target, err = a.Handle(ctx, target) if err != nil { - events.NotifyIntegrationPlatformError(ctx, r.client, r.recorder, &instance, target, err) + camelevent.NotifyIntegrationPlatformError(ctx, r.client, r.recorder, &instance, target, err) return reconcile.Result{}, err } if target != nil { if err := r.client.Status().Patch(ctx, target, k8sclient.MergeFrom(&instance)); err != nil { - events.NotifyIntegrationPlatformError(ctx, r.client, r.recorder, &instance, target, err) + camelevent.NotifyIntegrationPlatformError(ctx, r.client, r.recorder, &instance, target, err) return reconcile.Result{}, err } @@ -174,7 +174,7 @@ func (r *ReconcileIntegrationPlatform) Reconcile(request reconcile.Request) (rec // handle one action at time so the resource // is always at its latest state - events.NotifyIntegrationPlatformUpdated(ctx, r.client, r.recorder, &instance, target) + camelevent.NotifyIntegrationPlatformUpdated(ctx, r.client, r.recorder, &instance, target) break } } diff --git a/pkg/events/manager.go b/pkg/event/manager.go similarity index 99% rename from pkg/events/manager.go rename to pkg/event/manager.go index 5622d278b5..9e4869af0b 100644 --- a/pkg/events/manager.go +++ b/pkg/event/manager.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package events +package event import ( "context" diff --git a/pkg/trait/deployment_test.go b/pkg/trait/deployment_test.go index 7d9c666658..8a6a7857e3 100644 --- a/pkg/trait/deployment_test.go +++ b/pkg/trait/deployment_test.go @@ -119,7 +119,7 @@ func TestApplyDeploymentTraitWhileDeployingIntegrationDoesSucceed(t *testing.T) conditions := environment.Integration.Status.Conditions assert.Len(t, conditions, 1) assert.Equal(t, v1.IntegrationConditionDeploymentAvailable, conditions[0].Type) - assert.Equal(t, "integration-name", conditions[0].Message) + assert.Equal(t, "deployment name is integration-name", conditions[0].Message) } func TestApplyDeploymentTraitWhileRunningIntegrationDoesSucceed(t *testing.T) { diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go index baaaa7ce33..289fc12e4b 100644 --- a/pkg/util/watch/watch.go +++ b/pkg/util/watch/watch.go @@ -19,12 +19,13 @@ package watch import ( "context" - - "github.com/apache/camel-k/pkg/util/kubernetes" - "github.com/apache/camel-k/pkg/util/log" + "fmt" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/kubernetes/customclient" + "github.com/apache/camel-k/pkg/util/log" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/json" @@ -96,6 +97,69 @@ func HandleIntegrationStateChanges(ctx context.Context, integration *v1.Integrat } } +// +// HandleIntegrationEvents watches all events related to the given integration. +// +// watch.HandleIntegrationEvents(o.Context, integration, func(event *corev1.Event) bool { +// println(event.Message) +// return true +// }) +// +// This function blocks until the handler function returns true or either the events channel or the context is closed. +// +func HandleIntegrationEvents(ctx context.Context, integration *v1.Integration, + handler func(event *corev1.Event) bool) error { + dynamicClient, err := customclient.GetDynamicClientFor("", "v1", "events", integration.Namespace) + if err != nil { + return err + } + watcher, err := dynamicClient.Watch(metav1.ListOptions{ + FieldSelector: fmt.Sprintf("involvedObject.kind=Integration,"+ + "involvedObject.apiVersion=%s,"+ + "involvedObject.name=%s", + v1.SchemeGroupVersion.String(), integration.Name), + }) + if err != nil { + return err + } + + defer watcher.Stop() + events := watcher.ResultChan() + + for { + select { + case <-ctx.Done(): + return nil + case e, ok := <-events: + if !ok { + return nil + } + + if e.Object != nil { + if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok { + jsondata, err := kubernetes.ToJSON(runtimeUnstructured) + if err != nil { + return err + } + evt := corev1.Event{} + err = json.Unmarshal(jsondata, &evt) + if err != nil { + log.Error(err, "Unexpected error detected when watching resource") + return nil + } + + if evt.CreationTimestamp.UnixNano() >= integration.CreationTimestamp.UnixNano() { + if !handler(&evt) { + return nil + } + } + } + } + } + } +} + + // // HandlePlatformStateChanges watches a platform resource and invoke the given handler when its status changes. // @@ -159,3 +223,65 @@ func HandlePlatformStateChanges(ctx context.Context, platform *v1.IntegrationPla } } } + +// +// HandleIntegrationPlatformEvents watches all events related to the given integration platform. +// +// watch.HandleIntegrationPlatformEvents(o.Context, platform, func(event *corev1.Event) bool { +// println(event.Message) +// return true +// }) +// +// This function blocks until the handler function returns true or either the events channel or the context is closed. +// +func HandleIntegrationPlatformEvents(ctx context.Context, p *v1.IntegrationPlatform, + handler func(event *corev1.Event) bool) error { + dynamicClient, err := customclient.GetDynamicClientFor("", "v1", "events", p.Namespace) + if err != nil { + return err + } + watcher, err := dynamicClient.Watch(metav1.ListOptions{ + FieldSelector: fmt.Sprintf("involvedObject.kind=IntegrationPlatform,"+ + "involvedObject.apiVersion=%s,"+ + "involvedObject.name=%s", + v1.SchemeGroupVersion.String(), p.Name), + }) + if err != nil { + return err + } + + defer watcher.Stop() + events := watcher.ResultChan() + + for { + select { + case <-ctx.Done(): + return nil + case e, ok := <-events: + if !ok { + return nil + } + + if e.Object != nil { + if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok { + jsondata, err := kubernetes.ToJSON(runtimeUnstructured) + if err != nil { + return err + } + evt := corev1.Event{} + err = json.Unmarshal(jsondata, &evt) + if err != nil { + log.Error(err, "Unexpected error detected when watching resource") + return nil + } + + if evt.CreationTimestamp.UnixNano() >= p.CreationTimestamp.UnixNano() { + if !handler(&evt) { + return nil + } + } + } + } + } + } +}