diff --git a/config/300-function.yaml b/config/300-function.yaml index a764364..613360a 100644 --- a/config/300-function.yaml +++ b/config/300-function.yaml @@ -60,6 +60,9 @@ spec: public: description: 'Should the function be publicly available.' type: boolean + responseMode: + description: 'Whether function responds with CE payload only or with full event.' + type: string ceOverrides: type: object description: "Defines overrides to control modifications of the event attributes." diff --git a/config/controller.yaml b/config/controller.yaml index c67bde7..678b73b 100644 --- a/config/controller.yaml +++ b/config/controller.yaml @@ -56,8 +56,8 @@ spec: value: triggermesh.io/routing - name: RUNTIME_KLR_PYTHON - value: gcr.io/triggermesh/knative-lambda-python37:v1.5.1 + value: gcr.io/triggermesh/knative-lambda-python37:v1.8.1 - name: RUNTIME_KLR_NODE - value: gcr.io/triggermesh/knative-lambda-node10:v1.5.1 + value: gcr.io/triggermesh/knative-lambda-node10:v1.8.1 - name: RUNTIME_KLR_RUBY - value: gcr.io/triggermesh/knative-lambda-ruby25:v1.5.1 + value: gcr.io/triggermesh/knative-lambda-ruby25:v1.8.1 diff --git a/config/samples/python.yaml b/config/samples/python.yaml index 9c5f3e3..433773a 100644 --- a/config/samples/python.yaml +++ b/config/samples/python.yaml @@ -18,21 +18,33 @@ metadata: name: inline-python-function spec: runtime: python + responseMode: event public: true ceOverrides: extensions: type: io.triggermesh.python.sample - entrypoint: foo + entrypoint: endpoint code: | - import urllib.request + from random import randrange - def foo(event, context): - resp = urllib.request.urlopen(event['url']) - page = resp.read() + def endpoint(event, context): + val = randrange(10) + if (val % 2) == 0: + result = { + "type" : "io.triggermesh.klr.even", + "datacontenttype" : "application/json", + "data" : { + "value" : val + } + } - response = { - "statusCode": resp.status, - "body": str(page) - } + else: + result = { + "type" : "io.triggermesh.klr.odd", + "datacontenttype" : "application/json", + "data" : { + "value" : val + } + } + return result - return response diff --git a/pkg/apis/function/v1alpha1/function_types.go b/pkg/apis/function/v1alpha1/function_types.go index 000a92a..d5d048d 100644 --- a/pkg/apis/function/v1alpha1/function_types.go +++ b/pkg/apis/function/v1alpha1/function_types.go @@ -55,6 +55,7 @@ type FunctionSpec struct { Entrypoint string `json:"entrypoint"` Public bool `json:"public,omitempty"` Code string `json:"code"` + ResponseMode string `json:"responseMode,omitempty"` CloudEventOverrides *duckv1.CloudEventOverrides `json:"ceOverrides"` Sink *duckv1.Destination `json:"sink"` } diff --git a/pkg/reconciler/function/function.go b/pkg/reconciler/function/function.go index 422709e..a0b4812 100644 --- a/pkg/reconciler/function/function.go +++ b/pkg/reconciler/function/function.go @@ -46,16 +46,11 @@ import ( ) const ( - klrEntrypoint = "/opt/aws-custom-runtime" - labelKey = "flow.trigermesh.io/function" + klrEntrypoint = "/opt/aws-custom-runtime" + labelKey = "flow.trigermesh.io/function" + ceDefaultTypePrefix = "io.triggermesh.function." ) -type ceAttributes struct { - Type string - Source string - Subject string -} - // Reconciler implements addressableservicereconciler.Interface for // AddressableService resources. type Reconciler struct { @@ -107,11 +102,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, o *functionv1alpha1.Func } o.Status.MarkConfigmapAvailable() - // Parse CE overrides - ceAttr := r.ceAttributes(o) - // Reconcile Transformation Adapter - ksvc, err := r.reconcileKnService(ctx, o, cm, ceAttr) + ksvc, err := r.reconcileKnService(ctx, o, cm) if err != nil { logger.Error("Error reconciling Kn Service", zap.Error(err)) o.Status.MarkServiceUnavailable(o.Name) @@ -141,7 +133,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, o *functionv1alpha1.Func } o.Status.MarkServiceAvailable() - o.Status.CloudEventAttributes = r.statusAttributes(ceAttr) + if o.Spec.CloudEventOverrides != nil { + // in status we can set default attributes only; + // there is no reliable way to get dynamic CE attributes from function source code + o.Status.CloudEventAttributes = r.statusAttributes(o.Spec.CloudEventOverrides.Extensions) + } + o.Status.MarkSinkAvailable() logger.Debug("Transformation reconciled") @@ -177,7 +174,7 @@ func (r *Reconciler) reconcileConfigmap(ctx context.Context, f *functionv1alpha1 return actualCm, nil } -func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1.Function, cm *corev1.ConfigMap, ceAttr ceAttributes) (*servingv1.Service, error) { +func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1.Function, cm *corev1.ConfigMap) (*servingv1.Service, error) { logger := logging.FromContext(ctx) image, err := r.lookupRuntimeImage(f.Spec.Runtime) @@ -206,6 +203,18 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1 filename := fmt.Sprintf("source.%s", fileExtension(f.Spec.Runtime)) handler := fmt.Sprintf("source.%s", f.Spec.Entrypoint) + overrides := map[string]string{ + // Default values for required attributes + "type": ceDefaultTypePrefix + f.Spec.Runtime, + "source": filename, + } + + if f.Spec.CloudEventOverrides != nil { + for k, v := range f.Spec.CloudEventOverrides.Extensions { + overrides[k] = v + } + } + expectedKsvc := resources.NewKnService(f.Name+"-"+rand.String(6), f.Namespace, resources.KnSvcImage(image), resources.KnSvcMountCm(cm.Name, filename), @@ -213,9 +222,8 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1 resources.KnSvcEnvVar("K_SINK", sink), resources.KnSvcEnvVar("_HANDLER", handler), resources.KnSvcEnvVar("RESPONSE_FORMAT", "CLOUDEVENTS"), - resources.KnSvcEnvVar("CE_TYPE", ceAttr.Type), - resources.KnSvcEnvVar("CE_SOURCE", ceAttr.Source), - resources.KnSvcEnvVar("CE_SUBJECT", ceAttr.Subject), + resources.KnSvcEnvVar("CE_FUNCTION_RESPONSE_MODE", f.Spec.ResponseMode), + resources.KnSvcEnvFromMap("CE_OVERRIDES_", overrides), resources.KnSvcAnnotation("extensions.triggermesh.io/codeVersion", cm.ResourceVersion), resources.KnSvcVisibility(f.Spec.Public), resources.KnSvcLabel(map[string]string{labelKey: f.Name}), @@ -241,36 +249,18 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1 return actualKsvc, nil } -func (r *Reconciler) ceAttributes(f *functionv1alpha1.Function) ceAttributes { - res := ceAttributes{ - Source: f.SelfLink, - Subject: f.Spec.Entrypoint, - } +func (r *Reconciler) statusAttributes(attributes map[string]string) []duckv1.CloudEventAttributes { + res := duckv1.CloudEventAttributes{} - if f.Spec.CloudEventOverrides == nil { - return res + if typ, ok := attributes["type"]; ok { + res.Type = typ } - for k, v := range f.Spec.CloudEventOverrides.Extensions { - switch strings.ToLower(k) { - case "type": - res.Type = v - case "source": - res.Source = v - case "subject": - res.Subject = v - } + if source, ok := attributes["source"]; ok { + res.Source = source } - return res -} -func (r *Reconciler) statusAttributes(ceAttr ceAttributes) []duckv1.CloudEventAttributes { - return []duckv1.CloudEventAttributes{ - { - Type: ceAttr.Type, - Source: ceAttr.Source, - }, - } + return []duckv1.CloudEventAttributes{res} } func (r *Reconciler) lookupRuntimeImage(runtime string) (string, error) { diff --git a/pkg/reconciler/function/resources/knservice.go b/pkg/reconciler/function/resources/knservice.go index 604db73..d114748 100644 --- a/pkg/reconciler/function/resources/knservice.go +++ b/pkg/reconciler/function/resources/knservice.go @@ -18,6 +18,7 @@ package resources import ( "path" + "strings" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -183,3 +184,15 @@ func firstContainer(svc *servingv1.Service) *corev1.Container { } return &(*containers)[0] } + +func KnSvcEnvFromMap(prefix string, vars map[string]string) knSvcOption { + return func(svc *servingv1.Service) { + svcEnvVars := envVarsFrom(svc) + for k, v := range vars { + *svcEnvVars = append(*svcEnvVars, corev1.EnvVar{ + Name: strings.ToUpper(prefix + k), + Value: v, + }) + } + } +}