
Commit

Moving information from airflow_utils to compiler (#56)
- Commenting TODOs to organize unfinished changes.
- Some environment variables set via `V1EnvVar`:
    - `client.V1ObjectFieldSelector`-mapped env vars were not working in JSON form.
    - Moving the k8s operator import into its own function.
    - Env vars moved.
valayDave committed Jul 14, 2022
1 parent e77ab4f commit fbbcb43
Showing 4 changed files with 116 additions and 183 deletions.
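
For context on the `V1EnvVar` bullet above: downward-API (fieldRef) env vars cannot be expressed as flat name/value JSON, which is why they are built as client objects. A minimal sketch, assuming the official `kubernetes` Python client; the env var names here are illustrative, not taken from this commit:

```python
from kubernetes import client

# Plain name/value pairs serialize to JSON cleanly and work fine as simple dicts.
static_env = client.V1EnvVar(name="METAFLOW_USER", value="some-user")

# fieldRef-backed env vars need a nested valueFrom block, so they have to be
# constructed as V1EnvVar / V1ObjectFieldSelector objects rather than flat dicts.
pod_name_env = client.V1EnvVar(
    name="MY_POD_NAME",  # illustrative name
    value_from=client.V1EnvVarSource(
        field_ref=client.V1ObjectFieldSelector(field_path="metadata.name")
    ),
)
```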
96 changes: 58 additions & 38 deletions metaflow/plugins/airflow/airflow.py
@@ -17,9 +17,12 @@
KUBERNETES_SERVICE_ACCOUNT)
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import Kubernetes

# TODO: Move chevron to _vendor
from metaflow.plugins.cards.card_modules import chevron
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
from metaflow.util import dict_to_cli_options, get_username, compress_list
from metaflow.parameters import JSONTypeClass

from . import airflow_utils
from .airflow_decorator import SUPPORTED_SENSORS, AirflowSensorDecorator
@@ -111,12 +114,12 @@ def _set_scheduling_interval(self):
This method will extract interval from both and apply the one which is not None. We raise an exception in the
airflow_cli.py if both flow decorators are set.
"""
schedule_decorator_sint, airflow_schedule_decorator_sint = self._get_schedule(), self._get_airflow_schedule_interval()
schedule_decorator_cron_pattern, airflow_schedule_decorator_cron_pattern = self._get_schedule(), self._get_airflow_schedule_interval()
self.schedule_interval = None
if schedule_decorator_sint is not None:
self.schedule_interval = schedule_decorator_sint
elif airflow_schedule_decorator_sint is not None:
self.schedule_interval = airflow_schedule_decorator_sint
if schedule_decorator_cron_pattern is not None:
self.schedule_interval = schedule_decorator_cron_pattern
elif airflow_schedule_decorator_cron_pattern is not None:
self.schedule_interval = airflow_schedule_decorator_cron_pattern

def _get_schedule(self):
# Using the cron presets provided here :
@@ -168,6 +171,8 @@ def _process_parameters(self):
str.__name__: "string",
bool.__name__: "string",
float.__name__: "number",
JSONTypeClass.name: "string",
}
type_parser = {bool.__name__: lambda v: str(v)}

@@ -182,20 +187,12 @@
)
seen.add(norm)

is_required = param.kwargs.get("required", False)
# Throw an exception if a schedule is set for a flow with required
# parameters with no defaults.
if "default" not in param.kwargs and is_required:
raise MetaflowException(
"The parameter *%s* does not have a "
"default while having 'required' set to 'True'. "
"A default is required for such parameters when deploying on Airflow."
)
if "default" not in param.kwargs and self.schedule_interval:
# Airflow requires defaults set for parameters.
if "default" not in param.kwargs:
raise MetaflowException(
"When @schedule is set with Airflow, Parameters require default values. "
"The parameter *%s* does not have a "
"'default' set"
"default set. "
"A default is required for parameters when deploying on Airflow."
)
value = deploy_time_eval(param.kwargs.get("default"))
parameters.append(dict(name=param.name, value=value))
@@ -210,11 +207,13 @@ def _process_parameters(self):
airflow_param["default"] = value
if param_help:
airflow_param["description"] = param_help

if param_type is not None and param_type.__name__ in type_transform_dict:
airflow_param["type"] = type_transform_dict[param_type.__name__]
if param_type.__name__ in type_parser and value is not None:
airflow_param["default"] = type_parser[param_type.__name__](value)
if param_type is not None:
if isinstance(param_type, JSONTypeClass):
airflow_param["type"] = type_transform_dict[JSONTypeClass.name]
elif param_type.__name__ in type_transform_dict:
airflow_param["type"] = type_transform_dict[param_type.__name__]
if param_type.__name__ in type_parser and value is not None:
airflow_param["default"] = type_parser[param_type.__name__](value)

airflow_params.append(airflow_param)
self.parameters = airflow_params
@@ -243,8 +242,8 @@ def _make_input_path(self, step_name):

def _to_job(self, node):
"""
This function will transform the node's specification into Airflow compatible operator arguements.
Since this function is long. We it performs two major duties:
This function will transform the node's specification into Airflow compatible operator arguments.
Since this function is long, note that it performs two major duties:
1. Based on the type of the graph node (start/linear/foreach/join etc.), it decides how to set the input paths.
2. Based on the node's decorator specification, it converts that information into a job spec for the KubernetesPodOperator.
"""
@@ -323,9 +322,13 @@ def _to_job(self, node):
runtime_limit = get_run_time_limit_for_task(node.decorators)

k8s = Kubernetes(
self.flow_datastore, self.metadata, self.environment,
self.flow_datastore, self.metadata, self.environment
)
user = util.get_username()

airflow_task_id = AIRFLOW_TASK_ID_TEMPLATE_VALUE
mf_run_id = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX # run_id_creator is added via the `user_defined_filters`
attempt = "{{ task_instance.try_number - 1 }}"
labels = {
"app": "metaflow",
"app.kubernetes.io/name": "metaflow-task",
@@ -337,7 +340,7 @@ def _to_job(self, node):
additional_mf_variables = {
"METAFLOW_CODE_SHA": self.code_package_sha,
"METAFLOW_CODE_URL": self.code_package_url,
"METAFLOW_CODE_DS": self.datastore.TYPE,
"METAFLOW_CODE_DS": self.flow_datastore.TYPE,
"METAFLOW_USER": user,
"METAFLOW_SERVICE_URL": BATCH_METADATA_SERVICE_URL,
"METAFLOW_SERVICE_HEADERS": json.dumps(BATCH_METADATA_SERVICE_HEADERS),
@@ -346,39 +349,55 @@ def _to_job(self, node):
"METAFLOW_DEFAULT_DATASTORE": "s3",
"METAFLOW_DEFAULT_METADATA": "service",
# Question for (savin) : what does `METAFLOW_KUBERNETES_WORKLOAD` do ?
"METAFLOW_KUBERNETES_WORKLOAD": 1,
"METAFLOW_KUBERNETES_WORKLOAD": str(1),
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_CARD_S3ROOT": DATASTORE_CARD_S3ROOT,
"METAFLOW_RUN_ID":mf_run_id,
"METAFLOW_AIRFLOW_TASK_ID":airflow_task_id,
"METAFLOW_AIRFLOW_DAG_RUN_ID":"{{run_id}}",
"METAFLOW_AIRFLOW_JOB_ID":"{{ti.job_id}}",
"METAFLOW_ATTEMPT_NUMBER":attempt,
}
env.update(additional_mf_variables)

service_account = k8s_deco.attributes["service_account"]

service_account = KUBERNETES_SERVICE_ACCOUNT if k8s_deco.attributes["service_account"] is None else k8s_deco.attributes["service_account"]
k8s_namespace = k8s_deco.attributes["namespace"] if k8s_deco.attributes["namespace"] is not None else "default"
k8s_operator_args = dict(
namespace=k8s_deco.attributes["namespace"],
service_account_name=KUBERNETES_SERVICE_ACCOUNT
if service_account is None
else service_account,
namespace=k8s_namespace,
service_account_name=service_account,
node_selector=k8s_deco.attributes["node_selector"],
cmds=k8s._command(
self.flow.name, self.run_id, node.name, self.task_id, self.attempt,
code_package_url=self.code_package_url,
step_cmds=self._step_cli(node, input_paths, self.code_package_url, user_code_retries),
),
in_cluster=True,
image=k8s_deco.attributes["image"],
cpu=k8s_deco.attributes["cpu"],
memory=k8s_deco.attributes["memory"],
disk=k8s_deco.attributes["disk"],
# TODO : (savin-comments) add gpu support with limits
resources=dict(
requests={
"cpu": k8s_deco.attributes["cpu"],
"memory": "%sM" % str(k8s_deco.attributes["memory"]),
"ephemeral-storage": str(k8s_deco.attributes["disk"]),
}
),
execution_timeout=dict(seconds=runtime_limit),
retry_delay=dict(seconds=retry_delay.total_seconds()) if retry_delay else None,
retries=user_code_retries,
env_vars=[dict(name=k, value=v) for k, v in env.items()],
labels=labels,
task_id=node.name,
in_cluster=True,
get_logs=True,
do_xcom_push=True,
log_events_on_failure=True,
is_delete_operator_pod=True,
retry_exponential_backoff=False,  # TODO: should this be an arg we allow on the CLI? Not right now - there is an open ticket for this - maybe at some point we will.
reattach_on_restart=False,
)
if k8s_deco.attributes["secrets"]:
k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"]

if retry_delay:
k8s_operator_args["retry_delay"]=dict(seconds=retry_delay.total_seconds())

return k8s_operator_args

@@ -583,6 +602,7 @@ def _create_defaults(self):
# TODO: Enable emails
"retries": 0,
"execution_timeout": timedelta(days=5),
"retry_delay": timedelta(seconds=200),
# check https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=retry_delay#airflow.models.baseoperator.BaseOperatorMeta
}
if self.worker_pool is not None:
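
The `k8s_operator_args` dict built above stays JSON-friendly: env vars and durations are plain dicts rather than client objects. A rough sketch of how such a dict could be rehydrated into a `KubernetesPodOperator` on the DAG side; `make_pod_operator` is a hypothetical helper (not part of this commit), and exact parameter handling varies across cncf.kubernetes provider versions:

```python
from datetime import timedelta

from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)


def make_pod_operator(args):
    """Hypothetical helper: turn JSON-friendly operator args into an operator."""
    args = dict(args)
    # dict-form env vars -> V1EnvVar objects
    args["env_vars"] = [
        client.V1EnvVar(name=e["name"], value=e["value"]) for e in args["env_vars"]
    ]
    # dict-form durations -> timedelta, which BaseOperator expects
    for key in ("execution_timeout", "retry_delay"):
        if args.get(key) is not None:
            args[key] = timedelta(**args[key])
    return KubernetesPodOperator(**args)
```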
5 changes: 2 additions & 3 deletions metaflow/plugins/airflow/airflow_cli.py
@@ -124,10 +124,9 @@ def make_flow(
# Validate if the workflow is correctly parsed.
# _validate_workflow(obj.flow, obj.graph, obj.flow_datastore, obj.metadata)

# Attach @kubernetes and @environment decorator to the flow to
# ensure that the related decorator hooks are invoked.
# Attach @kubernetes.
decorators._attach_decorators(
obj.flow, [KubernetesDecorator.name, EnvironmentDecorator.name]
obj.flow, [KubernetesDecorator.name]
)

decorators._init_step_decorators(
11 changes: 7 additions & 4 deletions metaflow/plugins/airflow/airflow_decorator.py
@@ -11,8 +11,9 @@
K8S_XCOM_DIR_PATH = "/airflow/xcom"


# there are 6 decorators in this class right now - making it difficult to parse
# the file. can we split this out with every decorator getting it's own file?
# TODO : (savin-comments) :
# there are 6 decorators in this class right now - making it difficult to parse
# the file. Can we split this out with every decorator getting its own file?

def safe_mkdir(dir):
try:
@@ -39,7 +40,7 @@ def push_xcom_values(xcom_dict):
SKIPPED="skipped",
)

# is there a reason for this method to exist in it's current form?
# TODO : (savin-comments) : fix this - is there a reason for this method to exist in its current form?
def _get_sensor_exception():
from .airflow import AirflowException

@@ -49,6 +50,7 @@ class AirflowSensorException(AirflowException):
return AirflowSensorException


# TODO : (savin-comments) : refactor this to a common place.
def _arg_exception(arg_name, deconame, value, allowed_values=None, allowed_type=None):
msg_str = "`%s` cannot be `%s` when using @%s" % (
arg_name,
@@ -131,6 +133,7 @@ class AirflowSensorDecorator(FlowDecorator):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# TODO : (savin-comments) : refactor the name of `self._task_name` to have a common name.
# Is the task name a Metaflow task name?
self._task_name = self.operator_type

@@ -174,7 +177,7 @@ def compile(self):
# If there are more than one decorator per sensor-type then we require the name argument.
if sum([len(v) for v in sensor_deco_types.values()]) > len(sensor_deco_types):
if self.attributes["name"] is None:
# can't we autogenerate this name?
# TODO : (savin-comments) autogenerate this name
raise _get_sensor_exception()(
"`name` argument cannot be `None` when multiple Airflow Sensor related decorators are attached to a flow."
)
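
The decorator module above exposes `K8S_XCOM_DIR_PATH` and `push_xcom_values` for passing task metadata back to Airflow. A minimal sketch of that mechanism, assuming the standard `KubernetesPodOperator` xcom sidecar (enabled by `do_xcom_push=True` in the operator args), which reads `/airflow/xcom/return.json` from the finished pod; the helper bodies here are illustrative, not the exact code in this commit:

```python
import json
import os

K8S_XCOM_DIR_PATH = "/airflow/xcom"


def safe_mkdir(dir_path):
    # Create the xcom directory if the sidecar has not already mounted it.
    os.makedirs(dir_path, exist_ok=True)


def push_xcom_values(xcom_dict):
    # The xcom sidecar picks up whatever JSON the main container leaves
    # at /airflow/xcom/return.json once the task finishes.
    safe_mkdir(K8S_XCOM_DIR_PATH)
    with open(os.path.join(K8S_XCOM_DIR_PATH, "return.json"), "w") as f:
        json.dump(xcom_dict, f)
```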