Skip to content

Commit 38a3146

Browse files
adwk67dervoeti
andauthored
feat: Airflow with Event-scheduling, Triggerer, Opa (#320)
* wip: kafka dag * working scheduled event with kafka/tls * added deferrable DAG and opa rules/users * added screenshots * opa docs * docs fine-tuning * restore yaml links * Update stacks/airflow/opa-rules.yaml Co-authored-by: Lukas Krug <lukas.krug@stackable.tech> * review feedback --------- Co-authored-by: Lukas Krug <lukas.krug@stackable.tech>
1 parent 26e5a40 commit 38a3146

32 files changed

+607
-103
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
---
2+
apiVersion: rbac.authorization.k8s.io/v1
3+
kind: ClusterRole
4+
metadata:
5+
name: airflow-demo-clusterrole
6+
rules:
7+
- apiGroups:
8+
- spark.stackable.tech
9+
resources:
10+
- sparkapplications
11+
verbs:
12+
- create
13+
- get
14+
- list
15+
- apiGroups:
16+
- apps
17+
resources:
18+
- statefulsets
19+
verbs:
20+
- get
21+
- watch
22+
- list
23+
- apiGroups:
24+
- ""
25+
resources:
26+
- persistentvolumeclaims
27+
verbs:
28+
- list
29+
- apiGroups:
30+
- ""
31+
resources:
32+
- pods
33+
verbs:
34+
- get
35+
- watch
36+
- list
37+
- apiGroups:
38+
- ""
39+
resources:
40+
- pods/exec
41+
verbs:
42+
- create

demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml

Lines changed: 0 additions & 36 deletions
This file was deleted.

demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml renamed to demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
apiVersion: rbac.authorization.k8s.io/v1
33
kind: ClusterRoleBinding
44
metadata:
5-
name: airflow-spark-clusterrole-binding
5+
name: airflow-demo-clusterrole-binding
66
roleRef:
77
apiGroup: rbac.authorization.k8s.io
88
kind: ClusterRole
9-
name: airflow-spark-clusterrole
9+
name: airflow-demo-clusterrole
1010
subjects:
1111
- apiGroup: rbac.authorization.k8s.io
1212
kind: Group

demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ spec:
99
containers:
1010
- name: start-pyspark-job
1111
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12-
# N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly
13-
# restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps
14-
# below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that
15-
# all pods are reachable (albeit independent of each other).
12+
# N.B. it is possible for the scheduler to report that a DAG exists,
13+
# only for the worker task to fail if a pod is unexpectedly
14+
# restarted. The wait/watch steps below are not "water-tight" but add
15+
# a layer of stability by at least ensuring that the cluster is
16+
# initialized and ready and that all pods are reachable (albeit
17+
# independent of each other).
1618
command:
1719
- bash
1820
- -euo

demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ spec:
99
containers:
1010
- name: start-date-job
1111
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12-
# N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly
13-
# restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps
14-
# below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that
15-
# all pods are reachable (albeit independent of each other).
12+
# N.B. it is possible for the scheduler to report that a DAG exists,
13+
# only for the worker task to fail if a pod is unexpectedly
14+
# restarted. The wait/watch steps below are not "water-tight" but add
15+
# a layer of stability by at least ensuring that the cluster is
16+
# initialized and ready and that all pods are reachable (albeit
17+
# independent of each other).
1618
command:
1719
- bash
1820
- -euo
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
---
2+
apiVersion: batch/v1
3+
kind: Job
4+
metadata:
5+
name: start-kafka-job
6+
spec:
7+
template:
8+
spec:
9+
containers:
10+
- name: start-kafka-job
11+
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12+
env:
13+
- name: NAMESPACE
14+
valueFrom:
15+
fieldRef:
16+
fieldPath: metadata.namespace
17+
# N.B. it is possible for the scheduler to report that a DAG exists,
18+
# only for the worker task to fail if a pod is unexpectedly
19+
# restarted. The wait/watch steps below are not "water-tight" but add
20+
# a layer of stability by at least ensuring that the cluster is
21+
# initialized and ready and that all pods are reachable (albeit
22+
# independent of each other).
23+
command:
24+
- bash
25+
- -euo
26+
- pipefail
27+
- -c
28+
- |
29+
# Kafka: wait for cluster
30+
kubectl rollout status --watch statefulset/kafka-broker-default
31+
kubectl rollout status --watch statefulset/kafka-controller-default
32+
33+
# Kafka: create consumer offsets topics (required for group coordinator)
34+
kubectl exec kafka-broker-default-0 -c kafka -- \
35+
/stackable/kafka/bin/kafka-topics.sh \
36+
--bootstrap-server kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093 \
37+
--create \
38+
--if-not-exists \
39+
--topic __consumer_offsets \
40+
--partitions 50 \
41+
--replication-factor 1 \
42+
--config cleanup.policy=compact \
43+
--command-config /stackable/config/client.properties
44+
45+
# Airflow: wait for cluster
46+
kubectl rollout status --watch statefulset/airflow-webserver-default
47+
kubectl rollout status --watch statefulset/airflow-scheduler-default
48+
49+
# Airflow: activate DAG
50+
AIRFLOW_ADMIN_PASSWORD=$(cat /airflow-credentials/adminUser.password)
51+
ACCESS_TOKEN=$(curl -XPOST http://airflow-webserver-default-headless:8080/auth/token -H 'Content-Type: application/json' -d '{"username": "admin", "password": "'$AIRFLOW_ADMIN_PASSWORD'"}' | jq -r .access_token)
52+
curl -H "Authorization: Bearer $ACCESS_TOKEN" -H 'Content-Type: application/json' -XPATCH http://airflow-webserver-default-headless:8080/api/v2/dags/kafka_watcher -d '{"is_paused": false}' | jq
53+
54+
# Kafka: produce a message to create the topic
55+
kubectl exec kafka-broker-default-0 -c kafka -- bash -c \
56+
'echo "Hello World at: $(date)" | /stackable/kafka/bin/kafka-console-producer.sh \
57+
--bootstrap-server kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093 \
58+
--producer.config /stackable/config/client.properties \
59+
--topic test-topic'
60+
volumeMounts:
61+
- name: airflow-credentials
62+
mountPath: /airflow-credentials
63+
volumes:
64+
- name: airflow-credentials
65+
secret:
66+
secretName: airflow-credentials
67+
restartPolicy: OnFailure
68+
backoffLimit: 20 # give some time for the Airflow cluster to be available
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
---
2+
apiVersion: batch/v1
3+
kind: Job
4+
metadata:
5+
name: start-users-job
6+
spec:
7+
template:
8+
spec:
9+
containers:
10+
- name: start-users-job
11+
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12+
# N.B. it is possible for the scheduler to report that a DAG exists,
13+
# only for the worker task to fail if a pod is unexpectedly
14+
# restarted. The wait/watch steps below are not "water-tight" but add
15+
# a layer of stability by at least ensuring that the cluster is
16+
# initialized and ready and that all pods are reachable (albeit
17+
# independent of each other).
18+
command:
19+
- bash
20+
- -euo
21+
- pipefail
22+
- -c
23+
- |
24+
# Airflow: wait for cluster
25+
kubectl rollout status --watch statefulset/airflow-webserver-default
26+
kubectl rollout status --watch statefulset/airflow-scheduler-default
27+
28+
# Airflow: create users
29+
kubectl exec airflow-webserver-default-0 -- airflow users create \
30+
--username "jane.doe" \
31+
--firstname "Jane" \
32+
--lastname "Doe" \
33+
--email "jane.doe@stackable.tech" \
34+
--password "jane.doe" \
35+
--role "User"
36+
37+
kubectl exec airflow-webserver-default-0 -- airflow users create \
38+
--username "richard.roe" \
39+
--firstname "Richard" \
40+
--lastname "Roe" \
41+
--email "richard.roe@stackable.tech" \
42+
--password "richard.roe" \
43+
--role "User"
44+
volumeMounts:
45+
- name: airflow-credentials
46+
mountPath: /airflow-credentials
47+
volumes:
48+
- name: airflow-credentials
49+
secret:
50+
secretName: airflow-credentials
51+
restartPolicy: OnFailure
52+
backoffLimit: 20 # give some time for the Airflow cluster to be available

demos/demos-v2.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ demos:
4646
- airflow
4747
- job-scheduling
4848
manifests:
49-
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml
50-
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml
49+
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/01-airflow-demo-clusterrole.yaml
50+
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml
5151
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml
5252
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml
53+
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml
54+
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/06-create-opa-users.yaml
5355
supportedNamespaces: []
5456
resourceRequests:
5557
cpu: 2401m
163 KB
Loading
85.8 KB
Loading

0 commit comments

Comments
 (0)