porting Savin's comments
- Next changes: addressing comments.
valayDave committed Jul 28, 2022
1 parent 3f2353a commit d8e6ec0
Showing 5 changed files with 278 additions and 187 deletions.
@@ -5,45 +5,35 @@
import string
import sys
from collections import defaultdict

# Task instance attributes: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html
from datetime import datetime, timedelta

import metaflow.util as util
from metaflow import R
from metaflow.decorators import flow_decorators
from metaflow.exception import MetaflowException
from metaflow.metaflow_config import (
BATCH_METADATA_SERVICE_HEADERS,
BATCH_METADATA_SERVICE_URL,
DATASTORE_CARD_S3ROOT,
DATASTORE_SYSROOT_S3,
DATATOOLS_S3ROOT,
KUBERNETES_SERVICE_ACCOUNT,
)
from metaflow.metaflow_config import (BATCH_METADATA_SERVICE_HEADERS,
BATCH_METADATA_SERVICE_URL,
DATASTORE_CARD_S3ROOT,
DATASTORE_SYSROOT_S3, DATATOOLS_S3ROOT,
KUBERNETES_SERVICE_ACCOUNT)
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import Kubernetes
from metaflow.plugins.cards.card_modules import chevron
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
from metaflow.util import dict_to_cli_options, get_username

from . import airflow_utils as af_utils
from .airflow_utils import (
AIRFLOW_TASK_ID_TEMPLATE_VALUE,
PARENT_TASK_INSTANCE_STATUS_MACRO,
RUN_ID_LEN,
TASK_ID_XCOM_KEY,
AirflowTask,
Workflow,
)
from .airflow_decorator import AirflowSensorDecorator, SUPPORTED_SENSORS
from . import airflow_utils
from .airflow_decorator import SUPPORTED_SENSORS, AirflowSensorDecorator
from .airflow_utils import (AIRFLOW_TASK_ID_TEMPLATE_VALUE,
PARENT_TASK_INSTANCE_STATUS_MACRO, RUN_ID_LEN,
TASK_ID_XCOM_KEY, AirflowTask, Workflow)

AIRFLOW_DEPLOY_TEMPLATE_FILE = os.path.join(os.path.dirname(__file__), "af_deploy.py")
AIRFLOW_DEPLOY_TEMPLATE_FILE = os.path.join(os.path.dirname(__file__), "dag.py")


AIRFLOW_PREFIX = "arf"
RUN_ID_PREFIX = "airflow"


# This method is only ever invoked in one place; we can merge it into _k8s_job.
def create_k8s_args(
datastore,
metadata,
@@ -150,7 +140,7 @@ class Airflow(object):
# Such run-ids break `metaflow.util.decompress_list`; this is why we hash the run id.
run_id = (
"%s-$(echo -n {{ run_id }}-{{ dag_run.dag_id }} | md5sum | awk '{print $1}' | awk '{print substr ($0, 0, %s)}')"
% (AIRFLOW_PREFIX, str(RUN_ID_LEN))
% (RUN_ID_PREFIX, str(RUN_ID_LEN))
)
# We use `echo -n` because `echo` appends a trailing newline and we don't want it in the hash, since we want the same value when the run id is recomputed in Python (see the sketch below).
run_id_arg = "--run-id %s" % run_id
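
A rough Python equivalent of the templated shell pipeline above, useful for checking that the hash the shell computes at runtime matches what we would derive on the Python side. The function name, the assumed RUN_ID_LEN value, and the exact truncation are illustrative, not code from this commit:

import hashlib

RUN_ID_PREFIX = "airflow"
RUN_ID_LEN = 12  # assumed here; the real constant comes from airflow_utils

def hashed_run_id(airflow_run_id, dag_id, length=RUN_ID_LEN):
    # `echo -n` in the shell template avoids hashing a trailing newline,
    # so we hash the raw "{run_id}-{dag_id}" string here as well.
    digest = hashlib.md5(
        ("%s-%s" % (airflow_run_id, dag_id)).encode("utf-8")
    ).hexdigest()
    return "%s-%s" % (RUN_ID_PREFIX, digest[:length])
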
@@ -173,12 +163,9 @@ def __init__(
username=None,
max_workers=None,
worker_pool=None,
email=None,
start_date=datetime.now(),
description=None,
catchup=False,
file_path=None,
set_active=False,
is_paused_upon_creation=True,
):
self.name = name
self.graph = graph
@@ -194,12 +181,14 @@ def __init__(
self.namespace = namespace # this is the username space
self.username = username
self.max_workers = max_workers
self.email = email
self.description = description
self.start_date = start_date
self.catchup = catchup
# Remove references to catchup.
# self.catchup = catchup
self._depends_on_upstream_sensors = False

# let's use more descriptive names than _schd and _sint; we could also combine
# _get_schedule and _get_airflow_schedule_interval into one cleaner method
# (a sketch follows the schedule methods below)
_schd, _sint = self._get_schedule(), self._get_airflow_schedule_interval()
self.schedule_interval = None
if _schd is not None:
@@ -211,7 +200,8 @@ def __init__(
self.metaflow_parameters = None
_, self.graph_structure = self.graph.output_steps()
self.worker_pool = worker_pool
self.set_active = set_active
# replace with `is_paused_upon_creation`
self.set_active = False

def _get_schedule(self):
# Using the cron presets provided here :
@@ -236,6 +226,7 @@ def _get_airflow_schedule_interval(self):
return schedule_interval.schedule
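
Per the comment in __init__ above, the two schedule lookups could collapse into a single method. A possible shape; the name _get_schedule_interval and the precedence order are assumptions, not committed code:

def _get_schedule_interval(self):
    # Prefer the @schedule decorator's cron expression; otherwise fall back
    # to the interval implied by any attached Airflow sensor.
    schedule = self._get_schedule()
    if schedule is not None:
        return schedule
    return self._get_airflow_schedule_interval()
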

def _k8s_job(self, node, input_paths, env):
# What does this comment imply?
# since we attach the k8s decorator at the CLI, there will be exactly one per step
k8s_deco = [deco for deco in node.decorators if deco.name == "kubernetes"][0]
user_code_retries, _ = self._get_retries(node)
@@ -316,6 +307,7 @@ def _process_parameters(self):
seen.add(norm)

is_required = param.kwargs.get("required", False)
# Fix comment that has reference to Event Bridge
# Throw an exception if a schedule is set for a flow with required
# parameters with no defaults. We currently don't have any notion
# of data triggers in AWS Event Bridge.
@@ -335,14 +327,17 @@ def _process_parameters(self):
parameters.append(dict(name=param.name, value=value))
# Set Airflow-related parameter arguments.
param_type = param.kwargs.get("type", None)
# for consistency let's follow snake_case everywhere (see the sketch after this method)
airflowparam = dict(
name=param.name,
)
# same comment as above
phelp = param.kwargs.get("help", None)
if value is not None:
airflowparam["default"] = value
if phelp:
airflowparam["description"] = phelp
# you can check membership in type_transform_dict instead.
if param_type in allowed_types:
airflowparam["type"] = type_transform_dict[param_type.__name__]
if param_type.__name__ in type_parser and value is not None:
@@ -353,6 +348,8 @@

return parameters
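
A sketch of how the block above could read with the review suggestions applied: snake_case names and a membership check against type_transform_dict. The surrounding variables (param, value, param_type, type_transform_dict) come from _process_parameters; this is illustrative, not the committed change:

airflow_param = dict(name=param.name)
param_help = param.kwargs.get("help", None)
if value is not None:
    airflow_param["default"] = value
if param_help:
    airflow_param["description"] = param_help
# Membership in type_transform_dict replaces the separate allowed_types check.
type_name = getattr(param_type, "__name__", None)
if type_name in type_transform_dict:
    airflow_param["type"] = type_transform_dict[type_name]
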

# there is a compress method in utils.py that can be used instead of this logic,
# which can break very easily if our runtime decompression logic changes
# (a sketch follows the method below)
def _make_parent_input_path_compressed(
self,
step_names,
@@ -361,6 +358,10 @@
self._make_parent_input_path(s, only_task_id=True) for s in step_names
)
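
Following the comment above, this method could delegate to the shared helper so compression and runtime decompression stay in sync. This assumes metaflow.util.compress_list is the inverse of the decompress_list referenced earlier in this file; treat it as a sketch of the suggestion:

from metaflow.util import compress_list

def _make_parent_input_path_compressed(self, step_names):
    # compress_list mirrors decompress_list, so the runtime can always
    # reverse whatever we emit here, and the only_task_id flag becomes
    # unnecessary.
    return compress_list(
        [self._make_parent_input_path(s) for s in step_names]
    )
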

# there is no notion of a parent input path; the input paths depend on parents, so we
# should rename this function to _input_path. also, the only_task_id flag may be
# unnecessary if you rely on the compress method mentioned in the comment above.
def _make_parent_input_path(self, step_name, only_task_id=False):
# This is set using the `airflow_internal` decorator.
# This will pull the `return_value` xcom which holds a dictionary.
@@ -468,6 +469,10 @@ def _to_job(self, node):

compute_type = "k8s" # todo : This will become more dynamic in the future.
if compute_type == "k8s":
# a better abstraction would be to generate the CLI and pass it to the
# _k8s_job method. at this point, a variety of responsibilities (env etc.)
# are being shared between this method and _k8s_job - I am not sure what
# is the actual intended split between the two methods.
return self._k8s_job(node, input_paths, env)
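
One way to realize the split suggested in the comment above: _to_job assembles the full step CLI and environment, and _k8s_job only wraps them in a Kubernetes job object. The step_cli argument is hypothetical and not part of this commit:

def _to_job(self, node):
    ...  # build input_paths, env, user_code_retries as in the method above
    step_cli = self._step_cli(
        node, input_paths, self.code_package_url, user_code_retries
    )
    if compute_type == "k8s":
        # _k8s_job now concerns itself only with the Kubernetes object.
        return self._k8s_job(node, step_cli, env)
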

def _step_cli(self, node, paths, code_package_url, user_code_retries):
Expand All @@ -476,11 +481,9 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
script_name = os.path.basename(sys.argv[0])
executable = self.environment.executable(node.name)

if R.use_r():
entrypoint = [R.entrypoint()]
else:
entrypoint = [executable, script_name]
entrypoint = [executable, script_name]

# Can you clarify the comment below? What does "stuff" mean here?
# Ignore compute decorators since this will already throw stuff there.
top_opts_dict = {
"with": [
@@ -637,24 +640,26 @@ def _visit(node, workflow, exit_node=None):
default_args=self._create_defaults(),
description=self.description,
schedule_interval=self.schedule_interval,
start_date=self.start_date,
catchup=self.catchup,
start_date=datetime.now(),
catchup=False,
tags=self.tags,
file_path=self._file_path,
graph_structure=self.graph_structure,
**other_args
)
workflow = _visit(self.graph["start"], workflow)
# TODO: Just parameters?
workflow.set_parameters(self.metaflow_parameters)
if len(appending_sensors) > 0:
for s in appending_sensors:
workflow.add_state(s)
workflow.graph_structure.insert(0, [[s.name] for s in appending_sensors])
return self._create_airflow_file(workflow.to_dict())

# Just _to_airflow_dag_file?
def _create_airflow_file(self, json_dag):
util_file = None
with open(af_utils.__file__) as f:
with open(airflow_utils.__file__) as f:
util_file = f.read()
with open(AIRFLOW_DEPLOY_TEMPLATE_FILE) as f:
return chevron.render(
@@ -668,12 +673,15 @@ def _create_airflow_file(self, json_dag):
),
)
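
The rendering above pairs the dag.py template with the JSON-serialized DAG and the inlined airflow_utils source. A minimal sketch of that flow; the template variable names (json_dag, utils_file) are assumptions, since the actual arguments are truncated in this hunk:

with open(AIRFLOW_DEPLOY_TEMPLATE_FILE) as f:
    template = f.read()
rendered_dag_file = chevron.render(
    template,
    {"json_dag": json_dag, "utils_file": util_file},
)
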

# Not sure what is happening here. This can simply be the listed defaults in
# the DAG template.
def _create_defaults(self):
defu_ = {
"owner": get_username(),
# If set on a task, doesn’t run the task in the current DAG run if the previous run of the task has failed.
"depends_on_past": False,
"email": [] if self.email is None else [self.email],
# TODO: Enable emails
"email": [],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,