Skip to content
This repository was archived by the owner on Oct 12, 2021. It is now read-only.

Function Response Mode and empty overrides support added #27

Merged
merged 1 commit into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/300-function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
public:
description: 'Should the function be publicly available.'
type: boolean
responseMode:
description: 'Whether function responds with CE payload only or with full event.'
type: string
ceOverrides:
type: object
description: "Defines overrides to control modifications of the event attributes."
Expand Down
6 changes: 3 additions & 3 deletions config/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ spec:
value: triggermesh.io/routing

- name: RUNTIME_KLR_PYTHON
value: gcr.io/triggermesh/knative-lambda-python37:v1.5.1
value: gcr.io/triggermesh/knative-lambda-python37:v1.8.1
- name: RUNTIME_KLR_NODE
value: gcr.io/triggermesh/knative-lambda-node10:v1.5.1
value: gcr.io/triggermesh/knative-lambda-node10:v1.8.1
- name: RUNTIME_KLR_RUBY
value: gcr.io/triggermesh/knative-lambda-ruby25:v1.5.1
value: gcr.io/triggermesh/knative-lambda-ruby25:v1.8.1
32 changes: 22 additions & 10 deletions config/samples/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,33 @@ metadata:
name: inline-python-function
spec:
runtime: python
responseMode: event
public: true
ceOverrides:
extensions:
type: io.triggermesh.python.sample
entrypoint: foo
entrypoint: endpoint
code: |
import urllib.request
from random import randrange

def foo(event, context):
resp = urllib.request.urlopen(event['url'])
page = resp.read()
def endpoint(event, context):
val = randrange(10)
if (val % 2) == 0:
result = {
"type" : "io.triggermesh.klr.even",
"datacontenttype" : "application/json",
"data" : {
"value" : val
}
}

response = {
"statusCode": resp.status,
"body": str(page)
}
else:
result = {
"type" : "io.triggermesh.klr.odd",
"datacontenttype" : "application/json",
"data" : {
"value" : val
}
}
return result

return response
1 change: 1 addition & 0 deletions pkg/apis/function/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type FunctionSpec struct {
Entrypoint string `json:"entrypoint"`
Public bool `json:"public,omitempty"`
Code string `json:"code"`
ResponseMode string `json:"responseMode,omitempty"`
CloudEventOverrides *duckv1.CloudEventOverrides `json:"ceOverrides"`
Sink *duckv1.Destination `json:"sink"`
}
Expand Down
74 changes: 32 additions & 42 deletions pkg/reconciler/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,11 @@ import (
)

const (
klrEntrypoint = "/opt/aws-custom-runtime"
labelKey = "flow.trigermesh.io/function"
klrEntrypoint = "/opt/aws-custom-runtime"
labelKey = "flow.trigermesh.io/function"
ceDefaultTypePrefix = "io.triggermesh.function."
)

type ceAttributes struct {
Type string
Source string
Subject string
}

// Reconciler implements addressableservicereconciler.Interface for
// AddressableService resources.
type Reconciler struct {
Expand Down Expand Up @@ -107,11 +102,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, o *functionv1alpha1.Func
}
o.Status.MarkConfigmapAvailable()

// Parse CE overrides
ceAttr := r.ceAttributes(o)

// Reconcile Transformation Adapter
ksvc, err := r.reconcileKnService(ctx, o, cm, ceAttr)
ksvc, err := r.reconcileKnService(ctx, o, cm)
if err != nil {
logger.Error("Error reconciling Kn Service", zap.Error(err))
o.Status.MarkServiceUnavailable(o.Name)
Expand Down Expand Up @@ -141,7 +133,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, o *functionv1alpha1.Func
}
o.Status.MarkServiceAvailable()

o.Status.CloudEventAttributes = r.statusAttributes(ceAttr)
if o.Spec.CloudEventOverrides != nil {
// in status we can set default attributes only;
// there is no reliable way to get dynamic CE attributes from function source code
o.Status.CloudEventAttributes = r.statusAttributes(o.Spec.CloudEventOverrides.Extensions)
}

o.Status.MarkSinkAvailable()

logger.Debug("Transformation reconciled")
Expand Down Expand Up @@ -177,7 +174,7 @@ func (r *Reconciler) reconcileConfigmap(ctx context.Context, f *functionv1alpha1
return actualCm, nil
}

func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1.Function, cm *corev1.ConfigMap, ceAttr ceAttributes) (*servingv1.Service, error) {
func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1.Function, cm *corev1.ConfigMap) (*servingv1.Service, error) {
logger := logging.FromContext(ctx)

image, err := r.lookupRuntimeImage(f.Spec.Runtime)
Expand Down Expand Up @@ -206,16 +203,27 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1
filename := fmt.Sprintf("source.%s", fileExtension(f.Spec.Runtime))
handler := fmt.Sprintf("source.%s", f.Spec.Entrypoint)

overrides := map[string]string{
// Default values for required attributes
"type": ceDefaultTypePrefix + f.Spec.Runtime,
"source": filename,
}

if f.Spec.CloudEventOverrides != nil {
for k, v := range f.Spec.CloudEventOverrides.Extensions {
overrides[k] = v
}
}

expectedKsvc := resources.NewKnService(f.Name+"-"+rand.String(6), f.Namespace,
resources.KnSvcImage(image),
resources.KnSvcMountCm(cm.Name, filename),
resources.KnSvcEntrypoint(klrEntrypoint),
resources.KnSvcEnvVar("K_SINK", sink),
resources.KnSvcEnvVar("_HANDLER", handler),
resources.KnSvcEnvVar("RESPONSE_FORMAT", "CLOUDEVENTS"),
resources.KnSvcEnvVar("CE_TYPE", ceAttr.Type),
resources.KnSvcEnvVar("CE_SOURCE", ceAttr.Source),
resources.KnSvcEnvVar("CE_SUBJECT", ceAttr.Subject),
resources.KnSvcEnvVar("CE_FUNCTION_RESPONSE_MODE", f.Spec.ResponseMode),
resources.KnSvcEnvFromMap("CE_OVERRIDES_", overrides),
resources.KnSvcAnnotation("extensions.triggermesh.io/codeVersion", cm.ResourceVersion),
resources.KnSvcVisibility(f.Spec.Public),
resources.KnSvcLabel(map[string]string{labelKey: f.Name}),
Expand All @@ -241,36 +249,18 @@ func (r *Reconciler) reconcileKnService(ctx context.Context, f *functionv1alpha1
return actualKsvc, nil
}

func (r *Reconciler) ceAttributes(f *functionv1alpha1.Function) ceAttributes {
res := ceAttributes{
Source: f.SelfLink,
Subject: f.Spec.Entrypoint,
}
func (r *Reconciler) statusAttributes(attributes map[string]string) []duckv1.CloudEventAttributes {
res := duckv1.CloudEventAttributes{}

if f.Spec.CloudEventOverrides == nil {
return res
if typ, ok := attributes["type"]; ok {
res.Type = typ
}

for k, v := range f.Spec.CloudEventOverrides.Extensions {
switch strings.ToLower(k) {
case "type":
res.Type = v
case "source":
res.Source = v
case "subject":
res.Subject = v
}
if source, ok := attributes["source"]; ok {
res.Source = source
}
return res
}

func (r *Reconciler) statusAttributes(ceAttr ceAttributes) []duckv1.CloudEventAttributes {
return []duckv1.CloudEventAttributes{
{
Type: ceAttr.Type,
Source: ceAttr.Source,
},
}
return []duckv1.CloudEventAttributes{res}
}

func (r *Reconciler) lookupRuntimeImage(runtime string) (string, error) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/reconciler/function/resources/knservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package resources

import (
"path"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -183,3 +184,15 @@ func firstContainer(svc *servingv1.Service) *corev1.Container {
}
return &(*containers)[0]
}

func KnSvcEnvFromMap(prefix string, vars map[string]string) knSvcOption {
return func(svc *servingv1.Service) {
svcEnvVars := envVarsFrom(svc)
for k, v := range vars {
*svcEnvVars = append(*svcEnvVars, corev1.EnvVar{
Name: strings.ToUpper(prefix + k),
Value: v,
})
}
}
}