docs(ingest): add example DAGs for Airflow #2116

Merged · 2 commits · Feb 18, 2021
5 changes: 5 additions & 0 deletions metadata-ingestion/README.md
@@ -2,6 +2,7 @@

This module hosts an extensible Python-based metadata ingestion system for DataHub.
It supports sending data to DataHub using Kafka or through the REST API.
It can be used through our CLI tool or as a library, e.g. with an orchestrator like Airflow.
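
As a rough sketch of library-style usage (mirroring the inline configuration used in `mysql_sample_dag.py` below; the connection details are placeholders, not a working setup):

```python
# Minimal sketch: run an ingestion pipeline directly from Python.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "mysql",
            "config": {
                "username": "user",
                "password": "pass",
                "database": "db_name",
                "host_port": "localhost:3306",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
```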

## Architecture

@@ -55,6 +56,10 @@
source docker/docker_run.sh examples/recipes/file_to_file.yml
```
-->

We have also included a couple of [sample DAGs](./examples/airflow) that can be used with [Airflow](https://airflow.apache.org/):
- `generic_recipe_sample_dag.py` - a simple Airflow DAG that picks up a DataHub ingestion recipe configuration and runs it.
- `mysql_sample_dag.py` - an Airflow DAG that runs a MySQL metadata ingestion pipeline using an inlined configuration.

# Recipes

A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink).
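
As a rough sketch, a recipe file can also be loaded and run programmatically, which is what `generic_recipe_sample_dag.py` listed above does (the path below is a placeholder):

```python
# Sketch: load a recipe file and run it as an ingestion pipeline.
# "path/to/recipe.yml" stands in for the path to your own recipe.
import yaml

from datahub.ingestion.run.pipeline import Pipeline

with open("path/to/recipe.yml") as config_file:
    config = yaml.safe_load(config_file)

Pipeline.create(config).run()
```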
50 changes: 50 additions & 0 deletions metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py
@@ -0,0 +1,50 @@
"""Generic DataHub Ingest via Recipe

This example demonstrates how to load any configuration file and run a
DataHub ingestion pipeline within an Airflow DAG.
"""

from datetime import timedelta

import yaml

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

from datahub.ingestion.run.pipeline import Pipeline


default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["jdoe@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=120),
}


def datahub_recipe():
    # Load the ingestion recipe from a YAML file; replace the placeholder
    # path below with the location of your own recipe.
    with open("path/to/recipe.yml") as config_file:
        config = yaml.safe_load(config_file)

    # Create a DataHub ingestion pipeline from the recipe and run it.
    pipeline = Pipeline.create(config)
    pipeline.run()


with DAG(
"datahub_ingest_using_recipe",
default_args=default_args,
description="An example DAG which runs a DataHub ingestion recipe",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
ingest_task = PythonOperator(
task_id="ingest_using_recipe",
python_callable=datahub_recipe,
)
62 changes: 62 additions & 0 deletions metadata-ingestion/examples/airflow/mysql_sample_dag.py
@@ -0,0 +1,62 @@
"""MySQL DataHub Ingest DAG

This example demonstrates how to ingest metadata from MySQL into DataHub
from within an Airflow DAG. Note that the DB connection configuration is
embedded within the code.
"""

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

from datahub.ingestion.run.pipeline import Pipeline


default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["jdoe@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=120),
}


def ingest_from_mysql():
    # Build an ingestion pipeline from an inline recipe:
    # a MySQL source feeding a DataHub REST sink.
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "mysql",
                "config": {
                    "username": "user",
                    "password": "pass",
                    "database": "db_name",
                    "host_port": "localhost:3306",
                },
            },
            "sink": {
                "type": "datahub-rest",
                "config": {"server": "http://localhost:8080"},
            },
        }
    )
    pipeline.run()


with DAG(
"datahub_mysql_ingest",
default_args=default_args,
description="An example DAG which ingests metadata from MySQL to DataHub",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
ingest_task = PythonOperator(
task_id="ingest_from_mysql",
python_callable=ingest_from_mysql,
)