Added @schedule support + readme
valayDave committed Mar 16, 2022
1 parent 57bdde5 commit 68f13be
Showing 3 changed files with 19 additions and 8 deletions.
15 changes: 9 additions & 6 deletions metaflow/plugins/airflow/README.md
@@ -1,16 +1,19 @@
# Airflow Integration

## Compute Support
- K8s

## How does it work?
The CLI provides an `airflow create` command which converts a Metaflow flow into an Airflow workflow file. In the example below, `a.py` will be the generated Airflow workflow:
```sh
# compile the flow into an Airflow workflow file
python section_exp.py airflow create a.py
```
Once `a.py` is created, it can be placed into Airflow's dags folder; from there the tasks will be orchestrated onto Kubernetes.
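For example, assuming the default `dags_folder` location of `$AIRFLOW_HOME/dags`:
```sh
# copy the generated workflow into Airflow's dags folder
cp a.py $AIRFLOW_HOME/dags/
```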

## Details of the workings
1. Metaflow converts the flow into an intermediate JSON-like format and writes it into the Python file template provided in `af_deploy.py`.
2. The JSON embedded in the compiled Airflow file is used to build the Airflow DAG at runtime (see the sketch after this list). Even though Airflow provides `airflow.serialization.serialized_objects.SerializedDAG`, we cannot use that class to serialize the DAG directly, since it can only be used within the scope of the scheduler and the webserver.
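To make the second point concrete, here is a minimal, hypothetical sketch of the pattern the compiled file follows. The task names, JSON shape, and use of `BashOperator` below are illustrative stand-ins, not the plugin's actual generated code:
```python
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A JSON description of the flow is embedded in the generated file...
WORKFLOW_JSON = """
{
  "dag_id": "section_exp",
  "schedule_interval": "*/2 * * * *",
  "tasks": [
    {"task_id": "start", "command": "echo start", "upstream": []},
    {"task_id": "end", "command": "echo end", "upstream": ["start"]}
  ]
}
"""

# ...and rebuilt into a DAG when the scheduler/webserver imports the file.
config = json.loads(WORKFLOW_JSON)
dag = DAG(
    dag_id=config["dag_id"],
    schedule_interval=config["schedule_interval"],
    start_date=datetime(2022, 1, 1),
    catchup=False,
)
tasks = {
    spec["task_id"]: BashOperator(
        task_id=spec["task_id"], bash_command=spec["command"], dag=dag
    )
    for spec in config["tasks"]
}
for spec in config["tasks"]:
    for upstream_id in spec["upstream"]:
        tasks[upstream_id] >> tasks[spec["task_id"]]
```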

The [airflow_utils.py](airflow_utils.py) file contains the utility functions and classes used to create the workflow and to export it into a dictionary format.

The [airflow_compiler.py](airflow_compiler.py) file leverages `airflow_utils.py` to compile the workflow into that dictionary format; a Mustache template is then used to create the Airflow file. The [af_deploy.py](af_deploy.py) file holds the Python template, into which [airflow_utils.py](airflow_utils.py) is pasted as-is. The code in the compiled workflow reuses the [airflow_utils.py](airflow_utils.py) code to convert the dictionary format into Airflow `Operator`s and a `DAG`. A sketch of the template-fill step is shown below.
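As a minimal illustration of that template-fill step, the sketch below uses the `chevron` Mustache library purely for demonstration; the plugin's actual template lives in `af_deploy.py`, and `workflow_json` is a hypothetical variable name:
```python
import json

import chevron  # a Python Mustache implementation, used here for illustration

# Hypothetical stand-in for the template in af_deploy.py.
TEMPLATE = '''
# Auto-generated Airflow workflow file.
WORKFLOW_JSON = """{{{ workflow_json }}}"""
# ... the contents of airflow_utils.py are pasted here and rebuild the DAG
# from WORKFLOW_JSON at import time ...
'''

workflow_dict = {"dag_id": "section_exp", "tasks": []}
rendered = chevron.render(TEMPLATE, {"workflow_json": json.dumps(workflow_dict)})
with open("a.py", "w") as f:
    f.write(rendered)
```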

The DAG compiled from the flow uses APIs exposed by Metaflow to compile and interpret the DAG.

## Some Pointers Before Running the Scheduler
Before running `airflow scheduler`, ensure that Metaflow is installed and available on the `PYTHONPATH`.
11 changes: 9 additions & 2 deletions metaflow/plugins/airflow/airflow_compiler.py
@@ -89,11 +89,18 @@ def __init__(
# todo : should we allow mix and match of compute providers
# todo : fill information here.
self.workflow_timeout = 10
# Resolve the schedule interval from the flow-level @schedule decorator.
self.schedule_interval = self._get_schedule()
self._file_path = file_path

def _get_schedule(self):
    # Resolve the schedule from the flow-level @schedule decorator, if present.
    schedule = self.flow._flow_decorators.get("schedule")
    if schedule:
        return schedule.schedule
    # Airflow requires a scheduling argument, so fall back to a default.
    return "*/2 * * * *"

def _k8s_job(self, node, input_paths, env):
    # todo : check if the Node has some
    # todo : check for retry
    # Since @kubernetes is attached at the CLI, each step will have exactly
    # one kubernetes decorator.
    k8s_deco = [deco for deco in node.decorators if deco.name == "kubernetes"][0]
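For context, a minimal sketch of a flow carrying the `@schedule` decorator that `_get_schedule` resolves; the flow name and cron expression here are illustrative:
```python
from metaflow import FlowSpec, schedule, step

# Compiling this flow with `airflow create` would yield a DAG whose
# schedule_interval is "0 6 * * *"; without @schedule, the default
# "*/2 * * * *" above would be used instead.
@schedule(cron="0 6 * * *")
class ScheduledFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ScheduledFlow()
```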
1 change: 1 addition & 0 deletions metaflow/plugins/airflow/compute/k8s.py
@@ -79,6 +79,7 @@ def create_k8s_args(
k8s_operator_args = dict(
    namespace=namespace,
    service_account=service_account,
    # todo : pass secrets from metaflow to Kubernetes via airflow
    secrets=secrets,
    node_selector=node_selector,
    cmds=k8s._command(
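These arguments are ultimately consumed by Airflow's Kubernetes operator. A hedged sketch of the general shape follows; the parameter values are illustrative, the import path depends on the installed provider version, and the plugin's exact argument mapping may differ:
```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Illustrative only: roughly how args like namespace/service_account/cmds
# map onto a pod-launching Airflow task.
start_task = KubernetesPodOperator(
    task_id="start",
    name="start-pod",
    namespace="default",
    service_account_name="metaflow-sa",  # from service_account
    image="python:3.9",
    cmds=["/bin/sh", "-c", "python flow.py step start"],
)
```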
