porting changes from a commit mangled branch #56

Merged
merged 1 commit into from
Jun 4, 2022
96 changes: 58 additions & 38 deletions metaflow/plugins/airflow/airflow.py
@@ -17,9 +17,12 @@
KUBERNETES_SERVICE_ACCOUNT)
from metaflow.parameters import deploy_time_eval
from metaflow.plugins.kubernetes.kubernetes import Kubernetes

# TODO: Move chevron to _vendor
from metaflow.plugins.cards.card_modules import chevron
from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
from metaflow.util import dict_to_cli_options, get_username, compress_list
from metaflow.parameters import JSONTypeClass

from . import airflow_utils
from .airflow_decorator import SUPPORTED_SENSORS, AirflowSensorDecorator
@@ -111,12 +114,12 @@ def _set_scheduling_interval(self):
This method will extract interval from both and apply the one which is not None. We raise an exception in the
airflow_cli.py if both flow decorators are set.
"""
schedule_decorator_sint, airflow_schedule_decorator_sint = self._get_schedule(), self._get_airflow_schedule_interval()
schedule_decorator_cron_pattern, airflow_schedule_decorator_cron_pattern = self._get_schedule(), self._get_airflow_schedule_interval()
self.schedule_interval = None
if schedule_decorator_sint is not None:
self.schedule_interval = schedule_decorator_sint
elif airflow_schedule_decorator_sint is not None:
self.schedule_interval = airflow_schedule_decorator_sint
if schedule_decorator_cron_pattern is not None:
self.schedule_interval = schedule_decorator_cron_pattern
elif airflow_schedule_decorator_cron_pattern is not None:
self.schedule_interval = airflow_schedule_decorator_cron_pattern

def _get_schedule(self):
# Using the cron presets provided here :
@@ -168,6 +171,8 @@ def _process_parameters(self):
str.__name__: "string",
bool.__name__: "string",
float.__name__: "number",
JSONTypeClass.name: "string"

}
type_parser = {bool.__name__: lambda v: str(v)}

@@ -182,20 +187,12 @@
)
seen.add(norm)

is_required = param.kwargs.get("required", False)
# Throw an exception if a schedule is set for a flow with required
# parameters with no defaults.
if "default" not in param.kwargs and is_required:
raise MetaflowException(
"The parameter *%s* does not have a "
"default while having 'required' set to 'True'. "
"A default is required for such parameters when deploying on Airflow."
)
if "default" not in param.kwargs and self.schedule_interval:
# Airflow requires defaults set for parameters.
if "default" not in param.kwargs:
raise MetaflowException(
"When @schedule is set with Airflow, Parameters require default values. "
"The parameter *%s* does not have a "
"'default' set"
"default set. "
"A default is required for parameters when deploying on Airflow."
)
value = deploy_time_eval(param.kwargs.get("default"))
parameters.append(dict(name=param.name, value=value))
@@ -210,11 +207,13 @@
airflow_param["default"] = value
if param_help:
airflow_param["description"] = param_help

if param_type is not None and param_type.__name__ in type_transform_dict:
airflow_param["type"] = type_transform_dict[param_type.__name__]
if param_type.__name__ in type_parser and value is not None:
airflow_param["default"] = type_parser[param_type.__name__](value)
if param_type is not None:
if isinstance(param_type,JSONTypeClass):
airflow_param["type"] = type_transform_dict[JSONTypeClass.name]
elif param_type.__name__ in type_transform_dict:
airflow_param["type"] = type_transform_dict[param_type.__name__]
if param_type.__name__ in type_parser and value is not None:
airflow_param["default"] = type_parser[param_type.__name__](value)

airflow_params.append(airflow_param)
self.parameters = airflow_params
@@ -243,8 +242,8 @@ def _make_input_path(self, step_name):

def _to_job(self, node):
"""
This function will transform the node's specification into Airflow compatible operator arguements.
Since this function is long. We it performs two major duties:
This function will transform the node's specification into Airflow compatible operator arguments.
Since this function is long. It performs two major duties:
1. Based on the type of the graph node (start/linear/foreach/join etc.) it will decide how to set the input paths
2. Based on node's decorator specification convert the information into a job spec for the KubernetesPodOperator.
"""
@@ -323,9 +322,13 @@ def _to_job(self, node):
runtime_limit = get_run_time_limit_for_task(node.decorators)

k8s = Kubernetes(
self.flow_datastore, self.metadata, self.environment,
self.flow_datastore, self.metadata, self.environment
)
user = util.get_username()

airflow_task_id = AIRFLOW_TASK_ID_TEMPLATE_VALUE
mf_run_id = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX # run_id_creator is added via the `user_defined_filters`
attempt = "{{ task_instance.try_number - 1 }}"
labels = {
"app": "metaflow",
"app.kubernetes.io/name": "metaflow-task",
@@ -337,7 +340,7 @@ def _to_job(self, node):
additional_mf_variables = {
"METAFLOW_CODE_SHA": self.code_package_sha,
"METAFLOW_CODE_URL": self.code_package_url,
"METAFLOW_CODE_DS": self.datastore.TYPE,
"METAFLOW_CODE_DS": self.flow_datastore.TYPE,
"METAFLOW_USER": user,
"METAFLOW_SERVICE_URL": BATCH_METADATA_SERVICE_URL,
"METAFLOW_SERVICE_HEADERS": json.dumps(BATCH_METADATA_SERVICE_HEADERS),
@@ -346,39 +349,55 @@
"METAFLOW_DEFAULT_DATASTORE": "s3",
"METAFLOW_DEFAULT_METADATA": "service",
# Question for (savin) : what does `METAFLOW_KUBERNETES_WORKLOAD` do ?
"METAFLOW_KUBERNETES_WORKLOAD": 1,
"METAFLOW_KUBERNETES_WORKLOAD": str(1),
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_CARD_S3ROOT": DATASTORE_CARD_S3ROOT,
"METAFLOW_RUN_ID":mf_run_id,
"METAFLOW_AIRFLOW_TASK_ID":airflow_task_id,
"METAFLOW_AIRFLOW_DAG_RUN_ID":"{{run_id}}",
"METAFLOW_AIRFLOW_JOB_ID":"{{ti.job_id}}",
"METAFLOW_ATTEMPT_NUMBER":attempt,
}
env.update(additional_mf_variables)

service_account = k8s_deco.attributes["service_account"]

service_account = KUBERNETES_SERVICE_ACCOUNT if k8s_deco.attributes["service_account"] is None else k8s_deco.attributes["service_account"]
k8s_namespace = k8s_deco.attributes["namespace"] if k8s_deco.attributes["namespace"] is not None else "default"
k8s_operator_args = dict(
namespace=k8s_deco.attributes["namespace"],
service_account_name=KUBERNETES_SERVICE_ACCOUNT
if service_account is None
else service_account,
namespace=k8s_namespace,
service_account_name=service_account,
node_selector=k8s_deco.attributes["node_selector"],
cmds=k8s._command(
self.flow.name, self.run_id, node.name, self.task_id, self.attempt,
code_package_url=self.code_package_url,
step_cmds= self._step_cli(node, input_paths, self.code_package_url, user_code_retries),
),
in_cluster=True,
image=k8s_deco.attributes["image"],
cpu=k8s_deco.attributes["cpu"],
memory=k8s_deco.attributes["memory"],
disk=k8s_deco.attributes["disk"],
# TODO : (savin-comments) add gpu support with limits
resources = dict(
requests = {
"cpu":k8s_deco.attributes["cpu"],
"memory":"%sM"%str(k8s_deco.attributes["memory"]),
"ephemeral-storage":str(k8s_deco.attributes["disk"]),
}
),
execution_timeout=dict(seconds=runtime_limit),
retry_delay=dict(seconds=retry_delay.total_seconds()) if retry_delay else None,
retries=user_code_retries,
env_vars=[dict(name=k, value=v) for k, v in env.items()],
labels=labels,
task_id = node.name,
in_cluster= True,
get_logs =True,
do_xcom_push= True,
log_events_on_failure=True,
is_delete_operator_pod=True,
retry_exponential_backoff= False, # todo : should this be a arg we allow on CLI. not right now - there is an open ticket for this - maybe at some point we will.
reattach_on_restart=False,
)
if k8s_deco.attributes["secrets"]:
k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"]

if retry_delay:
k8s_operator_args["retry_delay"]=dict(seconds=retry_delay.total_seconds())

return k8s_operator_args

@@ -583,6 +602,7 @@ def _create_defaults(self):
# TODO: Enable emails
"retries": 0,
"execution_timeout": timedelta(days=5),
"retry_delay": timedelta(seconds=200),
# check https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=retry_delay#airflow.models.baseoperator.BaseOperatorMeta
}
if self.worker_pool is not None:
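For reference, here is a minimal, self-contained sketch of the Parameter-to-Airflow mapping that `_process_parameters` performs after this change, including the new JSONTypeClass branch. The names `JSON_TYPE_NAME`, `TYPE_TRANSFORM`, and `to_airflow_param` are hypothetical stand-ins that only mirror the logic in the diff above; the real code operates on Metaflow `Parameter` objects and `metaflow.parameters.JSONTypeClass`.

```python
# Hypothetical sketch of the Parameter -> Airflow `params` mapping in
# _process_parameters above; not Metaflow's actual implementation.
JSON_TYPE_NAME = "JSON"  # stand-in for JSONTypeClass.name

TYPE_TRANSFORM = {
    "int": "integer",
    "str": "string",
    "bool": "string",
    "float": "number",
    JSON_TYPE_NAME: "string",  # JSON parameters are surfaced to Airflow as strings
}


def to_airflow_param(name, default, type_name, help_text=None):
    """Build the dict that ends up in the generated DAG's parameter list."""
    param = {"name": name, "default": default}
    if help_text:
        param["description"] = help_text
    if type_name in TYPE_TRANSFORM:
        param["type"] = TYPE_TRANSFORM[type_name]
    if type_name == "bool" and default is not None:
        param["default"] = str(default)  # booleans are serialized as strings
    return param


print(to_airflow_param("config", '{"alpha": 0.1}', JSON_TYPE_NAME))
# -> {'name': 'config', 'default': '{"alpha": 0.1}', 'type': 'string'}
```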
5 changes: 2 additions & 3 deletions metaflow/plugins/airflow/airflow_cli.py
@@ -124,10 +124,9 @@ def make_flow(
# Validate if the workflow is correctly parsed.
# _validate_workflow(obj.flow, obj.graph, obj.flow_datastore, obj.metadata)

# Attach @kubernetes and @environment decorator to the flow to
# ensure that the related decorator hooks are invoked.
# Attach @kubernetes.
decorators._attach_decorators(
obj.flow, [KubernetesDecorator.name, EnvironmentDecorator.name]
obj.flow, [KubernetesDecorator.name]
)

decorators._init_step_decorators(
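Because the Airflow deployment path above only accepts parameters that carry defaults when a schedule is set (see the `_process_parameters` check in airflow.py), a flow deployed through this CLI would typically look like the sketch below. It uses what I understand to be Metaflow's public `FlowSpec`/`Parameter`/`JSONType`/`@schedule` API; the flow itself is illustrative and not taken from this PR.

```python
# Illustrative flow (not part of this PR): every Parameter carries a default,
# which the Airflow deployment path requires when @schedule is set.
from metaflow import FlowSpec, Parameter, JSONType, schedule, step


@schedule(daily=True)
class ExampleFlow(FlowSpec):
    alpha = Parameter("alpha", type=float, default=0.1)
    config = Parameter("config", type=JSONType, default='{"mode": "fast"}')

    @step
    def start(self):
        print("alpha =", self.alpha, "config =", self.config)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    ExampleFlow()
```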
11 changes: 7 additions & 4 deletions metaflow/plugins/airflow/airflow_decorator.py
@@ -11,8 +11,9 @@
K8S_XCOM_DIR_PATH = "/airflow/xcom"


# there are 6 decorators in this class right now - making it difficult to parse
# the file. can we split this out with every decorator getting it's own file?
# # TODO : (savin-comments) :
# there are 6 decorators in this class right now - making it difficult to parse
# the file. can we split this out with every decorator getting it's own file?

def safe_mkdir(dir):
try:
@@ -39,7 +40,7 @@ def push_xcom_values(xcom_dict):
SKIPPED="skipped",
)

# is there a reason for this method to exist in it's current form?
# TODO : (savin-comments) : fix this is there a reason for this method to exist in it's current form?
def _get_sensor_exception():
from .airflow import AirflowException

@@ -49,6 +50,7 @@ class AirflowSensorException(AirflowException):
return AirflowSensorException


# # TODO : (savin-comments) : refactor this to a common place.
def _arg_exception(arg_name, deconame, value, allowed_values=None, allowed_type=None):
msg_str = "`%s` cannot be `%s` when using @%s" % (
arg_name,
@@ -131,6 +133,7 @@ class AirflowSensorDecorator(FlowDecorator):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# TODO : (savin-comments) : refactor the name of `self._task_name` to have a common name.
# Is the task is task name a metaflow task?
self._task_name = self.operator_type

@@ -174,7 +177,7 @@ def compile(self):
# If there are more than one decorator per sensor-type then we require the name argument.
if sum([len(v) for v in sensor_deco_types.values()]) > len(sensor_deco_types):
if self.attributes["name"] is None:
# can't we autogenerate this name?
# TODO : (savin-comments) autogenerate this name
raise _get_sensor_exception()(
"`name` argument cannot be `None` when multiple Airflow Sensor related decorators are attached to a flow."
)
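The uniqueness rule enforced in `compile()` above boils down to: once any sensor type is attached to a flow more than once, every sensor decorator must carry an explicit `name` (auto-generating one is still a TODO in the diff). The small function below is a hypothetical sketch of that counting logic only, modelling sensor decorators as `(operator_type, name)` tuples rather than the decorator's real interface.

```python
# Hypothetical sketch of the check in AirflowSensorDecorator.compile() above.
from collections import defaultdict


def validate_sensor_names(sensor_decorators):
    """`sensor_decorators` is a list of (operator_type, name) tuples."""
    by_type = defaultdict(list)
    for operator_type, name in sensor_decorators:
        by_type[operator_type].append(name)
    # More decorators than distinct sensor types => at least one type repeats,
    # so an explicit `name` is required on every sensor decorator.
    if sum(len(names) for names in by_type.values()) > len(by_type):
        if any(name is None for names in by_type.values() for name in names):
            raise ValueError(
                "`name` argument cannot be `None` when multiple Airflow "
                "Sensor related decorators are attached to a flow."
            )


validate_sensor_names([("external_task_sensor", "wait_for_a"),
                       ("external_task_sensor", "wait_for_b")])  # passes
```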