diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py
index 2c78e491418..962e3dc0da7 100644
--- a/metaflow/plugins/airflow/airflow.py
+++ b/metaflow/plugins/airflow/airflow.py
@@ -17,9 +17,12 @@
     KUBERNETES_SERVICE_ACCOUNT)
 from metaflow.parameters import deploy_time_eval
 from metaflow.plugins.kubernetes.kubernetes import Kubernetes
+
+# TODO: Move chevron to _vendor
 from metaflow.plugins.cards.card_modules import chevron
 from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task
 from metaflow.util import dict_to_cli_options, get_username, compress_list
+from metaflow.parameters import JSONTypeClass
 
 from . import airflow_utils
 from .airflow_decorator import SUPPORTED_SENSORS, AirflowSensorDecorator
@@ -111,12 +114,12 @@ def _set_scheduling_interval(self):
         This method will extract interval from both and apply the one which is not None.
         We raise an exception in the airflow_cli.py if both flow decorators are set.
         """
-        schedule_decorator_sint, airflow_schedule_decorator_sint = self._get_schedule(), self._get_airflow_schedule_interval()
+        schedule_decorator_cron_pattern, airflow_schedule_decorator_cron_pattern = self._get_schedule(), self._get_airflow_schedule_interval()
         self.schedule_interval = None
-        if schedule_decorator_sint is not None:
-            self.schedule_interval = schedule_decorator_sint
-        elif airflow_schedule_decorator_sint is not None:
-            self.schedule_interval = airflow_schedule_decorator_sint
+        if schedule_decorator_cron_pattern is not None:
+            self.schedule_interval = schedule_decorator_cron_pattern
+        elif airflow_schedule_decorator_cron_pattern is not None:
+            self.schedule_interval = airflow_schedule_decorator_cron_pattern
 
     def _get_schedule(self):
         # Using the cron presets provided here :
@@ -168,6 +171,8 @@ def _process_parameters(self):
             str.__name__: "string",
             bool.__name__: "string",
             float.__name__: "number",
+            JSONTypeClass.name: "string"
+
         }
         type_parser = {bool.__name__: lambda v: str(v)}
@@ -182,20 +187,12 @@ def _process_parameters(self):
                 )
             seen.add(norm)
-            is_required = param.kwargs.get("required", False)
-            # Throw an exception if a schedule is set for a flow with required
-            # parameters with no defaults.
-            if "default" not in param.kwargs and is_required:
-                raise MetaflowException(
-                    "The parameter *%s* does not have a "
-                    "default while having 'required' set to 'True'. "
-                    "A default is required for such parameters when deploying on Airflow."
-                )
-            if "default" not in param.kwargs and self.schedule_interval:
+            # Airflow requires defaults set for parameters.
+            if "default" not in param.kwargs:
                 raise MetaflowException(
-                    "When @schedule is set with Airflow, Parameters require default values. "
                     "The parameter *%s* does not have a "
-                    "'default' set"
+                    "default set. "
+                    "A default is required for parameters when deploying on Airflow."
                 )
             value = deploy_time_eval(param.kwargs.get("default"))
             parameters.append(dict(name=param.name, value=value))
@@ -210,11 +207,13 @@ def _process_parameters(self):
                 airflow_param["default"] = value
             if param_help:
                 airflow_param["description"] = param_help
-
-            if param_type is not None and param_type.__name__ in type_transform_dict:
-                airflow_param["type"] = type_transform_dict[param_type.__name__]
-                if param_type.__name__ in type_parser and value is not None:
-                    airflow_param["default"] = type_parser[param_type.__name__](value)
+            if param_type is not None:
+                if isinstance(param_type, JSONTypeClass):
+                    airflow_param["type"] = type_transform_dict[JSONTypeClass.name]
+                elif param_type.__name__ in type_transform_dict:
+                    airflow_param["type"] = type_transform_dict[param_type.__name__]
+                    if param_type.__name__ in type_parser and value is not None:
+                        airflow_param["default"] = type_parser[param_type.__name__](value)
 
             airflow_params.append(airflow_param)
         self.parameters = airflow_params
@@ -243,8 +242,8 @@ def _make_input_path(self, step_name):
 
     def _to_job(self, node):
         """
-        This function will transform the node's specification into Airflow compatible operator arguements.
-        Since this function is long. We it performs two major duties:
+        This function will transform the node's specification into Airflow compatible operator arguments.
+        Since this function is long, note that it performs two major duties:
             1. Based on the type of the graph node (start/linear/foreach/join etc.) it will decide how to set the input paths
             2. Based on node's decorator specification convert the information into a job spec for the KubernetesPodOperator.
         """
@@ -323,9 +322,13 @@ def _to_job(self, node):
         runtime_limit = get_run_time_limit_for_task(node.decorators)
 
         k8s = Kubernetes(
-            self.flow_datastore, self.metadata, self.environment,
+            self.flow_datastore, self.metadata, self.environment
         )
         user = util.get_username()
+
+        airflow_task_id = AIRFLOW_TASK_ID_TEMPLATE_VALUE
+        mf_run_id = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX  # run_id_creator is added via the `user_defined_filters`
+        attempt = "{{ task_instance.try_number - 1 }}"
         labels = {
             "app": "metaflow",
             "app.kubernetes.io/name": "metaflow-task",
@@ -337,7 +340,7 @@ def _to_job(self, node):
         additional_mf_variables = {
             "METAFLOW_CODE_SHA": self.code_package_sha,
             "METAFLOW_CODE_URL": self.code_package_url,
-            "METAFLOW_CODE_DS": self.datastore.TYPE,
+            "METAFLOW_CODE_DS": self.flow_datastore.TYPE,
             "METAFLOW_USER": user,
             "METAFLOW_SERVICE_URL": BATCH_METADATA_SERVICE_URL,
             "METAFLOW_SERVICE_HEADERS": json.dumps(BATCH_METADATA_SERVICE_HEADERS),
@@ -346,39 +349,55 @@ def _to_job(self, node):
             "METAFLOW_DEFAULT_DATASTORE": "s3",
             "METAFLOW_DEFAULT_METADATA": "service",
             # Question for (savin) : what does `METAFLOW_KUBERNETES_WORKLOAD` do ?
- "METAFLOW_KUBERNETES_WORKLOAD": 1, + "METAFLOW_KUBERNETES_WORKLOAD": str(1), "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes", "METAFLOW_CARD_S3ROOT": DATASTORE_CARD_S3ROOT, + "METAFLOW_RUN_ID":mf_run_id, + "METAFLOW_AIRFLOW_TASK_ID":airflow_task_id, + "METAFLOW_AIRFLOW_DAG_RUN_ID":"{{run_id}}", + "METAFLOW_AIRFLOW_JOB_ID":"{{ti.job_id}}", + "METAFLOW_ATTEMPT_NUMBER":attempt, } env.update(additional_mf_variables) - - service_account = k8s_deco.attributes["service_account"] + service_account = KUBERNETES_SERVICE_ACCOUNT if k8s_deco.attributes["service_account"] is None else k8s_deco.attributes["service_account"] + k8s_namespace = k8s_deco.attributes["namespace"] if k8s_deco.attributes["namespace"] is not None else "default" k8s_operator_args = dict( - namespace=k8s_deco.attributes["namespace"], - service_account_name=KUBERNETES_SERVICE_ACCOUNT - if service_account is None - else service_account, + namespace=k8s_namespace, + service_account_name=service_account, node_selector=k8s_deco.attributes["node_selector"], cmds=k8s._command( self.flow.name, self.run_id, node.name, self.task_id, self.attempt, code_package_url=self.code_package_url, step_cmds= self._step_cli(node, input_paths, self.code_package_url, user_code_retries), ), - in_cluster=True, image=k8s_deco.attributes["image"], - cpu=k8s_deco.attributes["cpu"], - memory=k8s_deco.attributes["memory"], - disk=k8s_deco.attributes["disk"], + # TODO : (savin-comments) add gpu support with limits + resources = dict( + requests = { + "cpu":k8s_deco.attributes["cpu"], + "memory":"%sM"%str(k8s_deco.attributes["memory"]), + "ephemeral-storage":str(k8s_deco.attributes["disk"]), + } + ), execution_timeout=dict(seconds=runtime_limit), - retry_delay=dict(seconds=retry_delay.total_seconds()) if retry_delay else None, retries=user_code_retries, env_vars=[dict(name=k, value=v) for k, v in env.items()], labels=labels, + task_id = node.name, + in_cluster= True, + get_logs =True, + do_xcom_push= True, + log_events_on_failure=True, is_delete_operator_pod=True, + retry_exponential_backoff= False, # todo : should this be a arg we allow on CLI. not right now - there is an open ticket for this - maybe at some point we will. + reattach_on_restart=False, ) if k8s_deco.attributes["secrets"]: k8s_operator_args["secrets"] = k8s_deco.attributes["secrets"] + + if retry_delay: + k8s_operator_args["retry_delay"]=dict(seconds=retry_delay.total_seconds()) return k8s_operator_args @@ -583,6 +602,7 @@ def _create_defaults(self): # TODO: Enable emails "retries": 0, "execution_timeout": timedelta(days=5), + "retry_delay": timedelta(seconds=200), # check https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=retry_delay#airflow.models.baseoperator.BaseOperatorMeta } if self.worker_pool is not None: diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py index c1708c66dc4..9f5c2a7a8b9 100644 --- a/metaflow/plugins/airflow/airflow_cli.py +++ b/metaflow/plugins/airflow/airflow_cli.py @@ -124,10 +124,9 @@ def make_flow( # Validate if the workflow is correctly parsed. # _validate_workflow(obj.flow, obj.graph, obj.flow_datastore, obj.metadata) - # Attach @kubernetes and @environment decorator to the flow to - # ensure that the related decorator hooks are invoked. + # Attach @kubernetes. 
     decorators._attach_decorators(
-        obj.flow, [KubernetesDecorator.name, EnvironmentDecorator.name]
+        obj.flow, [KubernetesDecorator.name]
     )
 
     decorators._init_step_decorators(
diff --git a/metaflow/plugins/airflow/airflow_decorator.py b/metaflow/plugins/airflow/airflow_decorator.py
index 4419c825534..420f715df94 100644
--- a/metaflow/plugins/airflow/airflow_decorator.py
+++ b/metaflow/plugins/airflow/airflow_decorator.py
@@ -11,8 +11,9 @@
 K8S_XCOM_DIR_PATH = "/airflow/xcom"
 
 
-# there are 6 decorators in this class right now - making it difficult to parse
-# the file. can we split this out with every decorator getting it's own file?
+# TODO : (savin-comments) :
+# there are 6 decorators in this class right now - making it difficult to parse
+# the file. can we split this out with every decorator getting its own file?
 def safe_mkdir(dir):
     try:
@@ -39,7 +40,7 @@ def push_xcom_values(xcom_dict):
     SKIPPED="skipped",
 )
 
 
-# is there a reason for this method to exist in it's current form?
+# TODO : (savin-comments) : fix this - is there a reason for this method to exist in its current form?
 def _get_sensor_exception():
     from .airflow import AirflowException
@@ -49,6 +50,7 @@ class AirflowSensorException(AirflowException):
         return AirflowSensorException
 
 
+# TODO : (savin-comments) : refactor this to a common place.
 def _arg_exception(arg_name, deconame, value, allowed_values=None, allowed_type=None):
     msg_str = "`%s` cannot be `%s` when using @%s" % (
         arg_name,
@@ -131,6 +133,7 @@ class AirflowSensorDecorator(FlowDecorator):
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
+        # TODO : (savin-comments) : refactor the name of `self._task_name` to have a common name.
         # Is the task is task name a metaflow task?
         self._task_name = self.operator_type
 
@@ -174,7 +177,7 @@ def compile(self):
         # If there are more than one decorator per sensor-type then we require the name argument.
         if sum([len(v) for v in sensor_deco_types.values()]) > len(sensor_deco_types):
             if self.attributes["name"] is None:
-                # can't we autogenerate this name?
+                # TODO : (savin-comments) autogenerate this name
                 raise _get_sensor_exception()(
                     "`name` argument cannot be `None` when multiple Airflow Sensor related decorators are attached to a flow."
                 )
diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py
index 89ccd487832..2032942d8d3 100644
--- a/metaflow/plugins/airflow/airflow_utils.py
+++ b/metaflow/plugins/airflow/airflow_utils.py
@@ -47,7 +47,7 @@ def get_supported_sensors(cls):
         return list(cls.__dict__.values())
 
 
-# TODO: This can be removed. See how labels/annotations are handled in Metaflow today.
+# TODO : (savin-comments) This can be removed. See how labels/annotations are handled in Metaflow today.
 def sanitize_label_value(val):
     # Label sanitization: if the value can be used as is, return it as is.
     # If it can't, sanitize and add a suffix based on hash of the original
@@ -83,6 +83,7 @@ def json_dump(val):
     return json.dumps(val)
 
 
+# TODO : (savin-comments) : Fix serialization of args.
 class AirflowDAGArgs(object):
     # _arg_types This object helps map types of
     # different keys that need to be parsed. None of the "values" in this
@@ -131,7 +132,7 @@ def __init__(self, **kwargs):
         self._args = kwargs
 
     @property
-    def arguements(self):  # TODO: Fix spelling
+    def arguments(self):
         return dict(**self._args, **self.metaflow_specific_args)
 
     # just serialize?
@@ -182,7 +183,7 @@ def to_dict(self):
         dd = self._serialize_args()
         return dd
 
-# TODO: This shouldn't be strictly needed?
+# TODO : (savin-comments) This shouldn't be strictly needed?
 def generate_rfc1123_name(flow_name, step_name):
     """
     Generate RFC 1123 compatible name. Specifically, the format is:
@@ -205,36 +206,28 @@ def generate_rfc1123_name(flow_name, step_name):
     # the name has to be under 63 chars total
     return sanitized_long_name[:57] + "-" + hash[:5]
 
-# better name - _kubernetes_pod_operator_args
-# also, the output of this method is something that we should be able to generate
-# statically on the user's workstation and not on Airflow server. basically - we can
-# massage operator_args in the correct format before writing them out to the DAG file.
-# that has a great side-effect of allowing us to eye-ball the results in the DAG file
-# and not rely on more on-the-fly transformations.
-def set_k8s_operator_args(flow_name, step_name, operator_args):
+def _kubernetes_pod_operator_args(flow_name, step_name, operator_args):
     from kubernetes import client
+    from airflow.kubernetes.secret import Secret
-
-    task_id = AIRFLOW_TASK_ID_TEMPLATE_VALUE
-    run_id = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX  # run_id_creator is added via the `user_defined_filters`
-    attempt = "{{ task_instance.try_number - 1 }}"
     # Set dynamic env variables like run-id, task-id etc from here.
-    env_vars = (
-        [
-            client.V1EnvVar(name=v["name"], value=str(v["value"]))
-            for v in operator_args.get("env_vars", [])
-        ]
-        + [
-            client.V1EnvVar(name=k, value=str(v))
-            for k, v in dict(
-                METAFLOW_RUN_ID=run_id,
-                METAFLOW_AIRFLOW_TASK_ID=task_id,
-                METAFLOW_AIRFLOW_DAG_RUN_ID="{{run_id}}",
-                METAFLOW_AIRFLOW_JOB_ID="{{ti.job_id}}",
-                METAFLOW_ATTEMPT_NUMBER=attempt,
-            ).items()
-        ]
-        + [
+    secrets = [
+        Secret("env", secret, secret) for secret in operator_args.get("secrets", [])
+    ]
+    args = operator_args
+    args.update({
+        # TODO : (savin-comments) : we should be able to have a cleaner name - take a look at the argo implementation
+        "name": generate_rfc1123_name(flow_name, step_name),
+        "secrets": secrets,
+        # Question for (savin):
+        # Default timeout in airflow is 120. I can remove `startup_timeout_seconds` for now. How should we expose it to the user?
+
+        # TODO : annotations are not empty. see @kubernetes or argo-workflows
+        "annotations": {},
+
+    })
+    # Below cannot be passed in dictionary form. After trying a few times it didn't work.
+    additional_env_vars = [
             client.V1EnvVar(
                 name=k,
                 value_from=client.V1EnvVarSource(
@@ -245,91 +238,17 @@ def set_k8s_operator_args(flow_name, step_name, operator_args):
                 "METAFLOW_KUBERNETES_POD_NAMESPACE": "metadata.namespace",
                 "METAFLOW_KUBERNETES_POD_NAME": "metadata.name",
                 "METAFLOW_KUBERNETES_POD_ID": "metadata.uid",
-                "METAFLOW_KUBERNETES_SERVICE_ACCOUNT_NAME": "spec.serviceAccountName",
             }.items()
         ]
+    args["env_vars"] = [client.V1EnvVar(name=x["name"], value=x["value"]) for x in args["env_vars"]] + additional_env_vars
+
+    # We need to explicitly parse resources to k8s.V1ResourceRequirements, otherwise Airflow tries
+    # to parse dictionaries to `airflow.providers.cncf.kubernetes.backcompat.pod.Resources` objects via
+    # the `airflow.providers.cncf.kubernetes.backcompat.backward_compat_converters.convert_resources` function
+    resources = args.get("resources")
+    args["resources"] = client.V1ResourceRequirements(
+        requests=resources["requests"],
     )
-
-    labels = {
-        "metaflow/attempt": attempt,
-        "metaflow/run_id": run_id,
-        "metaflow/task_id": task_id,
-    }
-    # We don't support volumes at the moment for `@kubernetes`
-    volume_mounts = [
-        client.V1VolumeMount(**v) for v in operator_args.get("volume_mounts", [])
-    ]
-    volumes = [client.V1Volume(**v) for v in operator_args.get("volumes", [])]
-    secrets = [
-        Secret("env", secret, secret) for secret in operator_args.get("secrets", [])
-    ]
-    args = {
-        # "on_retry_callback": retry_callback,
-        # use the default namespace even no namespace is defined rather than airflow
-        "namespace": operator_args.get("namespace", "airflow"),
-        # image is always available - no need for a fallback
-        "image": operator_args.get("image", "python"),
-        # we should be able to have a cleaner name - take a look at the argo implementation
-        "name": generate_rfc1123_name(flow_name, step_name),
-        "task_id": step_name,
-        # do we need to specify args that are None? can we just rely on the system
-        # defaults for such args?
-        "random_name_suffix": None,
-        "cmds": operator_args.get("cmds", []),
-        "arguments": operator_args.get("arguments", []),
-        # do we use ports?
-        "ports": operator_args.get("ports", []),
-        # do we use volume mounts?
-        "volume_mounts": volume_mounts,
-        # do we use volumes?
-        "volumes": volumes,
-        "env_vars": env_vars,
-        # how are the values for env_from computed?
-        "env_from": operator_args.get("env_from", []),
-        "secrets": secrets,
-        # will this ever be false?
-        "in_cluster": operator_args.get(
-            "in_cluster", True  # run kubernetes client with in_cluster configuration.
-        ),
-        "labels": operator_args.get("labels", {}),
-        "reattach_on_restart": False,
-        # is there a default value we can rely on? we would ideally like the sys-admin
-        # to be able to set these values inside their airflow deployment
-        "startup_timeout_seconds": 120,
-        "get_logs": True,  # This needs to be set to True to ensure that doesn't error out looking for xcom
-        # we should use the default image pull policy
-        "image_pull_policy": None,
-        # annotations are not empty. see @kubernetes or argo-workflows
-        "annotations": {},
-        "resources": client.V1ResourceRequirements(
-            # need to support disk and gpus - also the defaults don't match up to
-            # the expected values. let's avoid adding defaults ourselves where we can.
-            requests={
-                "cpu": operator_args.get("cpu", 1),
-                "memory": operator_args.get("memory", "2000M"),
-            }
-        ),  # kubernetes.client.models.v1_resource_requirements.V1ResourceRequirements
-        "retries": operator_args.get("retries", 0),  # Base operator command
-
-        "retry_exponential_backoff": False,  # todo : should this be a arg we allow on CLI. not right now - there is an open ticket for this - maybe at some point we will.
-        "affinity": None,  # kubernetes.client.models.v1_affinity.V1Affinity
-        "config_file": None,
-        # image_pull_secrets : typing.Union[typing.List[kubernetes.client.models.v1_local_object_reference.V1LocalObjectReference], NoneType],
-        "image_pull_secrets": operator_args.get("image_pull_secrets", []),
-        "service_account_name": operator_args.get(  # Service account names can be essential for passing reference to IAM roles etc.
-            "service_account_name", None
-        ),
-        # let's rely on the default values
-        "is_delete_operator_pod": operator_args.get(
-            "is_delete_operator_pod", False
-        ),  # if set to true will delete the pod once finished /failed execution. By default it is true
-        # let's rely on the default values
-        "hostnetwork": False,  # If True enable host networking on the pod.
-        "security_context": {},
-        "log_events_on_failure": True,
-        "do_xcom_push": True,
-    }
-    args["labels"].update(labels)
     if operator_args.get("execution_timeout", None):
         args["execution_timeout"] = timedelta(
             **operator_args.get(
@@ -340,28 +259,6 @@ def set_k8s_operator_args(flow_name, step_name, operator_args):
         args["retry_delay"] = timedelta(**operator_args.get("retry_delay"))
     return args
 
-# why do we need a separate method for this function? can we just embed the logic in
-# _kubernetes_task?
-def get_k8s_operator():
-
-    try:
-        from airflow.contrib.operators.kubernetes_pod_operator import (
-            KubernetesPodOperator,
-        )
-    except ImportError:
-        try:
-            from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
-                KubernetesPodOperator,
-            )
-        except ImportError as e:
-            raise KubernetesProviderNotFound(
-                "This DAG requires a `KubernetesPodOperator`. "
-                "Install the Airflow Kubernetes provider using : "
-                "`pip install apache-airflow-providers-cncf-kubernetes`"
-            )
-
-    return KubernetesPodOperator
-
 
 def _parse_sensor_args(name, kwargs):
     if name == SensorNames.EXTERNAL_TASK_SENSOR:
@@ -438,8 +335,22 @@ def from_dict(cls, jsd, flow_name=None):
         ).set_operator_args(**op_args)
 
     def _kubenetes_task(self):
-        KubernetesPodOperator = get_k8s_operator()
-        k8s_args = set_k8s_operator_args(
+        try:
+            from airflow.contrib.operators.kubernetes_pod_operator import (
+                KubernetesPodOperator,
+            )
+        except ImportError:
+            try:
+                from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+                    KubernetesPodOperator,
+                )
+            except ImportError as e:
+                raise KubernetesProviderNotFound(
+                    "This DAG requires a `KubernetesPodOperator`. "
" + "Install the Airflow Kubernetes provider using : " + "`pip install apache-airflow-providers-cncf-kubernetes`" + ) + k8s_args = _kubernetes_pod_operator_args( self._flow_name, self.name, self._operator_args ) return KubernetesPodOperator(**k8s_args) @@ -491,7 +402,7 @@ def from_dict(cls, data_dict): for sd in data_dict["states"].values(): re_cls.add_state( AirflowTask.from_dict( - sd, flow_name=re_cls._dag_instantiation_params.arguements["dag_id"] + sd, flow_name=re_cls._dag_instantiation_params.arguments["dag_id"] ) ) re_cls.set_parameters(data_dict["metaflow_params"]) @@ -521,7 +432,7 @@ def compile(self): # DAG Params can be seen here : # https://airflow.apache.org/docs/apache-airflow/2.0.0/_api/airflow/models/dag/index.html#airflow.models.dag.DAG # Airflow 2.0.0 Allows setting Params. - dag = DAG(params=params_dict, **self._dag_instantiation_params.arguements) + dag = DAG(params=params_dict, **self._dag_instantiation_params.arguments) dag.fileloc = self._file_path if self._file_path is not None else dag.fileloc def add_node(node, parents, dag):