Skip to content

Commit

Permalink
Removing traces of @airflow_schedule_interval
Browse files Browse the repository at this point in the history
  • Loading branch information
valayDave committed Jul 28, 2022
1 parent 0673db7 commit 6d81b75
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 32 deletions.
23 changes: 1 addition & 22 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(
self.worker_pool = worker_pool
self.is_paused_upon_creation = is_paused_upon_creation
self.workflow_timeout = workflow_timeout
self.schedule = self._scheduling_interval()
self.schedule = self._get_schedule()
self.parameters = self._process_parameters()
self.production_token = production_token

Expand Down Expand Up @@ -127,21 +127,6 @@ def save_deployment_token(cls, owner, token, flow_datastore):
overwrite=False,
)

def _scheduling_interval(self):
"""
The airflow integration allows setting schedule interval using `@schedule` and `@airflow_schedule_interval` decorator.
This method will extract interval from both and apply the one which is not None. We raise an exception in the
airflow_cli.py if both flow decorators are set.
"""
schedule_decorator_cron_pattern, airflow_schedule_decorator_cron_pattern = (
self._get_schedule(),
self._get_airflow_schedule_interval(),
)
if schedule_decorator_cron_pattern is not None:
return schedule_decorator_cron_pattern
elif airflow_schedule_decorator_cron_pattern is not None:
return airflow_schedule_decorator_cron_pattern

def _get_schedule(self):
# Using the cron presets provided here :
# https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html?highlight=schedule%20interval#cron-presets
Expand All @@ -158,12 +143,6 @@ def _get_schedule(self):
return "@daily"
return None

def _get_airflow_schedule_interval(self):
schedule_interval = self.flow._flow_decorators.get("airflow_schedule_interval")
if schedule_interval is None:
return None
return schedule_interval.schedule

def _get_retries(self, node):
max_user_code_retries = 0
max_error_retries = 0
Expand Down
10 changes: 0 additions & 10 deletions metaflow/plugins/airflow/airflow_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,7 @@ def traverse_graph(node, state):


def _validate_workflow(flow, graph, flow_datastore, metadata, workflow_timeout):
no_scheduling = not (
flow._flow_decorators.get("airflow_schedule_interval")
or flow._flow_decorators.get("schedule")
)

if no_scheduling and workflow_timeout is not None:
raise AirflowException(
"Cannot set `--workflow-timeout` for an unscheduled DAG. Add `@schedule` or `@airflow_schedule_interval` to the flow to set `--workflow-timeout`."
)
# check for other compute related decorators.
# supported compute : k8s (v1), local(v2), batch(v3),
_validate_foreach_constraints(graph)
for node in graph:
if any([d.name == "batch" for d in node.decorators]):
Expand Down

0 comments on commit 6d81b75

Please sign in to comment.