Skip to content

Commit

Permalink
Changes based on PR comments:
Browse files Browse the repository at this point in the history
- removed the Airflow XCom push file; moved its logic into the decorator code
- removed prefix configuration
- nit fixes.
  • Loading branch information
valayDave committed Apr 14, 2022
1 parent 9e622ba commit b2970dd
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 44 deletions.
4 changes: 0 additions & 4 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ def from_conf(name, default=None):
)
#

###
# Airflow Configuration
###
AIRFLOW_STATE_MACHINE_PREFIX = from_conf("METAFLOW_AIRFLOW_STATE_MACHINE_PREFIX")

###
# Conda configuration
Expand Down
16 changes: 6 additions & 10 deletions metaflow/plugins/airflow/airflow_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,24 @@
# Characters NOT allowed in a DAG/flow name. Raw string so `\-` and `\.`
# are regex escapes, not (deprecated) string escape sequences.
VALID_NAME = re.compile(r"[^a-zA-Z0-9_\-\.]")


def resolve_state_machine_name(name):
def attach_prefix(name):
if AIRFLOW_STATE_MACHINE_PREFIX is not None:
return AIRFLOW_STATE_MACHINE_PREFIX + "_" + name
return name
def resolve_dag_name(name):

project = current.get("project_name")
if project:
if name:
raise MetaflowException(
"--name is not supported for @projects. " "Use --branch instead."
)
state_machine_name = attach_prefix(current.project_flow_name)
dag_name = current.project_flow_name
is_project = True
else:
if name and VALID_NAME.search(name):
raise MetaflowException("Name '%s' contains invalid characters." % name)

state_machine_name = attach_prefix(name if name else current.flow_name)
dag_name = name if name else current.flow_name
is_project = False

return state_machine_name, is_project
return dag_name, is_project


@click.group()
Expand Down Expand Up @@ -72,7 +68,7 @@ def make_flow(
package_url, package_sha = obj.flow_datastore.save_data(
[obj.package.blob], len_hint=1
)[0]
flow_name, is_project = resolve_state_machine_name(dag_name)
flow_name, is_project = resolve_dag_name(dag_name)
return Airflow(
flow_name,
obj.graph,
Expand Down Expand Up @@ -130,7 +126,7 @@ def make_flow(
"--max-workers",
default=100,
show_default=True,
help="Maximum number of concurrent airflow tasks to run for the DAG. ",
help="Maximum number of concurrent Airflow tasks.",
)
@click.option(
"--worker-pool",
Expand Down
8 changes: 4 additions & 4 deletions metaflow/plugins/airflow/airflow_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ def _to_job(self, node: DAGNode):
# or join_in_foreach
# ):

# # Todo : Find ways to pass state using for the below usecases:
# # 1. To set the cardinality of foreaches
# # 2. To set the input paths from the parent steps of a foreach join.
# # 3. To read the input paths in a foreach join.
# Todo : Find ways to pass state for the below use cases:
# 1. To set the cardinality of foreaches
# 2. To set the input paths from the parent steps of a foreach join.
# 3. To read the input paths in a foreach join.

compute_type = "k8s" # todo : This will become more dynamic in the future.
if compute_type == "k8s":
Expand Down
23 changes: 15 additions & 8 deletions metaflow/plugins/airflow/airflow_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,23 @@

from metaflow.decorators import StepDecorator
from metaflow.metadata import MetaDatum
from .plumbing.airflow_xcom_push import push_xcom_values
from .airflow_utils import TASK_ID_XCOM_KEY


# Directory the Airflow Kubernetes executor scans for XCom return values.
K8S_XCOM_DIR_PATH = "/airflow/xcom"

def safe_mkdir(dir):
    """Create directory `dir` (including parents) if it does not already exist.

    Idempotent: silently succeeds when the directory is already present.
    NOTE: the parameter name shadows the builtin `dir`; kept for
    backward compatibility with existing callers.
    """
    # exist_ok=True is equivalent to catching FileExistsError, without the
    # race between the existence check and the creation.
    os.makedirs(dir, exist_ok=True)


def push_xcom_values(xcom_dict):
    """Write `xcom_dict` as JSON to the path Airflow's K8s executor reads XComs from."""
    safe_mkdir(K8S_XCOM_DIR_PATH)
    xcom_return_file = os.path.join(K8S_XCOM_DIR_PATH, "return.json")
    with open(xcom_return_file, "w") as file_handle:
        json.dump(xcom_dict, file_handle)

class AirflowInternalDecorator(StepDecorator):
name = "airflow_internal"

Expand All @@ -25,11 +38,10 @@ def task_pre_step(
ubf_context,
inputs,
):
# todo : find out where the execution is taking place.
# find out where the execution is taking place.
# Once figured where the execution is happening then we can do
# handle xcom push / pull differently
meta = {}
meta["airflow-execution"] = os.environ["METAFLOW_RUN_ID"]
meta["airflow-dag-run-id"] = os.environ["METAFLOW_AIRFLOW_DAG_RUN_ID"]
meta["airflow-job-id"] = os.environ["METAFLOW_AIRFLOW_JOB_ID"]
entries = [
Expand All @@ -46,8 +58,3 @@ def task_pre_step(
}
)

def task_finished(
self, step_name, flow, graph, is_task_ok, retry_count, max_user_code_retries
):
pass
# todo : Figure ways to find out foreach cardinality over here,
18 changes: 0 additions & 18 deletions metaflow/plugins/airflow/plumbing/airflow_xcom_push.py

This file was deleted.

0 comments on commit b2970dd

Please sign in to comment.