Skip to content

Commit

Permalink
Merge pull request #12 from tuanavu/development
Browse files Browse the repository at this point in the history
Airflow variables tutorial
  • Loading branch information
tuanavu authored Feb 17, 2019
2 parents 86da3c9 + d5eac3a commit 9784664
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 6 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ Check http://localhost:8080/

## Other commands

If you want to run other airflow sub-commands, you can do so like this:
If you want to run airflow sub-commands, you can do so like this:

- `docker-compose run --rm webserver airflow list_dags` - List dags
- `docker-compose run --rm webserver airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE]` - Test specific task

If you want to run/test python script, you can do so like this:
- `docker-compose run --rm webserver python /usr/local/airflow/dags/[PYTHON-FILE].py` - Test python script

## Connect to database

If you want to use Ad hoc query, make sure you've configured connections:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@
from datetime import timedelta, datetime

from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator


# Config variables
dag_config = Variable.get("bigquery_github_trends_variables", deserialize_json=True)
BQ_CONN_ID = dag_config["bq_conn_id"]
BQ_PROJECT = dag_config["bq_project"]
BQ_DATASET = dag_config["bq_dataset"]

default_args = {
'owner': 'airflow',
'depends_on_past': True,
Expand All @@ -28,11 +36,6 @@
schedule_interval=schedule_interval
)

# Config variables
BQ_CONN_ID = "my_gcp_conn"
BQ_PROJECT = "my-bq-project"
BQ_DATASET = "my-bq-dataset"

## Task 1: check that the github archive data has a dated table created for that date
# To test this task, run this command:
# docker-compose -f docker-compose-gcloud.yml run --rm webserver airflow test bigquery_github_trends bq_check_githubarchive_day 2018-12-01
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"bigquery_github_trends_variables": {
"bq_conn_id": "my_gcp_conn",
"bq_project": "my_bq_project",
"bq_dataset": "my_bq_dataset"
}
}
4 changes: 4 additions & 0 deletions examples/gcloud-example/dags/support/keys/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore
9 changes: 9 additions & 0 deletions examples/intro-example/dags/config/example_variables.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"example_variables_config": {
"var1": "value1",
"var2": [1, 2, 3],
"var3": {
"k": "value3"
}
}
}
67 changes: 67 additions & 0 deletions examples/intro-example/dags/example_variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import print_function

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator

default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 2, 15),
'end_date': datetime(2019, 2, 15)
}

dag = DAG('example_variables',
schedule_interval="@once",
default_args=default_args)


# Config variables
## Common
# var1 = "value1"
# var2 = [1, 2, 3]
# var3 = {'k': 'value3'}

## 3 DB connections called
# var1 = Variable.get("var1")
# var2 = Variable.get("var2")
# var3 = Variable.get("var3")

## Recommended way
dag_config = Variable.get("example_variables_config", deserialize_json=True)
var1 = dag_config["var1"]
var2 = dag_config["var2"]
var3 = dag_config["var3"]

start = DummyOperator(
task_id="start",
dag=dag
)

# To test this task, run this command:
# docker-compose run --rm webserver airflow test example_variables get_dag_config 2019-02-15
t1 = BashOperator(
task_id="get_dag_config",
bash_command='echo "{0}"'.format(dag_config),
dag=dag,
)

# You can directly use a variable from a jinja template
## {{ var.value.<variable_name> }}

t2 = BashOperator(
task_id="get_variable_value",
bash_command='echo {{ var.value.var3 }} ',
dag=dag,
)

## {{ var.json.<variable_name> }}
t3 = BashOperator(
task_id="get_variable_json",
bash_command='echo {{ var.json.example_variables_config.var3 }} ',
dag=dag,
)

start >> [t1, t2, t3]

0 comments on commit 9784664

Please sign in to comment.