diff --git a/chart/kafka-connector/README.md b/chart/kafka-connector/README.md index 87ff9c8798..95396cd1f9 100644 --- a/chart/kafka-connector/README.md +++ b/chart/kafka-connector/README.md @@ -47,9 +47,9 @@ $ helm repo add openfaas https://openfaas.github.io/faas-netes/ Prepare a custom [values.yaml](values.yaml) with: -* brokerHosts - comma separted list of host:port -* topics - the topics to subscribe to -* replicas - this should match the partition size, so if the size is 3, set this to 3 +- brokerHosts - comma separted list of host:port +- topics - the topics to subscribe to +- replicas - this should match the partition size, so if the size is 3, set this to 3 Then you will need to read up on the encryption and authentication options and update the settings accordingly. @@ -65,46 +65,105 @@ $ helm repo update && \ ## Encryption options -1) TLS off (default) -2) TLS on +1. TLS off (default) +2. TLS on ## Authentication options -1) TLS with SASL using CA from the default trust store -3) TLS with SASL using a custom CA -4) TLS with client certificates +1. TLS with SASL using CA from the default trust store +2. TLS with SASL using a custom CA +3. TLS with client certificates + +## Async invocations + +The connector can be configured to invoke function asynchronously. This lets you use [OpenFaaS async](https://docs.openfaas.com/reference/async/) features like retries. +To prevent the connector from consuming all Kafka messages at once and submitting them to the OpenFaaS async queue a limit on the number of inflight async invocations can be configured. + +Configure the connector for async invocations: + +```yaml +# Invoke functions asynchronously. +asyncInvocation: true + +async: + # Limit the number of inflight async invocations for the connector. + # A value of 0 indicates no concurrency limit. + maxInflight: 0 + +# Configure an externally-managed NATS server. +# NATS is used for async invocations and is required when +# setting the 'async.maxInflight' parameter to a value other than 0. +# By default the OpenFaaS embedded nats deployment is used. +# These values should be identical to the configuration in your OpenFaaS deployment values.yaml file +# when external nats is enabled. +nats: + external: + enabled: false + host: "" + port: "" +``` + +### Reset the inflight concurrency counter + +If the inflight counter gets out if sync for some reason, e.g a misconfiguration, network issues, it can be forcefully reset. +The connecter checks if a Lease object exists on startup and resets the counter if the Lease does not exist. + +Remove the lease and restart the connector to reset the counter. + +1. Remove the lease + +```sh +$ kubectl get lease -n openfaas + +NAME HOLDER AGE +kafka-connector 18m +``` + +```sh +kubectl delete lease kafka-connector -n openfaas +``` + +2. Restart the connector + +```sh +kubectl rollout restart deploy/kafka-connector -n openfaas +``` ## Configuration Additional kafka-connector options in `values.yaml`. -| Parameter | Description | Default | -|------------------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------| -| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` | -| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` | -| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` | -| `asyncInvocation` | For long running or slow functions, offload to asychronous function invocations and carry on processing the stream | `false` | -| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` | -| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` | -| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` | -| `printResponse` | Output the response of calling a function in the logs. | `true` | -| `printResponseBody` | Output to the logs the response body when calling a function. | `false` | -| `printRequestBody` | Output to the logs the request body when calling a function. | `false` | -| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `` | -| `tls` | Connect to the broker server(s) using TLS encryption | `true` | -| `sasl` | Enable auth with a SASL username/password | `false` | -| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` | -| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` | -| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` | -| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` | -| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` | -| `contentType` | Set a HTTP Content Type during function invocation. | `""` | -| `group` | Set the Kafka consumer group name. | `""` | -| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` | -| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` | -| `initialOffset` | Either newest or oldest. | `"oldest"` | -| `logs.debug` | Print debug logs | `false` | -| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` | +| Parameter | Description | Default | +| ----------------------- | ---------------------------------------------------------------------------------------------------------------------------------- | ------------------------------ | +| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` | +| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` | +| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` | +| `asyncInvocation` | Invoke function asychronously and carry on processing the stream | `false` | +| `async.maxInflight` | Limit the number of inflight async invocations for the connector. A value of 0 indicates no concurrency limit. | `0` | +| `nats.external.enabled` | Whether to use an externally-managed NATS server. | `false` | +| `nats.external.host` | The host at which the externally-managed NATS server can be reached | `""` | +| `nats.external.port` | The port at which the externally-managed NATS server can be reached | `""` | +| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` | +| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` | +| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` | +| `printResponse` | Output the response of calling a function in the logs. | `true` | +| `printResponseBody` | Output to the logs the response body when calling a function. | `false` | +| `printRequestBody` | Output to the logs the request body when calling a function. | `false` | +| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `""` | +| `tls` | Connect to the broker server(s) using TLS encryption | `true` | +| `sasl` | Enable auth with a SASL username/password | `false` | +| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` | +| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` | +| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` | +| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` | +| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` | +| `contentType` | Set a HTTP Content Type during function invocation. | `""` | +| `group` | Set the Kafka consumer group name. | `""` | +| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` | +| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` | +| `initialOffset` | Either newest or oldest. | `"oldest"` | +| `logs.debug` | Print debug logs | `false` | +| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` | Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. See `values.yaml` for the default configuration. diff --git a/chart/kafka-connector/templates/deployment.yml b/chart/kafka-connector/templates/deployment.yml index 84139f0d0b..47c8013add 100644 --- a/chart/kafka-connector/templates/deployment.yml +++ b/chart/kafka-connector/templates/deployment.yml @@ -32,6 +32,9 @@ spec: app: {{ template "connector.name" . }} component: kafka-connector spec: + {{- if and .Values.asyncInvocation (gt (int .Values.async.maxInflight) 0) }} + serviceAccountName: {{ template "connector.fullname" . }} + {{- end }} volumes: - name: openfaas-license secret: @@ -87,6 +90,12 @@ spec: - "-key-file=/var/secrets/broker-key/broker-key" {{- end }} env: + - name: connector_id + value: "{{template "connector.fullname" . }}" + - name: namespace + valueFrom: + fieldRef: + fieldPath: metadata.namespace - name: gateway_url value: {{ .Values.gatewayURL | quote }} - name: topics @@ -99,6 +108,16 @@ spec: value: {{ .Values.printRequestBody | quote }} - name: asynchronous_invocation value: {{ .Values.asyncInvocation | quote }} + - name: async_max_inflight + value: {{ .Values.async.maxInflight | quote }} + - name: async_callback_url + value: "http://{{ template "connector.fullname" . }}.{{ .Release.Namespace }}:8080/api/v1/callback" + - name: nats_url + {{- if .Values.nats.external.enabled }} + value: "nats://{{ .Values.nats.external.host}}:{{ .Values.nats.external.port }}" + {{- else }} + value: "nats://nats.openfaas:4222" + {{- end }} {{- if .Values.basic_auth }} - name: basic_auth value: "true" diff --git a/chart/kafka-connector/templates/rbac.yaml b/chart/kafka-connector/templates/rbac.yaml new file mode 100644 index 0000000000..bff09eee84 --- /dev/null +++ b/chart/kafka-connector/templates/rbac.yaml @@ -0,0 +1,40 @@ +{{- if and .Values.asyncInvocation (gt (int .Values.async.maxInflight) 0) }} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ template "connector.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + app: {{ template "connector.fullname" . }} + component: kafka-connector +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: {{ template "connector.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + app: {{ template "connector.name" . }} + component: kafka-connector +rules: + - apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["get", "create"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ template "connector.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + app: {{ template "connector.fullname" . }} + component: kafka-connector +subjects: + - kind: ServiceAccount + name: {{ template "connector.fullname" . }} + namespace: {{ .Release.Namespace | quote }} +roleRef: + kind: Role + name: {{ template "connector.fullname" . }} + apiGroup: rbac.authorization.k8s.io +{{- end }} diff --git a/chart/kafka-connector/templates/service.yaml b/chart/kafka-connector/templates/service.yaml new file mode 100644 index 0000000000..b1ce47820e --- /dev/null +++ b/chart/kafka-connector/templates/service.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: {{ template "connector.name" . }} + component: kafka-connector + chart: {{ .Chart.Name }}-{{ .Chart.Version }} + heritage: {{ .Release.Service }} + release: {{ .Release.Name }} + name: {{ template "connector.fullname" . }} + namespace: {{ .Release.Namespace | quote }} +spec: + type: ClusterIP + ports: + - name: http + port: 8080 + protocol: TCP + targetPort: 8080 + selector: + app: kafka-connector \ No newline at end of file diff --git a/chart/kafka-connector/values.yaml b/chart/kafka-connector/values.yaml index 1c5e72712a..1286bc8092 100644 --- a/chart/kafka-connector/values.yaml +++ b/chart/kafka-connector/values.yaml @@ -30,9 +30,14 @@ upstreamTimeout: 2m # interval for rebuilding the map of functions and topics rebuildInterval: 30s -# Use with slow consumers or long running functions +# Invoke functions asynchronously. asyncInvocation: false +async: + # Limit the number of inflight async invocations for the connector. + # A value of 0 indicates no concurrency limit. + maxInflight: 0 + # 1MB = 1024 bytes * 1024 maxBytes: "1048576" @@ -76,6 +81,18 @@ gatewayURL: http://gateway.openfaas:8080 # Basic auth for the gateway basic_auth: true +# NATS is used for async invocations and is required when +# setting the 'async.maxInflight' parameter to a value other than 0. +nats: + # Configure an externally-managed NATS server. + # When disabled the OpenFaaS embedded nats deployment is used. + # These values should be identical to the configuration in your OpenFaaS deployment values.yaml file + # when external nats is enabled. + external: + enabled: false + host: "" + port: "" + nodeSelector: {} tolerations: [] @@ -138,4 +155,4 @@ keySecret: "" # caSecret: kafka-broker-ca # certSecret: kafka-broker-cert -# keySecret: kafka-broker-key +# keySecret: kafka-broker-key \ No newline at end of file