|
9 | 9 | productVersion: 3.0.6 |
10 | 10 | pullPolicy: IfNotPresent |
11 | 11 | clusterConfig: |
| 12 | + authorization: |
| 13 | + opa: |
| 14 | + configMapName: opa-airflow |
| 15 | + package: airflow |
| 16 | + cache: |
| 17 | + entryTimeToLive: 5s |
| 18 | + maxEntries: 10 |
12 | 19 | loadExamples: false |
13 | | - exposeConfig: false |
14 | 20 | credentialsSecret: airflow-credentials |
15 | 21 | volumes: |
16 | 22 | - name: airflow-dags |
|
32 | 38 | - name: airflow-dags |
33 | 39 | mountPath: /dags/kafka.py |
34 | 40 | subPath: kafka.py |
| 41 | + - name: airflow-dags |
| 42 | + mountPath: /dags/triggerer.py |
| 43 | + subPath: triggerer.py |
35 | 44 | - name: kafka-tls-pem |
36 | 45 | mountPath: /stackable/kafka-tls-pem |
37 | 46 | webservers: |
|
48 | 57 | AIRFLOW__CORE__DAGS_FOLDER: "/dags" |
49 | 58 | PYTHONPATH: "/stackable/app/log_config:/dags" |
50 | 59 | AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" |
51 | | - #AIRFLOW_CONN_KAFKA_CONN: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.{{ NAMESPACE }}.svc.cluster.local:9093\", \"security.protocol\": \"SSL\", \"ssl.ca.location\": \"/stackable/kafka-tls-pem/ca.crt\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}" |
| 60 | + # Airflow 3: Disable decision caching for easy debugging |
| 61 | + AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" |
| 62 | + configOverrides: |
| 63 | + webserver_config.py: |
| 64 | + # Allow "POST /login/" without CSRF token |
| 65 | + WTF_CSRF_ENABLED: "False" |
52 | 66 | podOverrides: &podOverrides |
53 | 67 | spec: |
54 | 68 | containers: |
|
66 | 80 | default: |
67 | 81 | replicas: 1 |
68 | 82 | kubernetesExecutors: |
| 83 | + # do not apply the podOverrides here as we don't need and it will interfere |
| 84 | + # with the pod template |
69 | 85 | envOverrides: *envOverrides |
70 | 86 | schedulers: |
71 | 87 | envOverrides: *envOverrides |
@@ -127,6 +143,61 @@ data: |
127 | 143 | with DAG(dag_id="kafka_watcher", schedule=[asset]) as dag: |
128 | 144 | EmptyOperator(task_id="task") |
129 | 145 |
|
| 146 | + triggerer.py: | |
| 147 | + from datetime import datetime, timedelta |
| 148 | +
|
| 149 | + from airflow import DAG |
| 150 | + from airflow.models.baseoperator import BaseOperator |
| 151 | + from airflow.triggers.temporal import TimeDeltaTrigger |
| 152 | + from airflow.utils.context import Context |
| 153 | + from airflow.operators.empty import EmptyOperator |
| 154 | +
|
| 155 | + # ------------------------------------------------------ |
| 156 | + # Custom deferrable operator - does a simple async sleep |
| 157 | + # ------------------------------------------------------ |
| 158 | + class CoreDeferrableSleepOperator(BaseOperator): |
| 159 | + """ |
| 160 | + Sleeps for ``duration`` seconds without occupying a worker. |
| 161 | + The async hand-off happens via ``self.defer`` + ``TimeDeltaTrigger``. |
| 162 | + """ |
| 163 | + ui_color = "#ffefeb" |
| 164 | +
|
| 165 | + def __init__(self, *, duration: int, **kwargs): |
| 166 | + super().__init__(**kwargs) |
| 167 | + self.duration = duration |
| 168 | +
|
| 169 | + def execute(self, context: Context): |
| 170 | + """Run on a worker, then hand control to the Triggerer.""" |
| 171 | + # Build the trigger that will fire after `duration` seconds. |
| 172 | + trigger = TimeDeltaTrigger(timedelta(seconds=self.duration)) |
| 173 | +
|
| 174 | + # *** Asynchronous hand-off *** |
| 175 | + # This tells the scheduler: “pause this task, let the Triggerer watch the timer”. |
| 176 | + self.defer(trigger=trigger, method_name="execute_complete") |
| 177 | +
|
| 178 | + def execute_complete(self, context: Context, event=None): |
| 179 | + """Resumes here once the Triggerer fires.""" |
| 180 | + self.log.info("Deferrable sleep of %s seconds finished.", self.duration) |
| 181 | + return "DONE" |
| 182 | +
|
| 183 | + default_args = {"owner": "stackable", "retries": 0} |
| 184 | +
|
| 185 | + with DAG( |
| 186 | + dag_id="core_deferrable_sleep_demo", |
| 187 | + schedule=None, |
| 188 | + # N.B. this be earlier than the current timestamp! |
| 189 | + start_date=datetime(2025, 8, 1), |
| 190 | + catchup=False, |
| 191 | + default_args=default_args, |
| 192 | + tags=["example", "triggerer"], |
| 193 | + ) as dag: |
| 194 | +
|
| 195 | + sleep = CoreDeferrableSleepOperator( |
| 196 | + task_id="deferrable_sleep", |
| 197 | + duration=10, |
| 198 | + ) |
| 199 | +
|
| 200 | + sleep |
130 | 201 | date_demo.py: | |
131 | 202 | """Example DAG returning the current date""" |
132 | 203 | from datetime import datetime, timedelta |
|
0 commit comments