Add KubernetesPodOperator & DockerOperator management #133
Conversation
Thank you for making this @juldrixx! Can you please add some tests for the new functionality? We'll have CI/CD set up soon, so even basic ones will suffice for now. Also, is there an example you can give us so we can add it to our integration test suite?
I'm not sure how to implement tests 😕. For documentation/examples, I could provide some, but the Docker and Kubernetes implementations depend on a Docker image that I can't make public on my side 😕. But I can give you the DAG examples and the Dockerfiles to build the image. In my examples, I'm running dbt on BigQuery.

**Execution mode = docker**

**Dag**

```python
from pendulum import datetime

from cosmos.providers.dbt.dag import DbtDag

# DAG for the project jaffle_shop
jaffle_shop = DbtDag(
    execution_mode="docker",
    dbt_project_name="jaffle_shop",
    dbt_root_path="/home/juldrixx/Documents/sandbox/dbt-tutorial",
    conn_id="jaffle_shop",
    dbt_args={
        "schema": "dbt_development",
        "image": "dbt/jaffle_shop:1.0.0",
        "environment": {
            "ENVIRONMENT_MODE": "dev",
        },
        "dbt_cmd_flags": {
            "project_dir": "/app",
        },
    },
    dbt_profiles_dir="/conf",
    dag_id="jaffle_shop_docker",
    start_date=datetime(2023, 2, 13),
)
```
**Task Group**

```python
from pendulum import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from cosmos.providers.dbt.task_group import DbtTaskGroup

with DAG(
    dag_id="jaffle_shop_docker_tg",
    start_date=datetime(2023, 2, 13),
    schedule="@daily",
) as dag:
    start = EmptyOperator(task_id="start")

    dbt_tg = DbtTaskGroup(
        execution_mode="docker",
        group_id="dbt_tg",
        dbt_project_name="jaffle_shop",
        dbt_root_path="/home/juldrixx/Documents/sandbox/dbt-tutorial",
        conn_id="jaffle_shop",
        dbt_args={
            "schema": "dbt_development",
            "image": "dbt/jaffle_shop:1.0.0",
            "environment": {
                "ENVIRONMENT_MODE": "dev",
            },
            "dbt_cmd_flags": {
                "project_dir": "/app",
            },
        },
        dbt_profiles_dir="/conf",
        dag=dag,
    )

    end = EmptyOperator(task_id="end")

    start >> dbt_tg >> end
```
**Execution mode = kubernetes**

**Dag**

```python
from pendulum import datetime

from cosmos.providers.dbt.dag import DbtDag

# DAG for the project jaffle_shop
jaffle_shop = DbtDag(
    execution_mode="kubernetes",
    dbt_project_name="jaffle_shop",
    dbt_root_path="/home/juldrixx/Documents/sandbox/dbt-tutorial",
    conn_id="jaffle_shop",
    dbt_args={
        "schema": "dbt_development",
        "image": "europe-west1-docker.pkg.dev/<GCP_PROJECT_ID>/dbt/jaffle_shop:1.0.0",
        "env_vars": {
            "ENVIRONMENT_MODE": "dev",
        },
        "dbt_cmd_flags": {
            "project_dir": "/app",
        },
        "namespace": "dbt-run",
        "service_account_name": "dbt-sa",
    },
    dbt_profiles_dir="/conf",
    dag_id="jaffle_shop_kubernetes",
    start_date=datetime(2023, 2, 13),
)
```
**Task Group**

```python
from pendulum import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from cosmos.providers.dbt.task_group import DbtTaskGroup

with DAG(
    dag_id="jaffle_shop_kubernetes_tg",
    start_date=datetime(2023, 2, 13),
    schedule="@daily",
) as dag:
    start = EmptyOperator(task_id="start")

    dbt_tg = DbtTaskGroup(
        execution_mode="kubernetes",
        group_id="dbt_tg",
        dbt_project_name="jaffle_shop",
        dbt_root_path="/home/juldrixx/Documents/sandbox/dbt-tutorial",
        conn_id="jaffle_shop",
        dbt_args={
            "schema": "dbt_development",
            "image": "europe-west1-docker.pkg.dev/<GCP_PROJECT_ID>/dbt/jaffle_shop:1.0.0",
            "env_vars": {
                "ENVIRONMENT_MODE": "dev",
            },
            "dbt_cmd_flags": {
                "project_dir": "/app",
            },
            "namespace": "dbt-run",
            "service_account_name": "dbt-sa",
        },
        dbt_profiles_dir="/conf",
        dag=dag,
    )

    end = EmptyOperator(task_id="end")

    start >> dbt_tg >> end
```
**Dockerfile**

**requirements.txt**

```
dbt-bigquery
dbt-core
```

**profiles.yml.tpl**

```yaml
bigquery:
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: {{ PROJECT_ID }}
      dataset: {{ DATASET_ID }}
  target: dev
```
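The `{{ PROJECT_ID }}` / `{{ DATASET_ID }}` placeholders are rendered from environment variables by envtpl during the image builds below. As an aside, a quick local sanity check of the template could look like this (the variable values are examples, not from the PR):

```sh
# Render the profile template to stdout with example values;
# envtpl substitutes Jinja2 placeholders from environment variables.
PROJECT_ID=my-gcp-project DATASET_ID=dbt_development envtpl < profiles.yml.tpl
```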
**Execution mode = docker**

```dockerfile
FROM python:3.9

ARG PROJECT_ID
ARG DATASET_ID

RUN mkdir /conf/

COPY requirements.txt /conf/
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir --root-user-action=ignore -r /conf/requirements.txt \
    && pip install --no-cache-dir envtpl

COPY key.json /conf/

ENV PROJECT_ID $PROJECT_ID
ENV DATASET_ID $DATASET_ID
ENV GOOGLE_APPLICATION_CREDENTIALS=/conf/key.json

COPY profiles.yml.tpl /conf/
RUN envtpl < /conf/profiles.yml.tpl > /conf/profiles.yml

# Set working directory
WORKDIR /app
COPY jaffle_shop .

# Override dbt_project.yml to use the profile defined in the profiles.yml.tpl
COPY dbt_project.yml .
```
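For completeness, building this image so it matches the `dbt/jaffle_shop:1.0.0` tag used in the docker-mode examples might look like the following (the build-arg values are placeholders, not from the PR):

```sh
# Build the image for the docker execution mode; the key.json
# service-account file must be present in the build context.
docker build \
  --build-arg PROJECT_ID=my-gcp-project \
  --build-arg DATASET_ID=dbt_development \
  -t dbt/jaffle_shop:1.0.0 .
```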
**Execution mode = kubernetes**

```dockerfile
FROM python:3.9

ARG PROJECT_ID
ARG DATASET_ID

RUN mkdir /conf/

COPY requirements.txt /conf/
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir --root-user-action=ignore -r /conf/requirements.txt \
    && pip install --no-cache-dir envtpl

ENV PROJECT_ID $PROJECT_ID
ENV DATASET_ID $DATASET_ID

COPY profiles.yml.tpl /conf/
RUN envtpl < /conf/profiles.yml.tpl > /conf/profiles.yml

# Set working directory
WORKDIR /app
COPY jaffle_shop .

# Override dbt_project.yml to use the profile defined in the profiles.yml.tpl
COPY dbt_project.yml .
```
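Note this variant ships no key.json and sets no GOOGLE_APPLICATION_CREDENTIALS; presumably authentication comes from the `dbt-sa` Kubernetes service account referenced in the DAGs. Building and pushing it to the Artifact Registry path used above might look like this (placeholders as in the examples):

```sh
# Build and push the image for the kubernetes execution mode.
docker build \
  --build-arg PROJECT_ID=my-gcp-project \
  --build-arg DATASET_ID=dbt_development \
  -t europe-west1-docker.pkg.dev/<GCP_PROJECT_ID>/dbt/jaffle_shop:1.0.0 .
docker push europe-west1-docker.pkg.dev/<GCP_PROJECT_ID>/dbt/jaffle_shop:1.0.0
```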
Hey @juldrixx, so I've got the kubernetes and docker tests to pass, but I'm not quite understanding the logic of how we pass environment variables into the kubernetes pod. Can you please look into this a bit? Just want to make sure I'm not missing something.
To pass environment variables into the kubernetes pod, you need to set:

```python
dbt_args={
    ...
    "env_vars": {
        ...
        "ENVIRONMENT_MODE": "dev",
    },
}
```

In fact, we just use the attributes of the `KubernetesPodOperator` that we inherit from. And when we need to add computed ones, we have a function that converts them: https://github.com/astronomer/astronomer-cosmos/pull/133/files#diff-e47d9c838c394e4fe22fb19f7ec2a8c05b100cdf3284f82997ba372df09600e4R39
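For context, the linked helper's job is to turn a plain dict into the `V1EnvVar` objects that `KubernetesPodOperator` expects; a minimal sketch of that kind of conversion (illustrative, not the PR's exact code):

```python
from kubernetes.client import models as k8s


def convert_env_vars(env_vars: dict) -> list:
    """Convert a {name: value} dict into the V1EnvVar list that
    KubernetesPodOperator accepts for its env_vars argument."""
    return [k8s.V1EnvVar(name=name, value=str(value)) for name, value in env_vars.items()]
```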
Ahh perfect, thank you for clarifying. I've added an env var to the test.
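A minimal version of such a test could assert the conversion directly. This sketch assumes the hypothetical `convert_env_vars` helper from the snippet above, not the PR's actual test code:

```python
from kubernetes.client import models as k8s


def test_env_var_is_converted_to_v1envvar():
    # One dict entry in, one V1EnvVar out, with name and value preserved.
    converted = convert_env_vars({"ENVIRONMENT_MODE": "dev"})
    assert converted == [k8s.V1EnvVar(name="ENVIRONMENT_MODE", value="dev")]
```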
Docs are fixed and tests are passing, let's do this 🚀
The KPO and DockerOperator weren't tested well enough prior to merging, and there's still some work to be done around connection sharing, model sharing, etc. Reverting this for now. Reverts #133
As #133 has been reverted, I'm reopening it to restart the discussion around this feature.

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: dimerman <danielryan2430@gmail.com>
This pull request creates two operators, `DbtKubernetesBaseOperator` and `DbtDockerBaseOperator`, that clone the logic of `DbtBaseOperator` with the same subclasses (`DbtLSOperator`, `DbtSeedOperator`, `DbtRunOperator`, ...) but execute dbt through a `KubernetesPodOperator` or a `DockerOperator`. I'm trying to meet a community need (see #128 or #97).

My PR is not perfect at all, I think, but I'm opening it to start discussions and make it evolve as those discussions go on.
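To make the described hierarchy concrete, here is a hypothetical sketch of the subclassing pattern, where the base operator builds the dbt command and each subclass only pins the verb; all names and the command-building logic here are illustrative assumptions, not the PR's actual code:

```python
from typing import Optional

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


class DbtKubernetesBaseOperator(KubernetesPodOperator):
    """Hypothetical base: translate dbt args into the pod's command."""

    def __init__(self, base_cmd: str, dbt_cmd_flags: Optional[dict] = None, **kwargs):
        super().__init__(**kwargs)
        flags = dbt_cmd_flags or {}
        # e.g. base_cmd="run" and {"project_dir": "/app"} become
        # ["dbt", "run", "--project-dir", "/app"]
        self.arguments = ["dbt", base_cmd] + [
            part
            for flag, value in flags.items()
            for part in (f"--{flag.replace('_', '-')}", str(value))
        ]


class DbtRunKubernetesOperator(DbtKubernetesBaseOperator):
    """Hypothetical subclass: fixes the verb to `dbt run`."""

    def __init__(self, **kwargs):
        super().__init__(base_cmd="run", **kwargs)
```

A `DbtDockerBaseOperator` would presumably follow the same shape on top of `DockerOperator`, feeding the built command into that operator's `command` parameter instead of `arguments`.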