diff --git a/.gitignore b/.gitignore index be19816797..6040af5a4b 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,9 @@ build/_output build/_test build/_maven_output +# envrc +.envrc + # eclipse / vscode .settings .classpath diff --git a/deploy/resources.go b/deploy/resources.go index 882550aded..c6878fa5ad 100644 --- a/deploy/resources.go +++ b/deploy/resources.go @@ -8351,29 +8351,6 @@ spec: http: false passive: false -` - Resources["cr-example.yaml"] = - ` -apiVersion: camel.apache.org/v1alpha1 -kind: Integration -metadata: - name: example -spec: - source: - content: |- - // This is Camel K Groovy example route - - rnd = new Random() - - from('timer:groovy?period=1s') - .routeId('groovy') - .setBody() - .constant('Hello Camel K!') - .process { - it.in.headers['RandomValue'] = rnd.nextInt() - } - .to('log:info?showHeaders=true') - name: routes.groovy ` Resources["crd-build.yaml"] = ` @@ -8540,6 +8517,29 @@ spec: description: The IntegrationContext to use JSONPath: .status.context +` + Resources["cr-example.yaml"] = + ` +apiVersion: camel.apache.org/v1alpha1 +kind: Integration +metadata: + name: example +spec: + source: + content: |- + // This is Camel K Groovy example route + + rnd = new Random() + + from('timer:groovy?period=1s') + .routeId('groovy') + .setBody() + .constant('Hello Camel K!') + .process { + it.in.headers['RandomValue'] = rnd.nextInt() + } + .to('log:info?showHeaders=true') + name: routes.groovy ` Resources["operator-deployment.yaml"] = ` diff --git a/docs/traits.adoc b/docs/traits.adoc index 0493800cb7..6fe592919e 100644 --- a/docs/traits.adoc +++ b/docs/traits.adoc @@ -157,6 +157,9 @@ More information can be found in the official Kubernetes documentation about htt ! knative.endpoint-sinks ! Configures a (comma-separated) list of endpoints the Knative consumes. +! knative.filter-source-channels +! Force the knative endpoint to filter messages based on the `ce-knativehistory` header (Knative experimental feature). It's enabled automatically when there are more than 2 source channels. It's optional (default to false) when there's a single source channel. + !=== | istio diff --git a/examples/knative/messages-channel.yaml b/examples/knative/messages-channel.yaml index 2dcd271fda..a418bdcb30 100644 --- a/examples/knative/messages-channel.yaml +++ b/examples/knative/messages-channel.yaml @@ -6,4 +6,4 @@ spec: provisioner: apiVersion: eventing.knative.dev/v1alpha1 kind: ClusterChannelProvisioner - name: in-memory-channel \ No newline at end of file + name: in-memory \ No newline at end of file diff --git a/examples/knative/words-channel.yaml b/examples/knative/words-channel.yaml index ad8640fb6f..f5516564bf 100644 --- a/examples/knative/words-channel.yaml +++ b/examples/knative/words-channel.yaml @@ -6,4 +6,4 @@ spec: provisioner: apiVersion: eventing.knative.dev/v1alpha1 kind: ClusterChannelProvisioner - name: in-memory-channel \ No newline at end of file + name: in-memory \ No newline at end of file diff --git a/pkg/apis/camel/v1alpha1/knative/types.go b/pkg/apis/camel/v1alpha1/knative/types.go index 5c80e43f5a..833931be96 100644 --- a/pkg/apis/camel/v1alpha1/knative/types.go +++ b/pkg/apis/camel/v1alpha1/knative/types.go @@ -60,11 +60,13 @@ const ( // Meta Options const ( - CamelMetaServicePath = "service.path" - CamelMetaServiceID = "service.id" - CamelMetaServiceName = "service.name" - CamelMetaServiceHost = "service.host" - CamelMetaServicePort = "service.port" - CamelMetaServiceZone = "service.zone" - CamelMetaServiceProtocol = "service.protocol" + CamelMetaServicePath = "service.path" + CamelMetaServiceID = "service.id" + CamelMetaServiceName = "service.name" + CamelMetaServiceHost = "service.host" + CamelMetaServicePort = "service.port" + CamelMetaServiceZone = "service.zone" + CamelMetaServiceProtocol = "service.protocol" + CamelMetaFilterHeaderName = "filter.header.name" + CamelMetaFilterHeaderValue = "filter.header.value" ) diff --git a/pkg/trait/deployment.go b/pkg/trait/deployment.go index 64320a0546..91bf938c4a 100644 --- a/pkg/trait/deployment.go +++ b/pkg/trait/deployment.go @@ -127,9 +127,12 @@ func (t *deploymentTrait) getDeploymentFor(e *Environment) *appsv1.Deployment { "camel.apache.org/integration": e.Integration.Name, } - annotations := e.Integration.Annotations - if annotations == nil { - annotations = make(map[string]string) + // create a copy to avoid sharing the underlying annotation map + annotations := make(map[string]string) + if e.Integration.Annotations != nil { + for k, v := range FilterTransferableAnnotations(e.Integration.Annotations) { + annotations[k] = v + } } // Resolve registry host names when used @@ -143,9 +146,7 @@ func (t *deploymentTrait) getDeploymentFor(e *Environment) *appsv1.Deployment { ObjectMeta: metav1.ObjectMeta{ Name: e.Integration.Name, Namespace: e.Integration.Namespace, - Labels: map[string]string{ - "camel.apache.org/integration": e.Integration.Name, - }, + Labels: labels, Annotations: annotations, }, Spec: appsv1.DeploymentSpec{ @@ -156,6 +157,7 @@ func (t *deploymentTrait) getDeploymentFor(e *Environment) *appsv1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, + Annotations: annotations, }, Spec: corev1.PodSpec{ ServiceAccountName: e.Integration.Spec.ServiceAccountName, diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index 7436f67f17..6b15d2477b 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -34,15 +34,20 @@ import ( ) type knativeTrait struct { - BaseTrait `property:",squash"` - Configuration string `property:"configuration"` - ChannelSources string `property:"channel-sources"` - ChannelSinks string `property:"channel-sinks"` - EndpointSources string `property:"endpoint-sources"` - EndpointSinks string `property:"endpoint-sinks"` - Auto *bool `property:"auto"` + BaseTrait `property:",squash"` + Configuration string `property:"configuration"` + ChannelSources string `property:"channel-sources"` + ChannelSinks string `property:"channel-sinks"` + EndpointSources string `property:"endpoint-sources"` + EndpointSinks string `property:"endpoint-sinks"` + FilterSourceChannels *bool `property:"filter-source-channels"` + Auto *bool `property:"auto"` } +const ( + knativeHistoryHeader = "ce-knativehistory" +) + func newKnativeTrait() *knativeTrait { t := &knativeTrait{ BaseTrait: newBaseTrait("knative"), @@ -101,6 +106,13 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { t.EndpointSinks = strings.Join(items, ",") } + if len(strings.Split(t.ChannelSources, ",")) > 1 { + // Always filter channels when the integration subscribes to more than one + // Using Knative experimental header: https://github.com/knative/eventing/blob/master/pkg/provisioners/message.go#L28 + // TODO: filter automatically all source channels when the feature becomes stable + filter := true + t.FilterSourceChannels = &filter + } } return true, nil @@ -169,15 +181,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn if env.ContainsService(ch, knativeapi.CamelServiceTypeChannel) { continue } + meta := map[string]string{ + knativeapi.CamelMetaServicePath: "/", + } + if t.FilterSourceChannels != nil && *t.FilterSourceChannels { + fullName := ch + "." + e.Integration.Namespace + ".channels.cluster.local" + meta[knativeapi.CamelMetaFilterHeaderName] = knativeHistoryHeader + meta[knativeapi.CamelMetaFilterHeaderValue] = fullName + } svc := knativeapi.CamelServiceDefinition{ Name: ch, Host: "0.0.0.0", Port: 8080, Protocol: knativeapi.CamelProtocolHTTP, ServiceType: knativeapi.CamelServiceTypeChannel, - Metadata: map[string]string{ - knativeapi.CamelMetaServicePath: "/", - }, + Metadata: meta, } env.Services = append(env.Services, svc) } diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go index bae73dd83d..cb04bc628c 100644 --- a/pkg/trait/knative_service.go +++ b/pkg/trait/knative_service.go @@ -125,6 +125,14 @@ func (t *knativeServiceTrait) getServiceFor(e *Environment) *serving.Service { } annotations := make(map[string]string) + + // Copy annotations from the integration resource + if e.Integration.Annotations != nil { + for k, v := range FilterTransferableAnnotations(e.Integration.Annotations) { + annotations[k] = v + } + } + // Resolve registry host names when used annotations["alpha.image.policy.openshift.io/resolve-names"] = "*" diff --git a/pkg/trait/util.go b/pkg/trait/util.go index 30ab45a22e..4355c8d2f1 100644 --- a/pkg/trait/util.go +++ b/pkg/trait/util.go @@ -133,6 +133,19 @@ func parseCsvMap(csvMap *string) (map[string]string, error) { return m, nil } +// FilterTransferableAnnotations returns a map containing annotations that are meaningful for being transferred to child resources. +func FilterTransferableAnnotations(annotations map[string]string) map[string]string { + res := make(map[string]string) + for k, v := range annotations { + if strings.HasPrefix(k, "kubectl.kubernetes.io") { + // filter out kubectl annotations + continue + } + res[k]=v + } + return res +} + func decodeTraitSpec(in *v1alpha1.TraitSpec, target interface{}) error { md := mapstructure.Metadata{}