Skip to content

Commit

Permalink
Bug fixes WRT Kubernetes secrets + k8s deployments.
Browse files Browse the repository at this point in the history
- Fixing some error messages.
- Added some comments.
  • Loading branch information
valayDave committed Apr 10, 2022
1 parent 04c92b9 commit eb775cb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
2 changes: 1 addition & 1 deletion metaflow/plugins/airflow/airflow_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
"with": [
decorator.make_decorator_spec()
for decorator in node.decorators
if not decorator.statically_defined and decorator.name != "kubernetes"
if not decorator.statically_defined
]
}
# FlowDecorators can define their own top-level options. They are
Expand Down
58 changes: 42 additions & 16 deletions metaflow/plugins/airflow/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
import re
from datetime import timedelta, datetime


class KubernetesProviderNotFound(Exception):
headline = "Kubernetes provider not found"


LABEL_VALUE_REGEX = re.compile(r"^[a-zA-Z0-9]([a-zA-Z0-9\-\_\.]{0,61}[a-zA-Z0-9])?$")

TASK_ID_XCOM_KEY = "metaflow_task_id"
Expand Down Expand Up @@ -59,8 +64,9 @@ def json_dump(val):

class AirflowDAGArgs(object):
# _arg_types This object helps map types of
# different keys that need to be parsed. None of the values in this
# dictionary are being used.
# different keys that need to be parsed. None of the "values" in this
# dictionary are being used. But the "types" of the values of are used when
# reparsing the arguments from the config variable.
_arg_types = {
"dag_id": "asdf",
"description": "asdfasf",
Expand Down Expand Up @@ -182,19 +188,35 @@ def set_k8s_operator_args(flow_name, step_name, operator_args):
run_id = "arf-{{ run_id | hash }}" # hash is added via the `user_defined_filters`
attempt = "{{ task_instance.try_number - 1 }}"
# Set dynamic env variables like run-id, task-id etc from here.
env_vars = [
client.V1EnvVar(name=v["name"], value=str(v["value"]))
for v in operator_args.get("env_vars", [])
] + [
client.V1EnvVar(name=k, value=str(v))
for k, v in dict(
METAFLOW_RUN_ID=run_id,
METAFLOW_AIRFLOW_TASK_ID=task_id,
METAFLOW_AIRFLOW_DAG_RUN_ID="{{run_id}}",
METAFLOW_AIRFLOW_JOB_ID="{{ti.job_id}}",
METAFLOW_ATTEMPT_NUMBER=attempt,
).items()
]
env_vars = (
[
client.V1EnvVar(name=v["name"], value=str(v["value"]))
for v in operator_args.get("env_vars", [])
]
+ [
client.V1EnvVar(name=k, value=str(v))
for k, v in dict(
METAFLOW_RUN_ID=run_id,
METAFLOW_AIRFLOW_TASK_ID=task_id,
METAFLOW_AIRFLOW_DAG_RUN_ID="{{run_id}}",
METAFLOW_AIRFLOW_JOB_ID="{{ti.job_id}}",
METAFLOW_ATTEMPT_NUMBER=attempt,
).items()
]
+ [
client.V1EnvVar(
name=k,
value_from=client.V1EnvVarSource(
field_ref=client.V1ObjectFieldSelector(field_path=str(v))
),
)
for k, v in {
"METAFLOW_KUBERNETES_POD_NAMESPACE": "metadata.namespace",
"METAFLOW_KUBERNETES_POD_NAME": "metadata.name",
"METAFLOW_KUBERNETES_POD_ID": "metadata.uid",
}.items()
]
)

labels = {
"metaflow/attempt": attempt,
Expand Down Expand Up @@ -295,7 +317,11 @@ def get_k8s_operator():
)
except ImportError as e:
# todo : Fix error messages.
raise e
raise KubernetesProviderNotFound(
"Running this DAG requires KubernetesPodOperator. "
"Install Airflow Kubernetes provider using : "
"`pip install apache-airflow-providers-cncf-kubernetes`"
)

return KubernetesPodOperator

Expand Down
4 changes: 3 additions & 1 deletion metaflow/plugins/airflow/compute/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def create_k8s_args(
if service_account is None
else service_account,
# todo : pass secrets from metaflow to Kubernetes via airflow
secrets=secrets,
node_selector=node_selector,
cmds=k8s._command(
code_package_url=code_package_url,
Expand All @@ -105,6 +104,9 @@ def create_k8s_args(
labels=labels,
is_delete_operator_pod=True,
)
if secrets:
k8s_operator_args["secrets"] = secrets

return k8s_operator_args


Expand Down

0 comments on commit eb775cb

Please sign in to comment.