Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kafka-connector chart for async invocations with backpressure #1233

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 94 additions & 35 deletions chart/kafka-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ $ helm repo add openfaas https://openfaas.github.io/faas-netes/

Prepare a custom [values.yaml](values.yaml) with:

* brokerHosts - comma separted list of host:port
* topics - the topics to subscribe to
* replicas - this should match the partition size, so if the size is 3, set this to 3
- brokerHosts - comma separted list of host:port
- topics - the topics to subscribe to
- replicas - this should match the partition size, so if the size is 3, set this to 3

Then you will need to read up on the encryption and authentication options and update the settings accordingly.

Expand All @@ -65,46 +65,105 @@ $ helm repo update && \

## Encryption options

1) TLS off (default)
2) TLS on
1. TLS off (default)
2. TLS on

## Authentication options

1) TLS with SASL using CA from the default trust store
3) TLS with SASL using a custom CA
4) TLS with client certificates
1. TLS with SASL using CA from the default trust store
2. TLS with SASL using a custom CA
3. TLS with client certificates

## Async invocations

The connector can be configured to invoke function asynchronously. This lets you use [OpenFaaS async](https://docs.openfaas.com/reference/async/) features like retries.
To prevent the connector from consuming all Kafka messages at once and submitting them to the OpenFaaS async queue a limit on the number of inflight async invocations can be configured.

Configure the connector for async invocations:

```yaml
# Invoke functions asynchronously.
asyncInvocation: true

async:
# Limit the number of inflight async invocations for the connector.
# A value of 0 indicates no concurrency limit.
maxInflight: 0

# Configure an externally-managed NATS server.
# NATS is used for async invocations and is required when
# setting the 'async.maxInflight' parameter to a value other than 0.
# By default the OpenFaaS embedded nats deployment is used.
# These values should be identical to the configuration in your OpenFaaS deployment values.yaml file
# when external nats is enabled.
nats:
external:
enabled: false
host: ""
port: ""
```

### Reset the inflight concurrency counter

If the inflight counter gets out if sync for some reason, e.g a misconfiguration, network issues, it can be forcefully reset.
The connecter checks if a Lease object exists on startup and resets the counter if the Lease does not exist.

Remove the lease and restart the connector to reset the counter.

1. Remove the lease

```sh
$ kubectl get lease -n openfaas

NAME HOLDER AGE
kafka-connector 18m
```

```sh
kubectl delete lease kafka-connector -n openfaas
```

2. Restart the connector

```sh
kubectl rollout restart deploy/kafka-connector -n openfaas
```

## Configuration

Additional kafka-connector options in `values.yaml`.

| Parameter | Description | Default |
|------------------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
| `asyncInvocation` | For long running or slow functions, offload to asychronous function invocations and carry on processing the stream | `false` |
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
| `printResponse` | Output the response of calling a function in the logs. | `true` |
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `` |
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
| `sasl` | Enable auth with a SASL username/password | `false` |
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
| `group` | Set the Kafka consumer group name. | `""` |
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
| `initialOffset` | Either newest or oldest. | `"oldest"` |
| `logs.debug` | Print debug logs | `false` |
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |
| Parameter | Description | Default |
| ----------------------- | ---------------------------------------------------------------------------------------------------------------------------------- | ------------------------------ |
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
| `asyncInvocation` | Invoke function asychronously and carry on processing the stream | `false` |
| `async.maxInflight` | Limit the number of inflight async invocations for the connector. A value of 0 indicates no concurrency limit. | `0` |
| `nats.external.enabled` | Whether to use an externally-managed NATS server. | `false` |
| `nats.external.host` | The host at which the externally-managed NATS server can be reached | `""` |
| `nats.external.port` | The port at which the externally-managed NATS server can be reached | `""` |
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
| `printResponse` | Output the response of calling a function in the logs. | `true` |
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `""` |
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
| `sasl` | Enable auth with a SASL username/password | `false` |
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
| `group` | Set the Kafka consumer group name. | `""` |
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
| `initialOffset` | Either newest or oldest. | `"oldest"` |
| `logs.debug` | Print debug logs | `false` |
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |

Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. See `values.yaml` for the default configuration.

Expand Down
19 changes: 19 additions & 0 deletions chart/kafka-connector/templates/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ spec:
app: {{ template "connector.name" . }}
component: kafka-connector
spec:
{{- if and .Values.asyncInvocation (gt (int .Values.async.maxInflight) 0) }}
serviceAccountName: {{ template "connector.fullname" . }}
{{- end }}
volumes:
- name: openfaas-license
secret:
Expand Down Expand Up @@ -87,6 +90,12 @@ spec:
- "-key-file=/var/secrets/broker-key/broker-key"
{{- end }}
env:
- name: connector_id
value: "{{template "connector.fullname" . }}"
- name: namespace
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: gateway_url
value: {{ .Values.gatewayURL | quote }}
- name: topics
Expand All @@ -99,6 +108,16 @@ spec:
value: {{ .Values.printRequestBody | quote }}
- name: asynchronous_invocation
value: {{ .Values.asyncInvocation | quote }}
- name: async_max_inflight
value: {{ .Values.async.maxInflight | quote }}
- name: async_callback_url
value: "http://{{ template "connector.fullname" . }}.{{ .Release.Namespace }}:8080/api/v1/callback"
- name: nats_url
{{- if .Values.nats.external.enabled }}
value: "nats://{{ .Values.nats.external.host}}:{{ .Values.nats.external.port }}"
{{- else }}
value: "nats://nats.openfaas:4222"
{{- end }}
{{- if .Values.basic_auth }}
- name: basic_auth
value: "true"
Expand Down
40 changes: 40 additions & 0 deletions chart/kafka-connector/templates/rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{{- if and .Values.asyncInvocation (gt (int .Values.async.maxInflight) 0) }}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.fullname" . }}
component: kafka-connector
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.name" . }}
component: kafka-connector
rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.fullname" . }}
component: kafka-connector
subjects:
- kind: ServiceAccount
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
roleRef:
kind: Role
name: {{ template "connector.fullname" . }}
apiGroup: rbac.authorization.k8s.io
{{- end }}
20 changes: 20 additions & 0 deletions chart/kafka-connector/templates/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: v1
kind: Service
metadata:
labels:
app: {{ template "connector.name" . }}
component: kafka-connector
chart: {{ .Chart.Name }}-{{ .Chart.Version }}
heritage: {{ .Release.Service }}
release: {{ .Release.Name }}
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
spec:
type: ClusterIP
ports:
- name: http
port: 8080
protocol: TCP
targetPort: 8080
selector:
app: kafka-connector
21 changes: 19 additions & 2 deletions chart/kafka-connector/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ upstreamTimeout: 2m
# interval for rebuilding the map of functions and topics
rebuildInterval: 30s

# Use with slow consumers or long running functions
# Invoke functions asynchronously.
asyncInvocation: false

async:
# Limit the number of inflight async invocations for the connector.
# A value of 0 indicates no concurrency limit.
maxInflight: 0

# 1MB = 1024 bytes * 1024
maxBytes: "1048576"

Expand Down Expand Up @@ -76,6 +81,18 @@ gatewayURL: http://gateway.openfaas:8080
# Basic auth for the gateway
basic_auth: true

# NATS is used for async invocations and is required when
# setting the 'async.maxInflight' parameter to a value other than 0.
nats:
# Configure an externally-managed NATS server.
# When disabled the OpenFaaS embedded nats deployment is used.
# These values should be identical to the configuration in your OpenFaaS deployment values.yaml file
# when external nats is enabled.
external:
enabled: false
host: ""
port: ""

nodeSelector: {}

tolerations: []
Expand Down Expand Up @@ -138,4 +155,4 @@ keySecret: ""

# caSecret: kafka-broker-ca
# certSecret: kafka-broker-cert
# keySecret: kafka-broker-key
# keySecret: kafka-broker-key
Loading