@@ -46,16 +46,11 @@ import (
4646)
4747
4848const (
49- klrEntrypoint = "/opt/aws-custom-runtime"
50- labelKey = "flow.trigermesh.io/function"
49+ klrEntrypoint = "/opt/aws-custom-runtime"
50+ labelKey = "flow.trigermesh.io/function"
51+ ceDefaultTypePrefix = "io.triggermesh.function."
5152)
5253
53- type ceAttributes struct {
54- Type string
55- Source string
56- Subject string
57- }
58-
5954// Reconciler implements addressableservicereconciler.Interface for
6055// AddressableService resources.
6156type Reconciler struct {
@@ -107,11 +102,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, o *functionv1alpha1.Func
107102 }
108103 o .Status .MarkConfigmapAvailable ()
109104
110- // Parse CE overrides
111- ceAttr := r .ceAttributes (o )
112-
113105 // Reconcile Transformation Adapter
114- ksvc , err := r .reconcileKnService (ctx , o , cm , ceAttr )
106+ ksvc , err := r .reconcileKnService (ctx , o , cm )
115107 if err != nil {
116108 logger .Error ("Error reconciling Kn Service" , zap .Error (err ))
117109 o .Status .MarkServiceUnavailable (o .Name )
@@ -141,7 +133,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, o *functionv1alpha1.Func
141133 }
142134 o .Status .MarkServiceAvailable ()
143135
144- o .Status .CloudEventAttributes = r .statusAttributes (ceAttr )
136+ if o .Spec .CloudEventOverrides != nil {
137+ // in status we can set default attributes only;
138+ // there is no reliable way to get dynamic CE attributes from function source code
139+ o .Status .CloudEventAttributes = r .statusAttributes (o .Spec .CloudEventOverrides .Extensions )
140+ }
141+
145142 o .Status .MarkSinkAvailable ()
146143
147144 logger .Debug ("Transformation reconciled" )
@@ -177,7 +174,7 @@ func (r *Reconciler) reconcileConfigmap(ctx context.Context, f *functionv1alpha1
177174 return actualCm , nil
178175}
179176
180- func (r * Reconciler ) reconcileKnService (ctx context.Context , f * functionv1alpha1.Function , cm * corev1.ConfigMap , ceAttr ceAttributes ) (* servingv1.Service , error ) {
177+ func (r * Reconciler ) reconcileKnService (ctx context.Context , f * functionv1alpha1.Function , cm * corev1.ConfigMap ) (* servingv1.Service , error ) {
181178 logger := logging .FromContext (ctx )
182179
183180 image , err := r .lookupRuntimeImage (f .Spec .Runtime )
@@ -206,16 +203,27 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1
206203 filename := fmt .Sprintf ("source.%s" , fileExtension (f .Spec .Runtime ))
207204 handler := fmt .Sprintf ("source.%s" , f .Spec .Entrypoint )
208205
206+ overrides := map [string ]string {
207+ // Default values for required attributes
208+ "type" : ceDefaultTypePrefix + f .Spec .Runtime ,
209+ "source" : filename ,
210+ }
211+
212+ if f .Spec .CloudEventOverrides != nil {
213+ for k , v := range f .Spec .CloudEventOverrides .Extensions {
214+ overrides [k ] = v
215+ }
216+ }
217+
209218 expectedKsvc := resources .NewKnService (f .Name + "-" + rand .String (6 ), f .Namespace ,
210219 resources .KnSvcImage (image ),
211220 resources .KnSvcMountCm (cm .Name , filename ),
212221 resources .KnSvcEntrypoint (klrEntrypoint ),
213222 resources .KnSvcEnvVar ("K_SINK" , sink ),
214223 resources .KnSvcEnvVar ("_HANDLER" , handler ),
215224 resources .KnSvcEnvVar ("RESPONSE_FORMAT" , "CLOUDEVENTS" ),
216- resources .KnSvcEnvVar ("CE_TYPE" , ceAttr .Type ),
217- resources .KnSvcEnvVar ("CE_SOURCE" , ceAttr .Source ),
218- resources .KnSvcEnvVar ("CE_SUBJECT" , ceAttr .Subject ),
225+ resources .KnSvcEnvVar ("CE_FUNCTION_RESPONSE_MODE" , f .Spec .ResponseMode ),
226+ resources .KnSvcEnvFromMap ("CE_OVERRIDES_" , overrides ),
219227 resources .KnSvcAnnotation ("extensions.triggermesh.io/codeVersion" , cm .ResourceVersion ),
220228 resources .KnSvcVisibility (f .Spec .Public ),
221229 resources .KnSvcLabel (map [string ]string {labelKey : f .Name }),
@@ -241,36 +249,18 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1
241249 return actualKsvc , nil
242250}
243251
244- func (r * Reconciler ) ceAttributes (f * functionv1alpha1.Function ) ceAttributes {
245- res := ceAttributes {
246- Source : f .SelfLink ,
247- Subject : f .Spec .Entrypoint ,
248- }
252+ func (r * Reconciler ) statusAttributes (attributes map [string ]string ) []duckv1.CloudEventAttributes {
253+ res := duckv1.CloudEventAttributes {}
249254
250- if f . Spec . CloudEventOverrides == nil {
251- return res
255+ if typ , ok := attributes [ "type" ]; ok {
256+ res . Type = typ
252257 }
253258
254- for k , v := range f .Spec .CloudEventOverrides .Extensions {
255- switch strings .ToLower (k ) {
256- case "type" :
257- res .Type = v
258- case "source" :
259- res .Source = v
260- case "subject" :
261- res .Subject = v
262- }
259+ if source , ok := attributes ["source" ]; ok {
260+ res .Source = source
263261 }
264- return res
265- }
266262
267- func (r * Reconciler ) statusAttributes (ceAttr ceAttributes ) []duckv1.CloudEventAttributes {
268- return []duckv1.CloudEventAttributes {
269- {
270- Type : ceAttr .Type ,
271- Source : ceAttr .Source ,
272- },
273- }
263+ return []duckv1.CloudEventAttributes {res }
274264}
275265
276266func (r * Reconciler ) lookupRuntimeImage (runtime string ) (string , error ) {
0 commit comments