Skip to content

Commit

Permalink
Following Changes:
Browse files Browse the repository at this point in the history
- Refactors setting scheduling interval
- refactor dag file creating function
- refactored is_active to is_paused_upon_creation
- removed catchup
  • Loading branch information
valayDave committed Jul 28, 2022
1 parent 054e3f3 commit b9387dd
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,27 @@ def __init__(
self.username = username
self.max_workers = max_workers
self.description = description
# remove references to catch up
#self.catchup = catchup
self._depends_on_upstream_sensors = False

# let's use some more descriptive names instead of _schd and _sint
# you can also combine _get_schedule and _get_airflow_schedule_interval to
# create a cleaner method
_schd, _sint = self._get_schedule(), self._get_airflow_schedule_interval()
self.schedule_interval = None
if _schd is not None:
self.schedule_interval = _schd
elif _sint is not None:
self.schedule_interval = _sint

self._file_path = file_path
self.parameters = None
_, self.graph_structure = self.graph.output_steps()
self.worker_pool = worker_pool
# replace with `is_paused_upon_creation`
self.set_active = False
self.is_paused_upon_creation = is_paused_upon_creation
self._set_scheduling_interval()

def _set_scheduling_interval(self):
"""
The airflow integration allows setting schedule interval using `@schedule` and `@airflow_schedule_interval` decorator.
This method will extract interval from both and apply the one which is not None. We raise an exception in the
airflow_cli.py if both flow decorators are set.
"""
schedule_decorator_sint, airflow_schedule_decorator_sint = self._get_schedule(), self._get_airflow_schedule_interval()
self.schedule_interval = None
if schedule_decorator_sint is not None:
self.schedule_interval = schedule_decorator_sint
elif airflow_schedule_decorator_sint is not None:
self.schedule_interval = airflow_schedule_decorator_sint

def _get_schedule(self):
# Using the cron presets provided here :
Expand Down Expand Up @@ -535,8 +536,7 @@ def _visit(node, workflow, exit_node=None):
other_args = (
{} if self.max_workers is None else dict(max_active_tasks=self.max_workers)
)
if self.set_active:
other_args["is_paused_upon_creation"] = False
other_args["is_paused_upon_creation"] = self.is_paused_upon_creation

appending_sensors = self._collect_flow_sensors()
workflow = Workflow(
Expand All @@ -545,23 +545,21 @@ def _visit(node, workflow, exit_node=None):
description=self.description,
schedule_interval=self.schedule_interval,
start_date=datetime.now(),
catchup=False,
tags=self.tags,
file_path=self._file_path,
graph_structure=self.graph_structure,
**other_args
)
workflow = _visit(self.graph["start"], workflow)
# TODO: Just parameters?

workflow.set_parameters(self.parameters)
if len(appending_sensors) > 0:
for s in appending_sensors:
workflow.add_state(s)
workflow.graph_structure.insert(0, [[s.name] for s in appending_sensors])
return self._create_airflow_file(workflow.to_dict())
return self._to_airflow_dag_file(workflow.to_dict())

# Just _to_airflow_dag_file?
def _create_airflow_file(self, json_dag):
def _to_airflow_dag_file(self, json_dag):
util_file = None
with open(airflow_utils.__file__) as f:
util_file = f.read()
Expand Down

0 comments on commit b9387dd

Please sign in to comment.